In How many ways pyspark script can be executed? Detailed explanation

by | Jul 7, 2024 | Pyspark | 0 comments

PySpark scripts can be executed in various environments and through multiple methods, each with its own configurations and settings. Here’s a detailed overview of the different ways to execute PySpark scripts:

1. Using spark-submit Command

The spark-submit command is used to submit applications to a Spark cluster. It has various options to configure the execution environment, specify application details, manage resources, and more. Below are the details of the spark-submit command along with examples explaining all the options.

Basic Syntax

spark-submit [options] <app-jar | python-file> [app-arguments]

Key Options

Application Properties

  • --class CLASS_NAME: The entry point for your application (required for Java/Scala apps).

  • --master MASTER_URL: The master URL for the cluster (local, yarn, mesos, k8s, etc.)

  • --conf PROP=VALUE: Arbitrary Spark configuration properties.

spark-submit --class com.example.MyApp --master local[4] myapp.jar

Resource Management

  • --driver-memory MEM: Memory for driver (e.g., 512M, 2G).
  • --driver-cores NUM: Number of cores for the driver (YARN and standalone).
  • --executor-memory MEM: Memory per executor (e.g., 1G, 2G).
  • --executor-cores NUM: Number of cores per executor.
  • --num-executors NUM: Number of executors (YARN and standalone).
  • --total-executor-cores NUM: Total number of cores across all executors (standalone and Mesos).

spark-submit --master yarn --deploy-mode cluster --driver-memory 4G --executor-memory 8G --num-executors 10 myapp.py

YARN Cluster Mode Options

  • --queue QUEUE_NAME: The YARN queue to submit to.
  • --files FILES: Comma-separated list of files to be distributed with the job.
  • --archives ARCHIVES: Comma-separated list of archives to be distributed with the job.
  • --principal PRINCIPAL: Principal to be used for Kerberos authentication.
  • --keytab KEYTAB: Keytab to be used for Kerberos authentication.

spark-submit --master yarn --deploy-mode cluster --queue default --num-executors 20 --executor-memory 4G --executor-cores 4 myapp.py

Kubernetes Cluster Mode Options

  • --kubernetes-namespace NAMESPACE: The namespace to use in the Kubernetes cluster.
  • --conf spark.kubernetes.container.image=IMAGE: Docker image to use for the Spark driver and executors.

spark-submit --master k8s://https://<k8s-apiserver>:<k8s-port> --deploy-mode cluster --name myapp --conf spark.executor.instances=5 --conf spark.kubernetes.container.image=spark:latest myapp.py

JAR Dependencies and Files

  • --jars JARS: Comma-separated list of JARs to include on the driver and executor classpaths.--packages PACKAGES: Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths.--py-files PY_FILES: Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.

spark-submit --master yarn --deploy-mode cluster --jars mysql-connector-java-5.1.45-bin.jar --py-files dependencies.zip myapp.py

Python-Specific Options

  • --py-files: Additional Python files to add to the PYTHONPATH.--archives: Comma-separated list of archives to be extracted into the working directory of each executor.

spark-submit --master yarn --deploy-mode cluster --py-files dependencies.zip myapp.py

Advanced Options

  • --properties-file FILE: Path to a file from which to load extra properties.--driver-java-options OPTIONS: Extra Java options to pass to the driver.--driver-library-path PATH: Extra library path entries to pass to the driver.--driver-class-path CLASS_PATH: Extra classpath entries to pass to the driver.

spark-submit --properties-file spark-defaults.conf --driver-java-options "-Dlog4j.configuration=file:log4j.properties" myapp.py

Comprehensive Example

