Explaining a Typical PySpark Execution Log
A typical PySpark execution log provides detailed information about the various stages and tasks of a Spark job. These logs are essential for debugging and optimizing Spark applications. Here’s a step-by-step explanation of what you might see in a typical PySpark execution log:
Step 1: Spark Context Initialization
When you start a PySpark application, the first entries in the log are related to initializing the SparkContext and SparkSession.
INFO SparkContext: Running Spark version 3.1.2
INFO SparkContext: Submitted application: Example App
INFO ResourceUtils: ==============================================================
INFO ResourceUtils: No custom resources configured for spark.driver.
INFO SparkContext: Added JAR file:/path/to/pyspark-shell
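For reference, here is a minimal initialization sketch that would produce entries like these; the app name, master setting, and log level are illustrative:

from pyspark.sql import SparkSession

# The app name appears in the "Submitted application: ..." log entry.
spark = SparkSession.builder \
    .appName("Example App") \
    .master("local[*]") \
    .getOrCreate()

# Optionally control driver-side log verbosity.
spark.sparkContext.setLogLevel("INFO")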
Step 2: Application and Job Start
The log will show details about the Spark application and the start of a job.
INFO SparkContext: Starting job: show at ExampleApp.py:25
INFO DAGScheduler: Got job 0 (show at ExampleApp.py:25) with 1 output partitions
INFO DAGScheduler: Final stage: ResultStage 0 (show at ExampleApp.py:25)
INFO DAGScheduler: Parents of final stage: List()
INFO DAGScheduler: Missing parents: List()
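The "show at ExampleApp.py:25" reference points to the action that triggered the job. A hedged sketch of what might sit around that line (the DataFrame itself is an assumption):

# Assumed contents around lines 25-27 of ExampleApp.py.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.show()   # action -> "Starting job: show at ExampleApp.py:25"
df.count()  # action -> "Starting job: count at ExampleApp.py:27"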
Step 3: Stage and Task Scheduling
Next, the log will include details about the stages and tasks that Spark schedules for execution.
INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at show at ExampleApp.py:25), which has no missing parents
INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.5 KB, free 365.2 MB)
INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.1 KB, free 365.2 MB)
INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.1:49919 (size: 2.1 KB, free: 366.3 MB)
INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1039
INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at show at ExampleApp.py:25) (first 15 tasks are for partitions Vector(0))
INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.1.1, executor driver, partition 0, PROCESS_LOCAL, 4735 bytes)
INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
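The number of tasks in a task set matches the number of partitions to compute, and PROCESS_LOCAL means the data is already local to the executor. A small sketch, assuming you want to inspect or change that partition count (names are illustrative):

# One task is scheduled per partition.
rdd = spark.sparkContext.parallelize(range(1000), numSlices=4)
print(rdd.getNumPartitions())        # 4 -> a stage over this RDD runs 4 tasks

repartitioned = rdd.repartition(8)   # adds a shuffle; the next stage runs 8 tasks
print(repartitioned.getNumPartitions())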
Step 4: Task Execution
Each task’s execution will be logged, including any shuffling and data read/write operations.
INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.1:49919 (size: 2.1 KB, free: 366.3 MB)
INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1380 bytes result sent to driver
INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 213 ms on 192.168.1.1 (executor driver) (1/1)
INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
INFO DAGScheduler: ResultStage 0 (show at ExampleApp.py:25) finished in 0.254 s
INFO DAGScheduler: Job 0 is finished. 1/1 tasks completed.
Step 5: Stage and Job Completion
Logs will indicate the completion of stages and jobs, including the time taken for execution.
INFO DAGScheduler: Job 0 finished: show at ExampleApp.py:25, took 0.270467 s
INFO SparkContext: Starting job: count at ExampleApp.py:27
INFO DAGScheduler: Registering RDD 4 (count at ExampleApp.py:27)
INFO DAGScheduler: Got job 1 (count at ExampleApp.py:27) with 1 output partitions
INFO DAGScheduler: Final stage: ResultStage 1 (count at ExampleApp.py:27)
INFO DAGScheduler: Parents of final stage: List()
INFO DAGScheduler: Missing parents: List()
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at count at ExampleApp.py:27), which has no missing parents
INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.4 KB, free 365.2 MB)
INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 365.2 MB)
INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.1:49919 (size: 2.7 KB, free: 366.3 MB)
INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at count at ExampleApp.py:27) (first 15 tasks are for partitions Vector(0))
INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, 192.168.1.1, executor driver, partition 0, PROCESS_LOCAL, 4735 bytes)
INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.1:49919 (size: 2.7 KB, free: 366.3 MB)
INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1380 bytes result sent to driver
INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 221 ms on 192.168.1.1 (executor driver) (1/1)
INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
INFO DAGScheduler: ResultStage 1 (count at ExampleApp.py:27) finished in 0.273 s
INFO DAGScheduler: Job 1 finished: count at ExampleApp.py:27, took 0.277596 s
Step 6: Resource Cleanup
After job completion, Spark will clean up the resources and shut down the session.
INFO SparkUI: Stopped Spark web UI at http://192.168.1.1:4040
INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
INFO MemoryStore: MemoryStore cleared
INFO BlockManager: BlockManager stopped
INFO BlockManagerMaster: BlockManagerMaster stopped
INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
INFO SparkContext: Successfully stopped SparkContext
INFO ShutdownHookManager: Shutdown hook called
INFO ShutdownHookManager: Deleting directory /tmp/spark-12a26f8f-d833-439a-ae3a-6c9ed6c29c67
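These cleanup entries correspond to stopping the session explicitly, or letting the shutdown hook do it:

# Stopping releases the Spark UI port, block managers, and temp directories;
# the same log entries appear when the JVM shutdown hook runs instead.
spark.stop()   # or sc.stop() when working with a bare SparkContext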
Summary of Key Log Entries
- Initialization: information about the Spark version, application submission, and resource configuration.
- Job Start: details of job submission, including the number of output partitions and stages.
- Stage and Task Scheduling: logs about stage submission, task scheduling, and resource allocation.
- Task Execution: logs on task start, execution, data shuffling, and completion.
- Stage and Job Completion: logs indicating the completion of stages and jobs with execution time.
- Resource Cleanup: information on stopping the Spark context, cleaning up resources, and shutting down Spark.
Example PySpark Log Analysis
Here’s a breakdown of an example PySpark log entry:
INFO DAGScheduler: Got job 0 (show at ExampleApp.py:25) with 1 output partitions
- INFO: Log level.
- DAGScheduler: Component generating the log.
- Got job 0 (show at ExampleApp.py:25): Job ID and action.
- with 1 output partitions: Number of output partitions for the job.
By understanding these log entries, you can monitor the execution of your PySpark jobs, identify performance bottlenecks, and debug errors efficiently.
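If you want to extract these fields from a log file programmatically, here is a small sketch; the regular expression assumes the DAGScheduler "Got job" format shown above, and the log file path is illustrative:

import re

# Matches lines like:
# INFO DAGScheduler: Got job 0 (show at ExampleApp.py:25) with 1 output partitions
pattern = re.compile(r"INFO DAGScheduler: Got job (\d+) \((.+?)\) with (\d+) output partitions")

with open("spark-driver.log") as f:
    for line in f:
        match = pattern.search(line)
        if match:
            job_id, action, partitions = match.groups()
            print(f"job={job_id} action={action} partitions={partitions}")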
Let’s delve deeper into the key components and execution process of a PySpark job, explaining each step with a real-life example and corresponding log entries.
Real-Life Example: Word Count Application
We’ll use a word count application as our example. This application reads a text file, counts the occurrences of each word, and outputs the result.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# Initialize SparkConf and SparkContext
conf = SparkConf().setAppName("Word Count").setMaster("local[*]")
sc = SparkContext(conf=conf)
# Create SparkSession
spark = SparkSession(sc)
# Read text file into RDD
text_file = sc.textFile("path/to/textfile.txt")
# Split lines into words, map each word to a (word, 1) pair, and reduce by key to count occurrences
words = text_file.flatMap(lambda line: line.split(" "))
word_pairs = words.map(lambda word: (word, 1))
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
# Collect results
results = word_counts.collect()
# Print results
for word, count in results:
    print(f"{word}: {count}")
# Stop the SparkContext
sc.stop()
Explanation with Log Entries
1. Initialization:
Code:
conf = SparkConf().setAppName("Word Count").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
Log Entries:
INFO SparkContext: Running Spark version 3.1.2
INFO SparkContext: Submitted application: Word Count
INFO ResourceUtils: ==============================================================
INFO ResourceUtils: No custom resources configured for spark.driver.
INFO SparkContext: Added JAR file:/path/to/pyspark-shell
INFO Utils: Successfully started service 'SparkUI' on port 4040.
INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.1:4040
- Explanation: Initializes the SparkConf and SparkContext. A SparkSession is created using the existing SparkContext. The Spark UI is started on port 4040.
2. Resource Allocation:
Spark allocates resources for the application, such as memory and CPUs specified in the configuration (local[*] means using all available cores).
Log Entries:
INFO SparkContext: Added JAR file:/path/to/pyspark-shell
INFO MemoryStore: MemoryStore started with capacity 366.3 MB
INFO DiskBlockManager: Created local directory at /tmp/blockmgr-123456
INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.1, 49919)
INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.1:49919 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.1, 49919)
INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.1, 49919)
- Explanation: Registers block managers and allocates memory and disk space for storing RDD partitions.
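As a sketch, the settings that influence the "MemoryStore started with capacity ..." figure can be set on the SparkConf; the values below are illustrative, and note that driver memory generally has to be set before the driver JVM starts (for example via spark-submit --driver-memory):

from pyspark import SparkConf, SparkContext

conf = (
    SparkConf()
    .setAppName("Word Count")
    .setMaster("local[*]")              # use all available local cores
    .set("spark.driver.memory", "2g")   # influences MemoryStore capacity
)
sc = SparkContext(conf=conf)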
3. Task Creation by the Driver for Jobs:
Code:
text_file = sc.textFile("path/to/textfile.txt")
Log Entries:
INFO SparkContext: Starting job: textFile at WordCount.py:12
INFO DAGScheduler: Got job 0 (textFile at WordCount.py:12) with 1 output partitions
INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at textFile at WordCount.py:12), which has no missing parents
INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.5 KB, free 365.2 MB)
INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1039
- Explanation: The driver turns the textFile RDD operation into a job and creates a stage with tasks. The file is split into partitions.
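A hedged sketch of how the input partitioning can be inspected or nudged (the path is the same placeholder used above):

# minPartitions is a hint; Spark may create more splits for large files.
text_file = sc.textFile("path/to/textfile.txt", minPartitions=4)
print(text_file.getNumPartitions())   # number of tasks in the reading stage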
4. How the DAG Scheduler Works:
The DAG (Directed Acyclic Graph) scheduler divides the job into stages based on shuffle boundaries.
Log Entries:
INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at textFile at WordCount.py:12), which has no missing parents
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[2] at flatMap at WordCount.py:14), which has no missing parents
INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[3] at map at WordCount.py:15), which has no missing parents
INFO DAGScheduler: Submitting ResultStage 3 (ShuffledRDD[4] at reduceByKey at WordCount.py:16), which has no missing parents
- Explanation: Submits stages in the DAG to be executed. Here, the stages correspond to reading the text file and performing the flatMap, map, and reduceByKey operations.
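To see where the scheduler will cut stages, the RDD lineage can be printed; the shuffle introduced by reduceByKey is the stage boundary:

word_counts = (
    text_file.flatMap(lambda line: line.split(" "))
             .map(lambda word: (word, 1))
             .reduceByKey(lambda a, b: a + b)
)
# toDebugString() returns the lineage as bytes; indentation changes at shuffle
# boundaries, which is where the DAGScheduler splits stages.
print(word_counts.toDebugString().decode("utf-8"))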
5. Broadcast:
Broadcast variables are used to efficiently distribute large read-only data to all worker nodes.
Log Entries:
INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.5 KB, free 365.2 MB)
INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.1:49919 (size: 2.1 KB, free: 366.3 MB)
INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
- Explanation: Broadcasts the variable to all executors. In this example, broadcasting might occur if there are configuration settings or large data structures needed by all tasks.
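For comparison, an explicit broadcast variable looks like the sketch below; in this word count, the broadcast_0/broadcast_1 entries actually come from Spark's internal broadcasts of task binaries rather than user data (the stop-word set is an assumption):

# A read-only lookup shared by every task on every executor.
stopwords = sc.broadcast({"the", "a", "and"})
filtered_words = words.filter(lambda w: w not in stopwords.value)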
6. Stage Creation:
Stages are created for different transformations. Each stage contains tasks that are executed in parallel.
Log Entries:
INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at textFile at WordCount.py:12)
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[2] at flatMap at WordCount.py:14)
INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[3] at map at WordCount.py:15)
INFO DAGScheduler: Submitting ResultStage 3 (ShuffledRDD[4] at reduceByKey at WordCount.py:16)
- Explanation: Each transformation (flatMap, map, reduceByKey) produces a new RDD; the shuffle introduced by reduceByKey marks a new stage boundary.
7. Task Execution:
Tasks within each stage are sent to executors for parallel execution.
Log Entries:
INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.1.1, executor driver, partition 0, PROCESS_LOCAL, 4735 bytes)
INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.1:49919 (size: 2.1 KB, free: 366.3 MB)
INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1380 bytes result sent to driver
INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 213 ms on 192.168.1.1 (executor driver) (1/1)
INFO DAGScheduler: ResultStage 0 (MapPartitionsRDD[1] at textFile at WordCount.py:12) finished in 0.254 s
INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
- Explanation: Each task processes its partition of data. The executor runs the task and returns the result to the driver.
8. Next Job Initiation:
Once the current job is complete, the next job starts if there are further actions.
Code:
results = word_counts.collect()
Log Entries:
INFO SparkContext: Starting job: collect at WordCount.py:18
INFO DAGScheduler: Got job 1 (collect at WordCount.py:18) with 2 output partitions
INFO DAGScheduler: Final stage: ResultStage 4 (collect at WordCount.py:18)
INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
INFO DAGScheduler: Missing parents: List(ShuffleMapStage 3)
INFO DAGScheduler: Submitting ShuffleMapStage 3 (MapPartitionsRDD[3] at map at WordCount.py:15) with 2 output partitions
INFO DAGScheduler: Submitting ResultStage 4 (ShuffledRDD[4] at reduceByKey at WordCount.py:16) with 1 output partitions
- Explanation: The next job is initiated, here by the collect action, and Spark plans the stages and tasks accordingly.
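Since collect() ships every partition's result to the driver, large outputs are often better handled with take() or by writing to storage; a hedged sketch (the output path is illustrative):

results = word_counts.collect()               # all results to the driver
sample = word_counts.take(10)                 # only the first 10 elements
word_counts.saveAsTextFile("path/to/output")  # write out without collecting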