Working with PySpark jobs
In this section, we provide a simple example that demonstrates how to use PySpark, the Apache Spark interface for Python, in Data Proc.
Getting started
- Create a service account with the `mdb.dataproc.agent` role.
- In Object Storage, create buckets and configure access to them (a scripted alternative is sketched right after this list):
  - Create a bucket for the input data and grant the cluster service account `READ` permissions for this bucket.
  - Create a bucket for the processing output and grant the cluster service account `READ and WRITE` permissions for this bucket.
- Create a Data Proc cluster with the following settings:
  - Services: HDFS, SPARK, YARN
  - Service account: Select the previously created service account with the `mdb.dataproc.agent` role.
  - Bucket name: Select a bucket to hold the processing results.
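If you prefer to script the bucket setup, the buckets can be created through the Object Storage S3-compatible API. Below is a minimal sketch assuming boto3 is installed; the bucket names and static access key values are placeholders to replace with your own, and the read/write grants for the cluster service account still need to be configured separately (for example, in the management console).

```python
import boto3

# Client for the Yandex Object Storage S3-compatible API.
s3 = boto3.client(
    's3',
    endpoint_url='https://storage.yandexcloud.net',
    region_name='ru-central1',
    aws_access_key_id='<static_key_id>',          # placeholder
    aws_secret_access_key='<static_key_secret>',  # placeholder
)

# Bucket for the input data and bucket for the processing output.
s3.create_bucket(Bucket='<input_data_bucket_name>')
s3.create_bucket(Bucket='<processing_output_bucket_name>')
```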
Create a PySpark job
- Download the text.txt file and upload it to the input data bucket for processing (a scripted upload is sketched at the end of this section):

  text.txt

  ```text
  she sells sea shells on the sea shore
  the shells that she sells are sea shells I am sure
  so if she sells sea shells on the sea shore
  I am sure that the shells are sea shore shells
  ```
- Download the word_count.py file containing the Python code for the analysis routine and upload it to the input data bucket (a quick local check of this logic is sketched at the end of this section):

  word_count.py

  ```python
  import sys

  from pyspark import SparkConf, SparkContext


  def main():
      if len(sys.argv) != 3:
          print('Usage job.py <input_directory> <output_directory>')
          sys.exit(1)

      in_dir = sys.argv[1]
      out_dir = sys.argv[2]

      conf = SparkConf().setAppName("Word count - PySpark")
      sc = SparkContext(conf=conf)

      # Split each line into words and count how often each word occurs.
      text_file = sc.textFile(in_dir)
      counts = text_file.flatMap(lambda line: line.split(" ")) \
          .map(lambda word: (word, 1)) \
          .reduceByKey(lambda a, b: a + b)

      # Save to Object Storage if the output path is an s3a:// URI;
      # otherwise save to the cluster's default file system (HDFS).
      if out_dir.startswith('s3a://'):
          counts.saveAsTextFile(out_dir)
      else:
          default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS')
          counts.saveAsTextFile(default_fs + out_dir)


  if __name__ == "__main__":
      main()
  ```
- Create a PySpark job with the following parameters:
  - Main python file: `s3a://<input_data_bucket_name>/word_count.py`
  - Arguments:
    - `s3a://<input_data_bucket_name>/text.txt`
    - `s3a://<processing_output_bucket_name>/<output_folder>`
- Wait for the job status to change to Done.
- Download the files with the processing results from the bucket and review them:

  part-00000

  ```text
  ('sea', 6)
  ('are', 2)
  ('am', 2)
  ('sure', 2)
  ```

  part-00001

  ```text
  ('she', 3)
  ('sells', 3)
  ('shells', 6)
  ('on', 2)
  ('the', 4)
  ('shore', 3)
  ('that', 2)
  ('I', 2)
  ('so', 1)
  ('if', 1)
  ```
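The upload and download steps above can also be scripted against the S3-compatible API. Here is a minimal sketch assuming boto3 is installed; the bucket names, output folder, and static access key values are placeholders to replace with your own:

```python
import boto3

# Client for the Yandex Object Storage S3-compatible API.
s3 = boto3.client(
    's3',
    endpoint_url='https://storage.yandexcloud.net',
    region_name='ru-central1',
    aws_access_key_id='<static_key_id>',          # placeholder
    aws_secret_access_key='<static_key_secret>',  # placeholder
)

# Upload the input text and the job code to the input data bucket.
s3.upload_file('text.txt', '<input_data_bucket_name>', 'text.txt')
s3.upload_file('word_count.py', '<input_data_bucket_name>', 'word_count.py')

# After the job status changes to Done, fetch the result files.
listing = s3.list_objects_v2(
    Bucket='<processing_output_bucket_name>',
    Prefix='<output_folder>/part-',
)
for obj in listing.get('Contents', []):
    s3.download_file('<processing_output_bucket_name>', obj['Key'],
                     obj['Key'].split('/')[-1])
```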
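To sanity-check the word-count logic before submitting the job, you can also run the same flatMap/map/reduceByKey pipeline in Spark's local mode. This is a minimal sketch assuming pyspark is installed locally (pip install pyspark):

```python
from pyspark import SparkConf, SparkContext

# Run Spark in local mode, using all available cores.
conf = SparkConf().setAppName("Word count - local check").setMaster("local[*]")
sc = SparkContext(conf=conf)

# The same lines as text.txt, supplied in memory instead of from a bucket.
lines = sc.parallelize([
    "she sells sea shells on the sea shore",
    "the shells that she sells are sea shells I am sure",
    "so if she sells sea shells on the sea shore",
    "I am sure that the shells are sea shore shells",
])

counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

# Should match the part-* files shown above, e.g. ('sea', 6), ('shells', 6).
print(sorted(counts.collect()))
sc.stop()
```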
Delete the resources you created
Some resources are not free of charge. To avoid paying for them, delete the resources you no longer need: