DAG Scheduler in Spark: Detailed Explanation, How it is involved at architecture Level

by | Jun 30, 2024 | Pyspark | 0 comments

DAG Scheduler in Spark: Detailed Explanation

The DAG (Directed Acyclic Graph) Scheduler is a crucial component in Spark’s architecture. It plays a vital role in optimizing and executing Spark jobs. Here’s a detailed breakdown of its function, its place in the architecture, and its involvement in Spark execution, illustrated with a complex example.

Overview of Spark Architecture

Before diving into the DAG Scheduler, let’s briefly overview the Spark architecture:

  1. Driver: The Spark Driver is the main control process that creates the SparkContext, connects to the cluster manager, and coordinates all the Spark jobs and tasks.
  2. Cluster Manager: Manages the cluster resources. Examples include YARN, Mesos, or the built-in standalone cluster manager.
  3. Executors: Worker nodes that run individual tasks. They are responsible for executing code and storing data in memory or disk.
  4. SparkContext: The entry point for a Spark application. It initializes the application and allows the Driver to communicate with the cluster.
  5. Task Scheduler: Distributes tasks to executors.
  6. DAG Scheduler: Divides a job into a DAG of stages, each containing a set of tasks.

Role of the DAG Scheduler

The DAG Scheduler is responsible for:

  1. Creating Stages: It converts the logical execution plan (lineage) of transformations into a DAG of stages.
  2. Pipelining Transformations: Groups transformations that can be executed together into a single stage.
  3. Handling Failures: Ensures fault tolerance by recomputing lost partitions.
  4. Optimizing Execution: Attempts to minimize shuffles and optimize execution.

How the DAG Scheduler Works

  1. Logical Plan: When you define transformations on a DataFrame or RDD, Spark creates a logical plan that outlines the sequence of transformations.
  2. DAG Creation: The DAG Scheduler converts this logical plan into a physical execution plan, breaking it into stages.
    • Stages: Each stage contains a set of transformations that can be executed together without requiring a shuffle.
  3. Task Creation: Within each stage, the DAG Scheduler creates tasks, which are the smallest units of work to be executed by the executors.
  4. Task Scheduling: The Task Scheduler then assigns these tasks to executors based on data locality and resource availability.
  5. Execution: Executors run the tasks, process the data, and store the results.
  6. Actions and Triggers: An action triggers the execution of the DAG. For example, calling collect(), save(), or count() on a DataFrame or RDD.

Complex Example

Let’s consider a complex example where we have multiple transformations and actions:

  1. Data Loading: Load data from different sources.
  2. Transformations: These are lazy operations (like map, filter, flatMap) that define a lineage of RDD (Resilient Distributed Datasets) transformations but do not execute immediately. Instead, they build a logical execution plan.
  3. Actions: Trigger actions to execute the transformations.

Here’s how the DAG Scheduler handles this:

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DAG Example").getOrCreate()

