Working with PySpark jobs
Apache Spark is a distributed processing framework for unstructured and semi-structured data and part of the Hadoop ecosystem.
In this section, we provide a simple example that demonstrates how to use PySpark, the Spark interface for Python, in Data Proc. In the example, we use PySpark to count the number of times each word appears in a short text.
Before you start
- Create a service account with the mdb.dataproc.agent role.
- In Object Storage, create buckets and configure access to them (a scripted alternative is sketched after this list):
  - Create a bucket for the input data and grant the cluster service account READ permissions for this bucket.
  - Create a bucket for the processing output and grant the cluster service account READ and WRITE permissions for this bucket.
- Create a Data Proc cluster with the following configuration:
  - Services: HDFS, SPARK, YARN.
  - Service account: the service account with the mdb.dataproc.agent role you created earlier.
  - Bucket name: the bucket to hold the processing output.
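The buckets can also be created over Object Storage's S3-compatible API instead of the management console. The following is a minimal sketch, assuming boto3 is installed, a static access key is available via the standard AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables, and the bucket names are placeholders you would replace with your own; granting the cluster service account its permissions is still configured separately.

```python
# Minimal sketch: create the input and output buckets over the S3-compatible API.
# Assumptions: boto3 is installed, AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY hold a
# static access key, and the bucket names below are placeholders.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="https://storage.yandexcloud.net",  # Object Storage endpoint
)

for bucket in ("dataproc-input-bucket", "dataproc-output-bucket"):
    s3.create_bucket(Bucket=bucket)  # bucket names must be globally unique

# Access for the cluster service account (READ on the input bucket, READ and WRITE
# on the output bucket) is granted separately, e.g. via the bucket ACL settings.
```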
Create a PySpark job
- Download the text.txt file and upload it to the input data bucket for processing (a scripted alternative to the console upload is sketched after this list):

  text.txt:

  ```
  she sells sea shells on the sea shore the shells that she sells are sea shells I am sure so if she sells sea shells on the sea shore I am sure that the shells are sea shore shells
  ```
- Download the word_count.py file with the Python code for the analysis routine and upload it to the input data bucket:

  word_count.py:

  ```python
  import sys

  from pyspark import SparkConf, SparkContext


  def main():
      if len(sys.argv) != 3:
          print('Usage: job.py <input_dir> <output_dir>')
          sys.exit(1)

      in_dir = sys.argv[1]
      out_dir = sys.argv[2]

      conf = SparkConf().setAppName("Word count - PySpark")
      sc = SparkContext(conf=conf)

      text_file = sc.textFile(in_dir)
      counts = text_file.flatMap(lambda line: line.split(" ")) \
          .map(lambda word: (word, 1)) \
          .reduceByKey(lambda a, b: a + b)

      if out_dir.startswith('s3a://'):
          counts.saveAsTextFile(out_dir)
      else:
          default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS')
          counts.saveAsTextFile(default_fs + out_dir)


  if __name__ == "__main__":
      main()
  ```
- Create a PySpark job with the following parameters:
  - Main python file: s3a://<input data bucket name>/word_count.py
  - Job arguments:
    - s3a://<input data bucket name>/text.txt
    - s3a://<output processing bucket name>/<output folder>
- Wait for the job status to change to Done.
- Download and review the output files from the bucket:

  part-00000:

  ```
  ('sea', 6)
  ('are', 2)
  ('am', 2)
  ('sure', 2)
  ```

  part-00001:

  ```
  ('she', 3)
  ('sells', 3)
  ('shells', 6)
  ('on', 2)
  ('the', 4)
  ('shore', 3)
  ('that', 2)
  ('I', 2)
  ('so', 1)
  ('if', 1)
  ```
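If you prefer to script the file transfers rather than use the console, the sketch below uploads the two input files before the job runs and then fetches and prints the part-* result files. It makes the same assumptions as the earlier sketch (boto3, a static access key in the environment) and uses placeholder bucket and folder names.

```python
# Sketch: upload the job inputs, then download the results, over the S3-compatible API.
# Bucket names and the "output/" folder are placeholders.
import boto3

s3 = boto3.client("s3", endpoint_url="https://storage.yandexcloud.net")

# Upload the input text and the PySpark script to the input bucket.
s3.upload_file("text.txt", "dataproc-input-bucket", "text.txt")
s3.upload_file("word_count.py", "dataproc-input-bucket", "word_count.py")

# ... create the PySpark job and wait for the Done status ...

# List and print every part-* file the job wrote to the output folder.
listing = s3.list_objects_v2(Bucket="dataproc-output-bucket", Prefix="output/")
for item in listing.get("Contents", []):
    if "part-" in item["Key"]:
        body = s3.get_object(Bucket="dataproc-output-bucket", Key=item["Key"])["Body"]
        print(item["Key"])
        print(body.read().decode("utf-8"))
```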
Delete the resources you created
If you no longer need these resources, delete the Data Proc cluster and the Object Storage buckets you created for this example.
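Emptying and deleting the buckets can likewise be scripted over the S3-compatible API; deleting the cluster and the service account is done from the management console or CLI. A minimal sketch under the same assumptions as above (boto3, static access key, placeholder bucket names):

```python
# Sketch: empty and delete the example buckets.
# A bucket must be empty before it can be deleted; names below are placeholders.
import boto3

s3 = boto3.client("s3", endpoint_url="https://storage.yandexcloud.net")

for bucket in ("dataproc-input-bucket", "dataproc-output-bucket"):
    listing = s3.list_objects_v2(Bucket=bucket)
    for item in listing.get("Contents", []):
        s3.delete_object(Bucket=bucket, Key=item["Key"])
    s3.delete_bucket(Bucket=bucket)
```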