spark-submit \
  --class com.example.MyApp \
  --master yarn \
  --deploy-mode cluster \
  --name MySparkApp \
  --driver-memory 4G \
  --executor-memory 8G \
  --num-executors 10 \
  --executor-cores 4 \
  --queue default \
  --jars mysql-connector-java-5.1.45-bin.jar \
  --files hdfs:///user/config/config.json \
  --py-files dependencies.zip \
  --conf spark.executor.extraJavaOptions="-XX:+PrintGCDetails -Dkey=value" \
  --properties-file spark-defaults.conf \
  hdfs:///user/jars/myapp.jar \
  arg1 arg2 arg3

Explanation

  • Basic Information:
    • --class com.example.MyApp: Specifies the entry point for a Java/Scala application.
    • --master yarn: Sets YARN as the cluster manager.
    • --deploy-mode cluster: Specifies cluster mode for execution.
  • Resource Allocation:
    • --driver-memory 4G: Allocates 4 GB of memory for the driver.
    • --executor-memory 8G: Allocates 8 GB of memory for each executor.
    • --num-executors 10: Requests 10 executors.
    • --executor-cores 4: Allocates 4 cores for each executor.
  • YARN and Dependencies:
    • --queue default: Submits the job to the default YARN queue.
    • --jars mysql-connector-java-5.1.45-bin.jar: Includes an additional JAR file.
    • --files hdfs:///user/config/config.json: Distributes a configuration file.
    • --py-files dependencies.zip: Distributes additional Python files.
  • Configuration and Properties:
    • --conf spark.executor.extraJavaOptions="-XX:+PrintGCDetails -Dkey=value": Sets extra Java options for executors.
    • --properties-file spark-defaults.conf: Specifies a properties file for additional configuration.

This example covers the most common options you’ll use with spark-submit. Depending on your specific use case, you might need to adjust or include additional options.

The spark-submit command is the most common way to run PySpark applications. It supports various options for running applications in local, standalone, or cluster modes.

2. Using Interactive Shell (PySpark Shell)

You can run PySpark interactively using the PySpark shell, which provides a REPL (Read-Eval-Print Loop) environment.

Jupyter Notebooks provide an interactive environment for running PySpark code. You need to configure the notebook to use PySpark.

The PySpark shell provides an interactive environment to run Spark applications in Python. It’s a convenient way to experiment with Spark and run ad-hoc queries and transformations. Below are the details of PySpark shell execution with examples explaining all the options.

Starting the PySpark Shell

To start the PySpark shell, you typically run the pyspark command from your terminal.

 
pyspark

This command starts the PySpark shell with default settings.

Key Options

You can customize the PySpark shell using various options. These options can be passed in different ways, such as command-line arguments, environment variables, or configuration files.

  1. Basic Options

    • --master MASTER_URL: The master URL for the cluster (local, yarn, mesos, etc.).

       
      pyspark --master local[4]
    • --name NAME: A name for your application.

       
      pyspark --name MyApp
    • --conf PROP=VALUE: Arbitrary Spark configuration properties.

       
      pyspark --conf spark.executor.memory=2g --conf spark.executor.cores=2
  2. Resource Management

    • --driver-memory MEM: Memory for the driver (e.g., 512M, 2G).

       
      pyspark --driver-memory 4G
    • --executor-memory MEM: Memory per executor (e.g., 1G, 2G).

       
      pyspark --conf spark.executor.memory=4G
    • --executor-cores NUM: Number of cores per executor.

       
      pyspark --conf spark.executor.cores=4
  3. Python-Specific Options

    • --py-files FILES: Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH.

       
      pyspark --py-files dependencies.zip
    • --files FILES: Comma-separated list of files to be distributed with the job.

       
      pyspark --files hdfs:///user/config/config.json
  4. Environment Variables

    • PYSPARK_DRIVER_PYTHON: Specify the Python interpreter for the driver.

       
      export PYSPARK_DRIVER_PYTHON=jupyter
      export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
      pyspark
    • PYSPARK_PYTHON: Specify the Python interpreter for the executors.

       
      export PYSPARK_PYTHON=python3
      pyspark
  5. Advanced Options

    • --jars JARS: Comma-separated list of JARs to include on the driver and executor classpaths.

       
      pyspark --jars mysql-connector-java-5.1.45-bin.jar
    • --packages PACKAGES: Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths.

       
      pyspark --packages com.databricks:spark-csv_2.10:1.5.0

