
Working with PySpark jobs

  • Before you start
  • Create a PySpark job
  • Delete the resources you created

Apache Spark is a distributed processing framework for unstructured and semi-structured data and is part of the Hadoop project ecosystem.

In this section, we provide a simple example of how to use PySpark, the Spark interface for Python, in Data Proc: counting how many times each word occurs in a short text.

Before you start

  1. Create a service account with the mdb.dataproc.agent role.

  2. In Object Storage, create buckets and configure access to them (a scripted sketch follows this list):

    1. Create a bucket for the input data and grant the cluster service account READ permissions for this bucket.
    2. Create a bucket for the processing output and grant the cluster service account READ and WRITE permissions for this bucket.
  3. Create a Data Proc cluster with the following configuration:

    • Services:
      • HDFS
      • SPARK
      • YARN
    • Service account: Select the service account with the mdb.dataproc.agent role you created earlier.
    • Bucket name: Select a bucket to hold the processing output.
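
If you prefer to script step 2, the buckets can be created through the S3-compatible API of Object Storage. The sketch below uses boto3 and assumes a static access key issued for a service account with storage permissions; the endpoint URL is the standard Object Storage endpoint, while the key values and bucket names are placeholders. Access for the cluster service account (READ on the input bucket, READ and WRITE on the output bucket) still has to be granted as described above.

    import boto3

    # Open an S3 session against Object Storage. The static access key is an
    # assumption for this sketch: issue one for a service account that is
    # allowed to manage buckets.
    session = boto3.session.Session(
        aws_access_key_id='<static key ID>',
        aws_secret_access_key='<static key secret>',
    )
    s3 = session.client('s3', endpoint_url='https://storage.yandexcloud.net')

    # Bucket for the input data (grant the cluster service account READ).
    s3.create_bucket(Bucket='<input data bucket name>')

    # Bucket for the processing output (grant the cluster service account
    # READ and WRITE).
    s3.create_bucket(Bucket='<output processing bucket name>')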

Create a PySpark job

  1. Download the text.txt file and upload it to the input data bucket for processing:

    text.txt
    she sells sea shells on the sea shore
    the shells that she sells are sea shells I am sure
    so if she sells sea shells on the sea shore
    I am sure that the shells are sea shore shells
    
  2. Download the word_count.py file with the Python code of the analysis routine and upload it to the input data bucket:

    word_count.py
    import sys
    from pyspark import SparkConf, SparkContext


    def main():

        # The job expects exactly two arguments: the input and output directories.
        if len(sys.argv) != 3:
            print('Usage: job.py <input_dir> <output_dir>')
            sys.exit(1)

        in_dir = sys.argv[1]
        out_dir = sys.argv[2]

        conf = SparkConf().setAppName("Word count - PySpark")
        sc = SparkContext(conf=conf)

        # Split each line into words, map every word to a (word, 1) pair,
        # and sum the counts per word.
        text_file = sc.textFile(in_dir)
        counts = text_file.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)

        # Write the result to Object Storage if an s3a:// path is given,
        # otherwise to the cluster's default file system (HDFS).
        if out_dir.startswith('s3a://'):
            counts.saveAsTextFile(out_dir)
        else:
            default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS')
            counts.saveAsTextFile(default_fs + out_dir)


    if __name__ == "__main__":
        main()

  3. Create a PySpark job with the following parameters (a scripted alternative via the REST API is sketched after this list):

    • Main python file: s3a://<input data bucket name>/word_count.py

    • Job arguments:

      • s3a://<input data bucket name>/text.txt
      • s3a://<output processing bucket name>/<output folder>
  4. Wait for the job status to change to Done.

  5. Download and review the output files from the bucket:

    part-00000
    ('sea', 6)
    ('are', 2)
    ('am', 2)
    ('sure', 2)
    
    part-00001
    ('she', 3)
    ('sells', 3)
    ('shells', 6)
    ('on', 2)
    ('the', 4)
    ('shore', 3)
    ('that', 2)
    ('I', 2)
    ('so', 1)
    ('if', 1)
    
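
If you prefer not to use the management console for steps 3 and 4, the same job can be created and tracked through the Data Proc REST API (the Job create and get methods from the API reference). The sketch below is an illustration built on assumptions: the cluster ID, IAM token and bucket names are placeholders, and the request and response field names follow the naming used in the REST reference, so verify them there before relying on this.

    import time

    import requests

    IAM_TOKEN = '<IAM token>'
    CLUSTER_ID = '<Data Proc cluster ID>'
    BASE_URL = ('https://dataproc.api.cloud.yandex.net'
                f'/dataproc/v1/clusters/{CLUSTER_ID}/jobs')
    HEADERS = {'Authorization': f'Bearer {IAM_TOKEN}'}

    # Create the PySpark job with the same main Python file and arguments
    # that step 3 enters in the console.
    response = requests.post(BASE_URL, headers=HEADERS, json={
        'name': 'word-count',
        'pysparkJob': {
            'mainPythonFileUri': 's3a://<input data bucket name>/word_count.py',
            'args': [
                's3a://<input data bucket name>/text.txt',
                's3a://<output processing bucket name>/<output folder>',
            ],
        },
    })
    response.raise_for_status()
    job_id = response.json()['metadata']['jobId']

    # Step 4: poll the job until it reaches a terminal status.
    while True:
        job = requests.get(f'{BASE_URL}/{job_id}', headers=HEADERS).json()
        if job.get('status') in ('DONE', 'ERROR', 'CANCELLED'):
            print('Job finished with status:', job['status'])
            break
        time.sleep(10)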

Delete the resources you created

If you no longer need these resources, delete them (a scripted clean-up sketch for the buckets follows this list):

  1. Delete the cluster.
  2. Delete buckets.
  3. Delete the service account.
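
The buckets from step 2 can also be emptied and removed with the same S3-compatible API used earlier; deleting the cluster and the service account is done as described in the corresponding instructions. The sketch below uses boto3 with placeholder credentials and bucket names, and assumes each bucket holds fewer than 1000 objects (one list call per bucket).

    import boto3

    session = boto3.session.Session(
        aws_access_key_id='<static key ID>',
        aws_secret_access_key='<static key secret>',
    )
    s3 = session.client('s3', endpoint_url='https://storage.yandexcloud.net')

    for bucket in ('<input data bucket name>', '<output processing bucket name>'):
        # A bucket must be empty before it can be deleted, so remove every
        # object first and then the bucket itself.
        for obj in s3.list_objects_v2(Bucket=bucket).get('Contents', []):
            s3.delete_object(Bucket=bucket, Key=obj['Key'])
        s3.delete_bucket(Bucket=bucket)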