# Load data from multiple sources
df1 = spark.read.csv("hdfs://path/to/file1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("hdfs://path/to/file2.csv", header=True, inferSchema=True)
df3 = spark.read.jdbc(url="jdbc:oracle:thin:@//host:port/service", table="table_name", properties={"user": "username", "password": "password"})

# Perform complex transformations
df1_filtered = df1.filter(df1["age"] > 30)
df2_filtered = df2.filter(df2["salary"] > 50000)
joined_df = df1_filtered.join(df2_filtered, df1_filtered["id"] == df2_filtered["id"]).drop(df2_filtered["id"])
aggregated_df = joined_df.groupBy("department").avg("salary")

# Further transformation with a third dataset
final_df = aggregated_df.join(df3, aggregated_df["department"] == df3["department"]).select(aggregated_df["*"], df3["extra_info"])

# Trigger action
result = final_df.collect()

# Stop Spark session
spark.stop()

DAG Scheduler Breakdown

  1. Logical Plan Creation:
    • Loading df1, df2, and df3 creates initial logical plans for each dataset.
    • df1_filtered, df2_filtered, joined_df, aggregated_df, and final_df define a series of transformations, forming a complex logical plan.
  2. DAG Construction:
    • Stage 1: Load df1 and filter it (df1_filtered).
    • Stage 2: Load df2 and filter it (df2_filtered).
    • Stage 3: Join df1_filtered and df2_filtered, then group by department and calculate the average salary (aggregated_df).
    • Stage 4: Load df3 and join it with aggregated_df, selecting the final columns (final_df).
  3. Task Creation:
    • Each stage is divided into tasks based on the partitions of the data.
    • For example, if df1 is partitioned into 4 parts, Stage 1 will have 4 tasks.
  4. Task Scheduling:
    • Tasks are scheduled to run on executors, considering data locality to reduce data shuffling.
    • Executors run the tasks for each stage.
  5. Execution:
    • Stage 1: df1 is loaded and filtered. The results are stored in memory or disk.
    • Stage 2: df2 is loaded and filtered. The results are stored.
    • Stage 3: The join operation requires shuffling data based on the join key, creating a shuffle boundary. The grouped and aggregated results are stored.
    • Stage 4: df3 is loaded, joined with aggregated_df, and the final result is computed.
  6. Action Trigger:
    • The collect() action triggers the execution of the entire DAG.
    • The results are collected back to the driver.

Visualization of the DAG Scheduler

Here’s a simple visualization of the DAG Scheduler’s process for the above example:

Logical Plan:
df1 -> filter -> df1_filtered
df2 -> filter -> df2_filtered
df1_filtered + df2_filtered -> join -> joined_df
joined_df -> groupBy + avg -> aggregated_df
aggregated_df + df3 -> join -> final_df

DAG of Stages:
Stage 1:
[Load df1, filter]

Stage 2:
[Load df2, filter]

Stage 3:
[Join df1_filtered and df2_filtered, groupBy, avg]

Stage 4:
[Load df3, join with aggregated_df, select final columns]

Tasks:
- Stage 1: [Task 1, Task 2, Task 3, Task 4] (based on partitions of df1)
- Stage 2: [Task 1, Task 2, Task 3, Task 4] (based on partitions of df2)
- Stage 3: [Task 1, Task 2, Task 3, Task 4] (shuffle, based on join key)
- Stage 4: [Task 1, Task 2, Task 3, Task 4] (based on partitions of df3)

Execution:
- Stage 1 tasks -> Stage 2 tasks -> Shuffle -> Stage 3 tasks -> Stage 4 tasks

By understanding the role and functioning of the DAG Scheduler, you can better optimize and troubleshoot your Spark jobs, ensuring efficient and scalable data processing.

Job as Sets of Transformations, Actions, and Triggers

When you perform transformations on an RDD, Spark does not immediately execute these transformations. Instead, it builds a logical execution plan, representing the transformations as a DAG. This process is called lazy evaluation.

Once an action is triggered, Spark’s DAG Scheduler converts this logical plan into a physical execution plan, breaking it down into stages. Each stage contains a set of transformations that can be executed as a unit of computation, typically ending with a wide transformation requiring a shuffle.

The DAG Scheduler then submits these stages as a series of tasks to the Task Scheduler, which schedules them on worker nodes. The actions and transformations are executed in a distributed manner, with the results collected and returned to the driver program or written to storage.

Example with Complex Scenario

Consider a scenario where you have a Spark job that reads data from a Hive table, performs complex transformations, joins with data from an Oracle table, and writes the results back to Hive.

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("Complex Spark Job") \
.enableHiveSupport() \
.getOrCreate()

# Read data from Hive
hive_df = spark.sql("SELECT * FROM hive_database.hive_table")

# Read data from Oracle
oracle_df = spark.read \
.format("jdbc") \
.option("url", "jdbc:oracle:thin:@//hostname:port/service") \
.option("dbtable", "oracle_table") \
.option("user", "username") \
.option("password", "password") \
.load()

# Perform transformations
transformed_df = hive_df.filter("condition").join(oracle_df, "join_key").groupBy("column").agg({"column": "max"})

# Write results back to Hive
transformed_df.write.mode("overwrite").saveAsTable("hive_database.target_table")

# Trigger an action
result = transformed_df.count()
print(f"Number of rows in the result: {result}")

Execution Flow

  1. Driver Program: Initializes the SparkSession and SparkContext.
  2. Transformations: filter, join, groupBy, agg are defined but not executed.
  3. Action: count triggers the execution.
  4. DAG Scheduler: Converts the logical plan of transformations into a physical execution plan, breaking it down into stages.
  5. Task Scheduler: Schedules tasks for each stage on the worker nodes.
  6. Execution Engine: Executes the tasks, reads data from Hive and Oracle, performs transformations, and writes the results back to Hive.
  7. Shuffle: Data is shuffled as required by the groupBy operation.
  8. Caching/Persistence: Intermediate results can be cached to optimize performance if needed.

Written by HintsToday Team

Related Posts

Project Alert: Automation in Pyspark

Here is a detailed approach for dividing a monthly PySpark script into multiple code steps. Each step will be saved in the code column of a control DataFrame and executed sequentially. The script will include error handling and pre-checks to ensure source tables are...

read more

Get the latest news

Subscribe to our Newsletter

0 Comments

Submit a Comment

Your email address will not be published. Required fields are marked *