Examples

Starting the Shell with a Local Master

pyspark --master local[4] --name MyLocalApp --driver-memory 2G --executor-memory 2G

Starting the Shell with YARN

 
pyspark --master yarn --deploy-mode client --name MyYarnApp --driver-memory 4G --executor-memory 4G --num-executors 10 --executor-cores 4

Starting the Shell with Additional JARs and Python Files

 
pyspark --jars mysql-connector-java-5.1.45-bin.jar --py-files dependencies.zip --files hdfs:///user/config/config.json

Using Custom Python Interpreters

To use Jupyter Notebook as the PySpark driver:

 
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
pyspark

To use Python 3 as the interpreter for executors:

 
export PYSPARK_PYTHON=python3
pyspark

Interactive Usage Example

Once the PySpark shell is started, you can perform various operations interactively:

# Import necessary modules
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName(“MyApp”).getOrCreate()

# Create a DataFrame from a CSV file
df = spark.read.csv(“hdfs:///path/to/csvfile.csv”, header=True, inferSchema=True)

# Show the DataFrame schema
df.printSchema()

# Display the first few rows
df.show()

# Perform a group by operation
grouped_df = df.groupBy(“column_name”).count()
grouped_df.show()

# Filter the DataFrame
filtered_df = df.filter(df[“column_name”] > 100)
filtered_df.show()

# Register a temporary view and run SQL queries
df.createOrReplaceTempView(“my_table”)
sql_df = spark.sql(“SELECT * FROM my_table WHERE column_name > 100”)
sql_df.show()

# Save the DataFrame to a Parquet file
df.write.parquet(“hdfs:///path/to/output.parquet”)

Comprehensive Example with All Options

 
pyspark \
--master yarn \
--deploy-mode client \
--name MyComprehensiveApp \
--driver-memory 4G \
--executor-memory 4G \
--num-executors 10 \
--executor-cores 4 \
--jars mysql-connector-java-5.1.45-bin.jar \
--py-files dependencies.zip \
--files hdfs:///user/config/config.json \
--packages com.databricks:spark-csv_2.10:1.5.0 \
--conf spark.executor.extraJavaOptions="-XX:+PrintGCDetails -Dkey=value" \
--conf spark.sql.shuffle.partitions=100 \
--conf spark.dynamicAllocation.enabled=true

Explanation

  • Basic Information:

    • --master yarn: Specifies YARN as the cluster manager.
    • --deploy-mode client: Specifies client mode for execution.
    • --name MyComprehensiveApp: Names the application.
  • Resource Allocation:

    • --driver-memory 4G: Allocates 4 GB of memory for the driver.
    • --executor-memory 4G: Allocates 4 GB of memory for each executor.
    • --num-executors 10: Requests 10 executors.
    • --executor-cores 4: Allocates 4 cores for each executor.
  • Dependencies:

    • --jars mysql-connector-java-5.1.45-bin.jar: Includes an additional JAR file.
    • --py-files dependencies.zip: Distributes additional Python files.
    • --files hdfs:///user/config/config.json: Distributes a configuration file.
    • --packages com.databricks:spark-csv_2.10:1.5.0: Includes a package for handling CSV files.
  • Configuration:

    • --conf spark.executor.extraJavaOptions="-XX:+PrintGCDetails -Dkey=value": Sets extra Java options for executors.
    • --conf spark.sql.shuffle.partitions=100: Sets the number of partitions for shuffles.
    • --conf spark.dynamicAllocation.enabled=true: Enables dynamic resource allocation.

3. Using Jupyter Notebooks

Setup:

  1. Install Jupyter Notebook:
pip install jupyter
  1. Start Jupyter Notebook:
jupyter notebook
  1. Create a new notebook and set up the PySpark environment:
import os
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[4]").appName("Jupyter PySpark").getOrCreate()

4. Using Databricks

Databricks provides a unified analytics platform for big data and machine learning.

Setup:

  1. Create a new cluster in Databricks.
  2. Create a new notebook.
  3. Write and execute your PySpark code in the notebook cells.
  4. Schedule jobs using the Databricks Jobs feature.

5. Using Apache Zeppelin

Apache Zeppelin is an open-source web-based notebook for data analytics.

Setup:

  1. Start the Zeppelin server:
bin/zeppelin-daemon.sh start
  1. Create a new notebook in Zeppelin.
  2. Write and execute your PySpark code in the notebook cells.

6. Using Apache Livy

Apache Livy is a service that enables easy interaction with a Spark cluster over a REST interface.

Setup:

  1. Deploy Livy on your Spark cluster.
  2. Submit PySpark jobs via the Livy REST API.
curl -X POST --data '{"file":"local:/path/to/your_script.py"}' -H "Content-Type: application/json" http://livy-server:8998/batches

7. Using Workflow Managers (Airflow, Luigi, Oozie)

Workflow managers can be used to schedule and manage PySpark jobs.

Apache Airflow:

  1. Define a DAG for your workflow.
  2. Use BashOperator or SparkSubmitOperator to run PySpark scripts.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('spark_job', schedule_interval='@daily')

spark_task = BashOperator(
task_id='spark_submit',
bash_command='spark-submit --master local[4] /path/to/your_script.py',
dag=dag)

8. Using Crontab for Scheduling

You can schedule PySpark jobs using cron jobs.

Setup:

  1. Edit the crontab file:
crontab -e
  1. Schedule a PySpark script to run at a specific time.
bashCopy code0 0 * * * /path/to/spark/bin/spark-submit /path/to/your_script.py

9. Running within a Python Script

You can submit Spark jobs programmatically within a Python script using the subprocess module.

Example:

import subprocess

subprocess.run(["spark-submit", "--master", "local[4]", "your_script.py"])

10. Using Spark Job Server

Spark Job Server provides a RESTful API for submitting and managing Spark jobs.

Setup:

  1. Deploy Spark Job Server.
  2. Submit jobs via the REST API.
bashCopy codecurl -X POST --data-binary @your_script.py http://spark-jobserver:8090/jobs

11. Using AWS Glue

AWS Glue is a fully managed ETL service.

Setup:

  1. Create an ETL job in AWS Glue.
  2. Write your PySpark code in the job script.
  3. Schedule and run the job in AWS Glue.

12. Using YARN

For cluster environments, PySpark jobs can be submitted to a YARN cluster.

Example:

spark-submit --master yarn --deploy-mode cluster your_script.py

13. Using Kubernetes

Spark supports running on Kubernetes clusters.

Example:

spark-submit --master k8s://<k8s-apiserver> --deploy-mode cluster --name spark-pi --class org.apache.spark.examples.SparkPi local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar

14. Using Google Colab

Google Colab can be used to run PySpark code interactively.

Setup:

  1. Install PySpark in a Colab notebook:
!pip install pyspark
  1. Import PySpark in the notebook:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[4]").appName("Colab PySpark").getOrCreate()


Spark Submit with memory calculation

When using spark-submit, it’s crucial to allocate the appropriate amount of memory to both the driver and executor to ensure efficient execution of your Spark application. Below are the detailed steps and considerations for calculating and setting memory configurations for spark-submit.

Key Considerations for Memory Calculation

  1. Driver Memory:
    • The driver is responsible for the orchestration of the application. It must have enough memory to handle the metadata and small data structures.
    • The driver memory is specified using --driver-memory.
  2. Executor Memory:
    • Executors are responsible for executing the tasks. They need enough memory to process the data and handle shuffling operations.
    • Executor memory is specified using --executor-memory.
    • Memory overhead should be considered as Spark uses additional memory beyond the allocated executor memory.
  3. Number of Executors and Cores:
    • The number of executors and the number of cores per executor need to be balanced based on the cluster’s capacity.
    • Too many executors with fewer cores can lead to excessive communication overhead.
    • Too few executors with many cores can lead to inefficient utilization of resources.

Calculating Memory Allocation

Basic Memory Allocation Formula

  • Total Memory for Executors:
    • Total Cluster MemoryMemory for DriverMemory for OS and other processes
  • Memory per Executor:
    • Total Memory for Executors / Number of Executors
  • Memory Overhead:
    • Spark requires additional memory overhead for JVM and other internal processes. It’s typically 10% of executor memory or at least 384 MB, whichever is larger.

Example Calculation

Suppose you have a cluster with 100 GB of total memory and you want to allocate resources for your Spark job.

  1. Cluster Resources:
    • Total Memory: 100 GB
    • Memory for OS and other processes: 10 GB
    • Memory for Driver: 4 GB
    • Available Memory for Executors: 100 GB – 10 GB – 4 GB = 86 GB
  2. Executor Configuration:
    • Number of Executors: 5
    • Memory per Executor: 86 GB / 5 = 17.2 GB (approximately 17 GB per executor)
    • Memory Overhead: max(0.1 * 17 GB, 384 MB) = 1.7 GB (approximately 2 GB per executor)
  3. Final Memory Configuration:
    • Total Memory per Executor: 17 GB + 2 GB (overhead) = 19 GB

Spark-Submit Command

Using the above calculations, you can construct the spark-submit command as follows:

spark-submit \
--class com.example.MySparkApp \
--master yarn \
--deploy-mode cluster \
--driver-memory 4G \
--executor-memory 17G \
--executor-cores 4 \
--num-executors 5 \
--conf spark.executor.memoryOverhead=2G \
path/to/my-spark-app.jar

Detailed Explanation

  1. Class and Application:
    • --class com.example.MySparkApp: Specifies the main class of the application.
    • path/to/my-spark-app.jar: Path to your application’s JAR file.
  2. Cluster and Deploy Mode:
    • --master yarn: Specifies YARN as the cluster manager.
    • --deploy-mode cluster: Specifies cluster mode for deployment.
  3. Driver Configuration:
    • --driver-memory 4G: Allocates 4 GB of memory for the driver.
  4. Executor Configuration:
    • --executor-memory 17G: Allocates 17 GB of memory for each executor.
    • --executor-cores 4: Allocates 4 cores for each executor.
    • --num-executors 5: Specifies the number of executors.
    • --conf spark.executor.memoryOverhead=2G: Sets the memory overhead for each executor.

Additional Considerations

  • Dynamic Resource Allocation: If your cluster supports dynamic resource allocation, you can configure Spark to adjust the number of executors dynamically based on the workload.--conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.initialExecutors=5 --conf spark.dynamicAllocation.minExecutors=2 --conf spark.dynamicAllocation.maxExecutors=10
  • Fine-tuning Parallelism: Adjust the parallelism to ensure that the tasks are evenly distributed across the executors. --conf spark.sql.shuffle.partitions=200

By carefully calculating and setting the memory configurations, you can optimize the performance of your Spark application and make efficient use of your cluster resources.

How to allocate executor-cores

Allocating the appropriate number of cores per executor is crucial for the performance of your Spark application. The number of cores determines the parallelism within each executor and affects how tasks are executed concurrently.

Considerations for Allocating Executor Cores

  1. Cluster Capacity:
    • Ensure that the total number of cores allocated to executors does not exceed the total number of cores available in the cluster.
  2. Task Parallelism:
    • More cores per executor allow for higher parallelism, meaning more tasks can run concurrently within an executor.
    • However, too many cores per executor can lead to inefficient resource utilization and increased communication overhead.
  3. Data Locality:
    • Having a reasonable number of cores per executor helps maintain data locality, reducing the need for data transfer across the network.
  4. Memory Requirements:
    • Each core consumes memory, so ensure that the executor memory is sufficient to support the allocated cores without causing memory errors.

Example Calculation

Let’s expand the previous example by including the calculation for executor cores.

Suppose you have the following cluster resources:

  • Total cluster memory: 100 GB
  • Total cluster cores: 40
  • Memory for OS and other processes: 10 GB
  • Memory for driver: 4 GB
  • Available memory for executors: 86 GB
  • Number of executors: 5
  1. Memory per Executor:
    • Memory per executor = 86 GB / 5 = 17.2 GB (approximately 17 GB per executor)
    • Memory overhead = max(0.1 * 17 GB, 384 MB) = 1.7 GB (approximately 2 GB per executor)
    • Total memory per executor = 17 GB + 2 GB = 19 GB
  2. Cores per Executor:
    • Total cores available for executors: 40
    • Cores per executor = Total cores available / Number of executors = 40 / 5 = 8 cores per executor

Final spark-submit Command

Using the above calculations, you can construct the spark-submit command as follows:

spark-submit \
--class com.example.MySparkApp \
--master yarn \
--deploy-mode cluster \
--driver-memory 4G \
--executor-memory 17G \
--executor-cores 8 \
--num-executors 5 \
--conf spark.executor.memoryOverhead=2G \
path/to/my-spark-app.jar

Detailed Explanation of Executor Configuration

  1. Executor Memory:
    • --executor-memory 17G: Allocates 17 GB of memory for each executor.
  2. Executor Cores:
    • --executor-cores 8: Allocates 8 cores for each executor.
  3. Number of Executors:
    • --num-executors 5: Specifies the number of executors.
  4. Memory Overhead:
    • --conf spark.executor.memoryOverhead=2G: Sets the memory overhead for each executor to 2 GB.

Additional Considerations

  • Dynamic Resource Allocation: If supported by your cluster, you can enable dynamic resource allocation to adjust the number of executors based on workload. --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.initialExecutors=5 --conf spark.dynamicAllocation.minExecutors=2 --conf spark.dynamicAllocation.maxExecutors=10
  • Fine-tuning Parallelism: Adjust the shuffle partitions to ensure that tasks are evenly distributed. --conf spark.sql.shuffle.partitions=200

By carefully calculating and setting the number of cores per executor along with memory configurations, you can optimize the performance of your Spark application and make efficient use of your cluster resources.

Written by HintsToday Team

Related Posts

Project Alert: Automation in Pyspark

Here is a detailed approach for dividing a monthly PySpark script into multiple code steps. Each step will be saved in the code column of a control DataFrame and executed sequentially. The script will include error handling and pre-checks to ensure source tables are...

read more

Apache Spark- Partitioning and Shuffling

Apache Spark is a powerful distributed computing system that handles large-scale data processing through a framework based on Resilient Distributed Datasets (RDDs). Understanding how Spark partitions data and distributes it via shuffling or other operations is crucial...

read more

Are Dataframes in PySpark Lazy evaluated?

Yes, DataFrames in PySpark are lazily evaluated, similar to RDDs. Lazy evaluation is a key feature of Spark's processing model, which helps optimize the execution of transformations and actions on large datasets. What is Lazy Evaluation? Lazy evaluation means that...

read more
BDL Ecosystem-HDFS and Hive Tables

BDL Ecosystem-HDFS and Hive Tables

Big Data Lake: Data Storage HDFS is a scalable storage solution designed to handle massive datasets across clusters of machines. Hive tables provide a structured approach for querying and analyzing data stored in HDFS. Understanding how these components work together...

read more

Get the latest news

Subscribe to our Newsletter

0 Comments

Submit a Comment

Your email address will not be published. Required fields are marked *