Working with Spark jobs
Apache Spark
In this section, we provide a simple example that demonstrates how to use the Spark interface for Scala and Java in Yandex Data Proc. In the example, we use Spark to count how many times each word appears in a short text.
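To see what the Spark pipeline computes, its steps (split each line into words, pair each word with 1, sum the pairs per word) can be mirrored on ordinary Scala collections without a cluster. The following is a minimal sketch, not the code shipped in the .jar file used later in this section: the object name LocalWordCount is hypothetical, and a groupBy followed by a per-word sum stands in for Spark's reduceByKey.

```scala
// Hypothetical local stand-in for the Spark word count: same split/map/reduce steps,
// but on an in-memory Seq instead of an RDD. No Spark dependency is needed.
object LocalWordCount {
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))                          // one element per word
      .map(word => (word, 1))                                    // pair each word with a count of 1
      .groupBy(_._1)                                             // collect pairs by word (stands in for the shuffle)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the 1s, like reduceByKey(_ + _)

  def main(args: Array[String]): Unit = {
    val text = "she sells sea shells on the sea shore"
    // Sort for stable output: Map iteration order is not defined.
    count(Seq(text)).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Because the sketch splits on single spaces exactly like the job, punctuation and case are not normalized: "I" and "i" would be counted as different words.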
Getting started
- Create a service account with the mdb.dataproc.agent role.
- In Object Storage, create buckets and configure access to them:
  - Create a bucket for the input data and grant the cluster service account READ permissions for this bucket.
  - Create a bucket for the processing output and grant the cluster service account READ and WRITE permissions for this bucket.
- Create a Yandex Data Proc cluster with the following settings:
  - Services: HDFS, SPARK, and YARN.
  - Service account: select the service account with the mdb.dataproc.agent role that you created earlier.
  - Bucket name: select the bucket that will hold the processing results.
Create a Spark job
- Download the text.txt file and upload it to the input data bucket for processing:

  text.txt

  she sells sea shells on the sea shore the shells that she sells are sea shells I am sure so if she sells sea shells on the sea shore I am sure that the shells are sea shore shells
- Download a .jar file and upload it to the input data bucket. The file contains the compiled Scala code of the word_count.scala word count routine:

  word_count.scala

  package com.yandex.cloud.dataproc.scala

  import org.apache.spark.{SparkConf, SparkContext}

  object Main {
    def main(args: Array[String]): Unit = {
      if (args.length != 2) { // check number of args
        System.err.println("Usage spark-app.jar <input_directory> <output_directory>")
        System.exit(-1)
      }

      val inDir = args(0)  // input URI
      val outDir = args(1) // output URI

      val conf = new SparkConf().setAppName("Word count - Scala App")
      val sc = new SparkContext(conf)

      // Split each line into words, pair each word with 1, and sum the pairs per word
      val text_file = sc.textFile(inDir)
      val counts = text_file.flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

      // s3a:// URIs are written as-is; other paths are resolved against fs.defaultFS
      val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS")
      if (outDir.toLowerCase().startsWith("s3a://")) {
        counts.saveAsTextFile(outDir)
      } else {
        counts.saveAsTextFile(defaultFS + "/" + outDir)
      }

      sc.stop()
    }
  }

  For more information about building a Spark application written in Scala, see Using Spark Submit.
- Create a Spark job with the following parameters:
  - Main jar: s3a://<input_data_bucket_name>/spark-app_2.11-0.1.0-SNAPSHOT.jar
  - Main class: com.yandex.cloud.dataproc.scala.Main
  - Arguments:
    - s3a://<input_data_bucket_name>/text.txt
    - s3a://<processing_output_bucket_name>/<output_folder>
- Wait for the job status to change to Done.
- Download the result files from the output bucket and review them:

  part-00000
  (are,2)
  (am,2)
  (she,3)
  (so,1)

  part-00001
  (shore,3)
  (if,1)
  (that,2)
  (on,2)
  (shells,6)
  (I,2)
  (sure,2)
  (sea,6)
  (the,4)
  (sells,3)
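The job's second argument decides where the results are written: an s3a:// URI is passed to saveAsTextFile unchanged, and any other value is appended to the cluster's fs.defaultFS. The following minimal sketch isolates that rule from the job code as a pure function so it can be checked without a cluster; the names OutputPath and resolveOutputPath and the HDFS address are hypothetical.

```scala
// Hypothetical helper extracted from the job's Main: decides where saveAsTextFile writes.
object OutputPath {
  // outDir: second job argument; defaultFS: value of fs.defaultFS on the cluster.
  def resolveOutputPath(outDir: String, defaultFS: String): String =
    if (outDir.toLowerCase().startsWith("s3a://")) outDir // Object Storage URI: used as-is
    else defaultFS + "/" + outDir                         // anything else: relative to the default FS

  def main(args: Array[String]): Unit = {
    val fs = "hdfs://example-namenode:8020" // hypothetical fs.defaultFS value
    println(resolveOutputPath("s3a://my-output-bucket/wc", fs))
    println(resolveOutputPath("tmp/wc", fs))
  }
}
```

With this rule, an output location written with another scheme, such as s3://, would be treated as a relative path under fs.defaultFS, so the output location should be given with the s3a:// prefix, as in the job arguments.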
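Spark writes one part-* file per partition, so the counts from a single run are split across several files; because reduceByKey sends all pairs for a given word to one partition, each word should appear in exactly one file. The following sketch, assuming the one-tuple-per-line format of the result files listed above, merges the lines from all part files into a single map. The object name MergeParts is hypothetical, and summing repeated words is only a safeguard.

```scala
// Hypothetical post-processing: merge "(word,count)" lines from all part-* files.
object MergeParts {
  // Matches one saveAsTextFile line for a (String, Int) pair, e.g. "(sea,6)".
  private val PairLine = """\((.*),(\d+)\)""".r

  def parse(lines: Seq[String]): Map[String, Int] =
    lines.map(_.trim).filter(_.nonEmpty).foldLeft(Map.empty[String, Int]) {
      case (acc, PairLine(word, n)) => acc.updated(word, acc.getOrElse(word, 0) + n.toInt)
      case (_, other)               => throw new IllegalArgumentException(s"Unexpected line: $other")
    }

  def main(args: Array[String]): Unit = {
    // Contents of part-00000 and part-00001 from the example run, one tuple per line.
    val part0 = Seq("(are,2)", "(am,2)", "(she,3)", "(so,1)")
    val part1 = Seq("(shore,3)", "(if,1)", "(that,2)", "(on,2)", "(shells,6)",
                    "(I,2)", "(sure,2)", "(sea,6)", "(the,4)", "(sells,3)")
    val counts = parse(part0 ++ part1)
    println(s"${counts.size} distinct words, ${counts.values.sum} words in total")
  }
}
```

To run it on downloaded files instead of the inlined lines, each file can be read with scala.io.Source.fromFile(path).getLines().toList.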
Delete the resources you created
Some resources are not free of charge. To avoid paying for them, delete the resources you no longer need: