Using Yandex Object Storage in Data Proc
This section describes various ways that processes running in Data Proc clusters can access objects from Object Storage buckets.
Note
Before you begin setting up access to Yandex.Cloud services and internet resources, make sure that the cluster network is configured properly.
DistCp
To copy files from Object Storage to HDFS, we recommend the DistCp utility, which is designed for copying data both within a cluster and between clusters and external storage.
You can use two approaches to authenticate in Object Storage:
- Use CredentialProvider.
- Pass the access key and secret key parameters when the job starts.
Copying via CredentialProvider
To use a secret storage provider, place the secrets within the components that need access to Object Storage. To do this, you can use JCEKS (Java Cryptography Extension KeyStore): in this example, first you create a file with secrets and then place it in HDFS.
- Specify the access key and secret key, for example:
  hadoop credential create fs.s3a.access.key -value <access key> -provider localjceks://file/home/jack/yc.jceks
  hadoop credential create fs.s3a.secret.key -value <secret key> -provider localjceks://file/home/jack/yc.jceks
- Copy the secrets file to HDFS (a command for verifying the upload is shown after these steps):
  hdfs dfs -put /home/jack/yc.jceks /user/root/
- Copy the file from Object Storage directly to HDFS:
  hadoop distcp \
    -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
    -D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
    -update \
    -skipcrccheck \
    -numListstatusThreads 10 \
    s3a://yc-mdb-examples/dataproc/example01/set01 \
    hdfs://<HDFS host>/<path>/
  Here <HDFS host> is the target HDFS server. You can get the default server with the command:
  hdfs getconf -confKey fs.defaultFS
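To check that the secrets are readable from HDFS, you can list the entries in the store with the hadoop credential utility (this assumes the /user/root/yc.jceks path used above):
hadoop credential list -provider jceks://hdfs/user/root/yc.jceks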
Example of the command to copy files from the bucket:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
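To confirm that the copy completed, you can list the target directory on the cluster, for example:
hdfs dfs -ls /user/root/datasets/set01/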
Copying files by passing keys in arguments
Instead of creating a secrets file, you can pass keys in command arguments:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D fs.s3a.bucket.dataproc-examples.access.key=<access_key> \
-D fs.s3a.bucket.dataproc-examples.secret.key=<secret_key> \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
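DistCp can also copy in the opposite direction, from HDFS to Object Storage. Below is a sketch of uploading the same directory back to a bucket; it assumes a destination bucket named dataproc-examples (matching the per-bucket settings above) that your keys are allowed to write to:
hadoop distcp \
  -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
  -D fs.s3a.bucket.dataproc-examples.access.key=<access_key> \
  -D fs.s3a.bucket.dataproc-examples.secret.key=<secret_key> \
  -update \
  -skipcrccheck \
  -numListstatusThreads 10 \
  hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/ \
  s3a://dataproc-examples/dataproc/example01/set01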
Using s3fs
s3fs lets you mount Object Storage buckets using FUSE. For more information, see s3fs.
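A minimal mounting sketch is shown below. It assumes the static access keys are saved to ${HOME}/.passwd-s3fs, that the /mnt/s3 mount point already exists, and that <bucket_name> is a bucket your keys can access; the options used are standard s3fs options:
# store the keys in the <access_key>:<secret_key> format expected by s3fs
echo "<access_key>:<secret_key>" > ${HOME}/.passwd-s3fs
chmod 600 ${HOME}/.passwd-s3fs
# mount the bucket at /mnt/s3 via the Object Storage HTTPS endpoint
s3fs <bucket_name> /mnt/s3 \
  -o passwd_file=${HOME}/.passwd-s3fs \
  -o url=https://storage.yandexcloud.net \
  -o use_path_request_style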
Using Object Storage from Spark
Implement the desired access option in Spark (Scala):
- Using JCEKS:
  sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net");
  sc.hadoopConfiguration.set("hadoop.security.credential.provider.path", "jceks://hdfs/<path to JCEKS file>");
- Using your access key and secret key:
  sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net");
  sc.hadoopConfiguration.set("fs.s3a.access.key", "<access_key>");
  sc.hadoopConfiguration.set("fs.s3a.secret.key", "<secret_key>");
You can then read the file from Object Storage:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("s3a://<bucket name>/<object path>")
In PySpark, select the access method:
- Accessing Object Storage objects using JCEKS:
  sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
  sc._jsc.hadoopConfiguration().set("hadoop.security.credential.provider.path", "jceks://hdfs/<path to JCEKS file>")
- Reading a file using an access key and bucket secret:
  sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
  sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<access key>")
  sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<bucket secret>")
Once you have access, you can read the file directly from Object Storage:
sql = SQLContext(sc)
df = sql.read.parquet("s3a://<bucket_name>/<path_to_file_or_directory>")
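Instead of setting these parameters in code, you can pass them when the application is submitted; a sketch using spark-submit, where my_job.py stands for a hypothetical application file:
# spark.hadoop.* properties are forwarded to the Hadoop configuration of the job
spark-submit \
  --conf spark.hadoop.fs.s3a.endpoint=storage.yandexcloud.net \
  --conf spark.hadoop.fs.s3a.access.key=<access_key> \
  --conf spark.hadoop.fs.s3a.secret.key=<secret_key> \
  my_job.py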