Using Yandex Object Storage in Data Proc
This section describes various ways that processes running in Data Proc clusters can access objects from Object Storage buckets.
Note
Configure a cluster network before setting up access to Yandex Cloud services and internet resources.
Component settings impact bucket file read and write performance:
- The settings specified when creating a cluster affect all the jobs running in the cluster.
- The settings specified when creating jobs override cluster-level settings and can be job-specific.
DistCp
To copy files from Object Storage to HDFS, use the DistCp utility.
To authenticate in Object Storage, you can use one of the following approaches:
- Use the IAM token of the cluster service account.
- Use CredentialProvider.
- Pass the `access key` and `secret key` parameters of static access keys when creating a job.
Accessing S3 with authentication via the IAM token of a cluster service account
1. When creating a cluster, specify a service account. If the cluster is already created, add a service account using the Edit cluster button in the management console.
2. Make sure the service account has access to the appropriate bucket. To do this, grant the service account privileges in the bucket ACL, or the `storage.viewer` or `storage.editor` role. For more information about these roles, see the Object Storage documentation.

For example, get a list of files located in the `yc-mdb-examples` public bucket at the path `dataproc/example01/set01`. To do this, connect to the cluster and run the command:

hadoop fs -ls s3a://yc-mdb-examples/dataproc/example01/set01
Result:

Found 12 items
-rw-rw-rw-  1 root root  19327838 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet
-rw-rw-rw-  1 root root  21120204 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet
-rw-rw-rw-  1 root root  20227757 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/
...
Copying via CredentialProvider
To use a secret storage provider, place the secrets within the components that need access to Object Storage. To do this, you can use JCEKS (Java Cryptography Extension KeyStore).
In this example, you first create a file with secrets and then place it in HDFS:
1. Specify the static and secret keys, e.g.:

   hadoop credential create fs.s3a.access.key \
       -value <static_key> \
       -provider localjceks://file/home/jack/yc.jceks && \
   hadoop credential create fs.s3a.secret.key \
       -value <secret_key> \
       -provider localjceks://file/home/jack/yc.jceks

2. Copy the secrets file to your local HDFS:

   hdfs dfs -put /home/jack/yc.jceks /user/root/

3. Copy the file from Object Storage directly to HDFS:

   hadoop distcp \
       -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
       -D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
       -update \
       -skipcrccheck \
       -numListstatusThreads 10 \
       s3a://yc-mdb-examples/dataproc/example01/set01 \
       hdfs://<HDFS_host>/<path>/

   `<HDFS_host>` is the target HDFS server. You can get the default server using the command:

   hdfs getconf -confKey fs.defaultFS
Example of the command to copy files from the bucket:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
Copying files by passing keys in arguments
Instead of creating a secrets file, you can pass keys in the command arguments:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D fs.s3a.bucket.dataproc-examples.access.key=<static_key> \
-D fs.s3a.bucket.dataproc-examples.secret.key=<secret_key> \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
Optimizing file reads from Object Storage
The method for reading data from a bucket depends on the `fs.s3a.experimental.input.fadvise` setting:
- In image versions 1.0 through 1.4, the default value is `sequential`. It is a good choice for sequential file reads, but slow for random access. If you use random file access more frequently, add `random` to the cluster component properties or job settings.
- For version 2.0 images, the default is `normal`: files are accessed sequentially, but if an application performs random access operations, the mode automatically switches to `random`.
For more information on the component versions used, see Runtime environment.
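As a sketch, random-access reads can be enabled for an individual Spark job by passing the property through `spark-submit`; the `spark.hadoop.` prefix forwards the setting to the S3A connector, and the job script name here is hypothetical:

```shell
# Sketch: enable random-access reads from Object Storage for one Spark job.
# The spark.hadoop. prefix passes the setting through to the Hadoop configuration.
spark-submit \
  --conf spark.hadoop.fs.s3a.experimental.input.fadvise=random \
  my_job.py  # hypothetical job script
```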
Optimizing file writes to Object Storage
To speed up file writes to Object Storage, you can use S3A committers or adjust the Apache Hadoop and Apache Spark settings described below.
Using S3A committers
S3A committers are Apache Hadoop software modules used for writing data to object storage over the S3 protocol to ensure efficient and near-atomic commits of the changes made. For more information, see the Apache Hadoop documentation.
Note
S3A committers are not used or required for operations with tables managed using the Delta Lake tools.
S3A committers run in three basic modes:
Mode | Environment | HDFS required | Writing data to partitioned tables | Write speed
---|---|---|---|---
`directory` | MapReduce, Spark | Yes* | Complete overwrite | Standard
`magic` | MapReduce, Spark | No (data is written directly to S3) | Not supported | Maximum
`partitioned` | Spark | Yes* | Replacing partitions and appending to them | Standard
* In `directory` and `partitioned` modes, no check is made for whether HDFS is available for storing intermediate data. Some jobs may complete successfully with no HDFS used. However, this might cause issues with complex jobs, such as "file not found" errors or incomplete uploads of job results to Object Storage.
To enable S3A committers, specify the values of the following settings:
- `core:fs.s3a.committer.magic.enabled : true` if jobs are to run in `magic` mode.
- `core:fs.s3a.committer.name`: default mode (`directory`, `magic`, or `partitioned`).
- `core:fs.s3a.committer.staging.abort.pending.uploads : false` for Hadoop 3.2.2 as part of the Data Proc image version 2.0, or `core:fs.s3a.committer.abort.pending.uploads : false` for Hadoop 3.3.2 as part of the image version 2.1, if multiple concurrent jobs are writing data to the same table.
- `core:mapreduce.outputcommitter.factory.scheme.s3a : org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory`.
- `spark:spark.hadoop.fs.s3a.committer.name`: default mode (`directory`, `magic`, or `partitioned`).
- `spark:spark.sql.parquet.output.committer.class : org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter`.
- `spark:spark.sql.sources.commitProtocolClass : org.apache.spark.internal.io.cloud.PathOutputCommitProtocol`.
- (Optional) `core:fs.s3a.committer.staging.conflict-mode`: action to perform if existing data partitions are found in the target table (for `partitioned` mode):
  - `append`: append new data to the existing partition.
  - `fail`: fail the job when it attempts to overwrite an existing partition.
  - `replace`: replace data in the existing partition with the new partition's data.
You can override the S3A committer mode for a specific job by setting `fs.s3a.committer.name` and `spark.hadoop.fs.s3a.committer.name` to the appropriate value (`directory`, `magic`, or `partitioned`).
Do not change the default value of the `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version` setting, since Yandex Object Storage does not support atomic directory renames.
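As a sketch, the committer settings above might be passed to a single Spark job on the command line; property names and class names are taken from the list above, and the job script name is hypothetical:

```shell
# Sketch: run one Spark job with the magic S3A committer enabled.
spark-submit \
  --conf spark.hadoop.fs.s3a.committer.name=magic \
  --conf spark.hadoop.fs.s3a.committer.magic.enabled=true \
  --conf spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter \
  --conf spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol \
  my_job.py  # hypothetical job script
```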
Apache Hadoop settings
The method of writing data to an Object Storage bucket depends on the `core:fs.s3a.fast.upload` setting. Its default value depends on the image version used:
- In image versions 1.0 through 1.4, the default value is `false` to save RAM. Set this property to `true` in the cluster component properties or the job settings. This will improve bucket write performance for large files and prevent node storage from filling up.
- In image 2.0, the `fs.s3a.fast.upload` setting is enabled by default.
If required, set values for other settings:
- `fs.s3a.committer.threads`: number of threads committing changes in Object Storage when the job completes.
- `fs.s3a.connection.maximum`: number of allowed Object Storage connections.
- `fs.s3a.connection.timeout`: maximum Object Storage connection timeout, in milliseconds.
- `fs.s3a.fast.upload.active.blocks`: maximum number of blocks in a single output stream.
- `fs.s3a.fast.upload.buffer`: type of buffer used for the temporary storage of uploaded data:
  - `disk`: data is saved to the folder specified in the `fs.s3a.buffer.dir` setting.
  - `array`: arrays on the JVM heap are used.
  - `bytebuffer`: RAM outside the JVM heap is used.
- `fs.s3a.max.total.tasks`: size of the queue of Object Storage bucket operations that cannot run because the thread limit has been reached.
- `fs.s3a.multipart.size`: size, in bytes, of the chunks that copy or upload operations partition the data into.
- `fs.s3a.threads.max`: number of threads in the AWS Transfer Manager.
Note
Large values of these parameters might cause an increase in the usage of computing resources on Data Proc cluster hosts.
For more information, see the Apache Hadoop documentation.
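For instance, a job uploading large files might set the write-tuning properties listed above per job (a sketch; the values are illustrative only, not recommendations, and the job script name is hypothetical):

```shell
# Sketch: tune S3A upload behavior for one Spark job.
# Values are illustrative, not recommendations (see the note above).
spark-submit \
  --conf spark.hadoop.fs.s3a.fast.upload=true \
  --conf spark.hadoop.fs.s3a.fast.upload.buffer=disk \
  --conf spark.hadoop.fs.s3a.multipart.size=33554432 \
  my_job.py  # hypothetical job script
```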
Apache Spark settings
When accessing data in Object Storage from Spark jobs, we recommend setting `spark.sql.hive.metastorePartitionPruning` to `true`.
When working with data in Parquet format, the following Spark job settings are recommended:
- `spark.hadoop.parquet.enable.summary-metadata : false`
- `spark.sql.parquet.mergeSchema : false`
- `spark.sql.parquet.filterPushdown : true`
When working with data in Orc format, the following Spark job settings are recommended:
- `spark.sql.orc.cache.stripe.details.size : 10000`
- `spark.sql.orc.filterPushdown : true`
- `spark.sql.orc.splits.include.file.footer : true`
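These per-job settings can also be passed on the command line; as a sketch for a Parquet job (the job script name is hypothetical):

```shell
# Sketch: apply the recommended Parquet settings to one Spark job.
spark-submit \
  --conf spark.hadoop.parquet.enable.summary-metadata=false \
  --conf spark.sql.parquet.mergeSchema=false \
  --conf spark.sql.parquet.filterPushdown=true \
  my_job.py  # hypothetical job script
```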
Jobs that create or update a large number (hundreds or thousands) of table partitions may take a long time to update partition records in Hive Metastore. To speed up this process, increase the values of the following settings:
- `hive:datanucleus.connectionPool.maxPoolSize`: maximum size of the Hive Metastore DB connection pool.
- `hive:hive.metastore.fshandler.threads`: number of threads running background file system operations within Hive Metastore.
- `spark:spark.sql.addPartitionInBatch.size`: number of partitions updated per Hive Metastore call. The optimal value is `10 × <hive:hive.metastore.fshandler.threads_setting_value>` or higher.
Note
If these parameters are set too high, you might run out of Hive Metastore system resources. A large Hive Metastore database connection pool may require changing the settings and increasing your cluster's computing resources.
For more information, see the Apache Spark documentation.
Using s3fs
The s3fs utility allows you to mount Object Storage buckets over FUSE. For more information about using the utility, see s3fs.
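As a sketch, mounting a bucket might look like this. It assumes s3fs is installed; the bucket name, mount point, and key file path are hypothetical, and `url`, `passwd_file`, and `use_path_request_style` are standard s3fs options:

```shell
# Sketch: mount an Object Storage bucket with s3fs (names are hypothetical).
# The password file holds "<access_key>:<secret_key>" and must not be world-readable.
echo "<access_key>:<secret_key>" > ~/.passwd-s3fs
chmod 600 ~/.passwd-s3fs
s3fs <bucket_name> /mnt/s3 \
  -o url=https://storage.yandexcloud.net \
  -o passwd_file=$HOME/.passwd-s3fs \
  -o use_path_request_style
```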
Using Object Storage from Spark
Implement the desired access option:
- Using JCEKS:

  sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net");
  sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", "");
  sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
  sc.hadoopConfiguration.set("hadoop.security.credential.provider.path", "jceks://hdfs/<path_to_JCEKS_file>");

- Using your access key and secret:

  sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net");
  sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", "");
  sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
  sc.hadoopConfiguration.set("fs.s3a.access.key","<access_key>");
  sc.hadoopConfiguration.set("fs.s3a.secret.key","<bucket_secret>");
You can then read the file from Object Storage:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("s3a://<bucket_name>/<object_path>")
Select the access method:
- Accessing Object Storage objects using JCEKS:

  sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
  sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "")
  sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
  sc._jsc.hadoopConfiguration().set("hadoop.security.credential.provider.path", "jceks://hdfs/<path_to_JCEKS_file>")

- Reading a file using an access key and bucket secret:

  sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
  sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "")
  sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
  sc._jsc.hadoopConfiguration().set("fs.s3a.access.key","<access_key>")
  sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key","<bucket_secret>")
Once you have access, you can read the file directly from Object Storage:
from pyspark.sql import SQLContext
sql = SQLContext(sc)
df = sql.read.parquet("s3a://<bucket_name>/<object_path>")
For more information, see Spark settings for integration with Yandex Object Storage.