In PySpark, DataFrame transformations and operations can be efficiently handled using two main approaches:
1️⃣ PySpark SQL API Programming (Temp Tables / Views)
- Each transformation step can be written as a SQL query.
- Intermediate results can be stored as temporary views (`createOrReplaceTempView`).
- Queries can be executed using `spark.sql()`, avoiding direct DataFrame chaining.
Example:
df.createOrReplaceTempView("source_data")
# Step 1: Filter Data
filtered_df = spark.sql("""
SELECT * FROM source_data WHERE status = 'active'
""")
filtered_df.createOrReplaceTempView("filtered_data")
# Step 2: Aggregate Data
aggregated_df = spark.sql("""
SELECT category, COUNT(*) AS count
FROM filtered_data
GROUP BY category
""")
📌 Benefits:
✔️ Each transformation is saved as a temp table/view for easy debugging.
✔️ Queries become more readable and modular.
✔️ Avoids excessive DataFrame chaining, improving maintainability.
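If the pipeline accumulates several temp views, the catalog API helps you list and drop them between debugging runs; a minimal sketch (the view name reuses `filtered_data` from above):

```python
# List the temp views currently registered in this SparkSession
for t in spark.catalog.listTables():
    print(t.name, t.tableType, t.isTemporary)

# Drop an intermediate view once a debugging step is finished
spark.catalog.dropTempView("filtered_data")
```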
2️⃣ Common Table Expressions (CTEs) for Multi-Step Queries
- Instead of multiple temp tables, each transformation step can be wrapped in a CTE.
- The entire logic is written in a single SQL query.
Example using CTEs:
query = """
WITH filtered_data AS (
SELECT * FROM source_data WHERE status = 'active'
),
aggregated_data AS (
SELECT category, COUNT(*) AS count
FROM filtered_data
GROUP BY category
)
SELECT * FROM aggregated_data
"""
df_final = spark.sql(query)
📌 Benefits:
✔️ Eliminates the need for multiple temp views.
✔️ Improves query organization by breaking steps into CTEs.
✔️ Executes the whole multi-step logic as one SQL query that Catalyst can optimize end to end.
Which Approach is Better?
- Use SQL API with Temp Views when:
- You need step-by-step debugging.
- Your query logic is complex and needs intermediate storage.
- You want to break down transformations into separate queries.
- Use CTEs when:
- You want a single optimized query execution.
- The logic is modular but doesn't require intermediate views.
- You aim for better performance by reducing redundant reads.
Both approaches eliminate excessive DataFrame chaining and leverage PySpark's SQL execution engine efficiently.
# Best Practice Template for PySpark SQL API & CTE-based ETL
from pyspark.sql import SparkSession
# Initialize Spark Session
spark = SparkSession.builder.appName("PySparkSQL_ETL").getOrCreate()
# Sample Data (Creating a DataFrame)
data = [(1, "A", "active", 100),
(2, "B", "inactive", 200),
(3, "A", "active", 150),
(4, "C", "active", 120),
(5, "B", "inactive", 300)]
columns = ["id", "category", "status", "amount"]
df = spark.createDataFrame(data, columns)
# Approach 1: Using Temp Views for Step-by-Step ETL
df.createOrReplaceTempView("source_data")
# Step 1: Filter Active Records
filtered_query = """
SELECT * FROM source_data WHERE status = 'active'
"""
filtered_df = spark.sql(filtered_query)
filtered_df.createOrReplaceTempView("filtered_data")
# Step 2: Aggregation
aggregated_query = """
SELECT category, SUM(amount) AS total_amount
FROM filtered_data
GROUP BY category
"""
aggregated_df = spark.sql(aggregated_query)
aggregated_df.show()
# Approach 2: Using CTE for Optimized Query Execution
cte_query = """
WITH filtered_data AS (
SELECT * FROM source_data WHERE status = 'active'
),
aggregated_data AS (
SELECT category, SUM(amount) AS total_amount
FROM filtered_data
GROUP BY category
)
SELECT * FROM aggregated_data
"""
cte_df = spark.sql(cte_query)
cte_df.show()
# Additional Example: Using Multiple CTEs for Complex Transformations
complex_query = """
WITH filtered_data AS (
SELECT * FROM source_data WHERE status = 'active'
),
ranked_data AS (
SELECT *, RANK() OVER (PARTITION BY category ORDER BY amount DESC) AS rank
FROM filtered_data
)
SELECT * FROM ranked_data WHERE rank = 1
"""
ranked_df = spark.sql(complex_query)
ranked_df.show()
# Closing Spark Session
spark.stop()
Optimization in PySpark SQL API programming (using `spark.sql()`) vs optimization in PySpark DataFrame API programming
Yes, the optimization in PySpark SQL API programming (using `spark.sql()`) differs from the optimization in PySpark DataFrame API programming (using `.select()`, `.filter()`, `.groupBy()`, etc.). Both approaches rely on the Catalyst Optimizer and the Tungsten Execution Engine, but they differ in how and when they optimize execution.
🔹 1. PySpark SQL API Optimization (`spark.sql()`)
How it Works:
- Queries written using SQL syntax (`spark.sql("SELECT ... FROM table")`) are parsed and optimized by the Catalyst Optimizer.
- Catalyst applies logical and physical optimizations, such as:
  - Predicate Pushdown → filtering data early.
  - Constant Folding → precomputing constant expressions.
  - Join Reordering → choosing the best join strategy.
- The optimized query plan is translated into DataFrame operations before execution.
Example (SQL API with Temp Views)
df.createOrReplaceTempView("source_data")
query = """
SELECT category, SUM(amount) AS total_amount
FROM source_data
WHERE status = 'active'
GROUP BY category
"""
optimized_df = spark.sql(query)
optimized_df.explain(True) # Show optimized execution plan
✅ Benefits:
- Queries are optimized before execution.
- Easy to write & modular (especially for SQL-heavy workloads).
- Better for analysts who are comfortable with SQL.
🚫 Limitations:
- Less flexible for complex operations such as UDFs and iterative computations (though a Python UDF can still be registered for SQL use, as sketched below).
- Harder to debug when issues occur inside long SQL queries.
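That said, Python UDFs are not off-limits in the SQL API: you can register one and call it from `spark.sql()`. A minimal sketch against the `source_data` view from earlier (the UDF name `amount_band` and its logic are purely illustrative):

```python
from pyspark.sql.types import StringType

# Register a Python UDF so it can be called from SQL (name and logic are illustrative)
spark.udf.register("amount_band",
                   lambda amount: "high" if amount >= 150 else "low",
                   StringType())

banded_df = spark.sql("""
    SELECT category, amount, amount_band(amount) AS band
    FROM source_data
""")
banded_df.show()
```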
🔹 2. PySpark DataFrame API Optimization (`df.filter().groupBy()`)
How it Works:
- PySpark's DataFrame API follows a lazy execution model.
- Operations are chained together, and PySpark optimizes the execution only when an action (`show()`, `collect()`, etc.) is triggered.
- Catalyst applies optimizations similar to the SQL API:
  - Column Pruning → only required columns are selected.
  - Filter Pushdown → filters are applied at the data source level.
  - Join Reordering → broadcast joins are chosen when applicable.
Example (DataFrame API Approach)
optimized_df = df.filter(df.status == 'active') \
    .groupBy("category") \
    .agg({"amount": "sum"}) \
    .withColumnRenamed("sum(amount)", "total_amount")
optimized_df.explain(True)  # Show optimized execution plan
✅ Benefits:
- More flexibility (easier to use UDFs and complex transformations).
- Better debugging, since each transformation step is separate (see the sketch after this list).
- Easier integration with ML and other advanced functions.
🚫 Limitations:
- Slightly more verbose than the SQL API.
- Requires chaining multiple transformations.
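The debugging benefit mostly comes from keeping each transformation in its own variable so it can be inspected on its own. A small sketch using the sample columns from this post:

```python
from pyspark.sql import functions as F

# Keep each transformation in its own variable so it can be checked independently
active_df = df.filter(F.col("status") == "active")
print("active rows:", active_df.count())   # quick sanity check on the intermediate result

totals_df = active_df.groupBy("category").agg(F.sum("amount").alias("total_amount"))
totals_df.printSchema()                    # verify the output schema before going further
totals_df.show()
```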
🔥 Key Differences Between SQL API & DataFrame API Optimization
Feature | PySpark SQL API (`spark.sql()`) | PySpark DataFrame API (`df.select()`, `df.filter()`) |
---|---|---|
Optimization Engine | Catalyst Optimizer + Tungsten | Catalyst Optimizer + Tungsten |
Execution Plan | SQL query is converted into a DataFrame plan before execution | DataFrame transformations are optimized lazily before execution |
Readability | Easier for SQL users | More Pythonic & readable for programmers |
Performance | Good for batch queries (pre-optimized execution) | Good for iterative, complex logic |
Debugging | Harder to debug long SQL queries | Easier debugging step by step |
🔹 Which One Should You Use?
1️⃣ Use SQL API (`spark.sql()`) when:
- You are working with SQL-heavy transformations.
- You need modular queries with temp views.
- You want batch processing & pre-optimized queries.
2️⃣ Use DataFrame API (`df.filter()`, `df.groupBy()`) when:
- You need more flexibility (e.g., UDFs, machine learning, complex logic).
- You want to debug transformations easily.
- You are working in an iterative pipeline (e.g., dynamic processing).
📌 Best Practice: Combine Both
For optimized ETL workflows, you can mix both approaches:
- Preprocess with DataFrame API (better control over steps).
- Use SQL API for heavy aggregations (better optimization).
Example Hybrid Approach:
# Step 1: DataFrame API - Initial Filtering
filtered_df = df.filter(df.status == 'active')
# Step 2: Register Temp View & Use SQL API for Aggregation
filtered_df.createOrReplaceTempView("filtered_data")
query = "SELECT category, SUM(amount) AS total_amount FROM filtered_data GROUP BY category"
final_df = spark.sql(query)
✅ Optimizes both transformations & execution performance.
Both PySpark SQL API and DataFrame API are optimized by Catalyst, but their execution models differ:
- SQL API optimizes before execution (good for queries & batch processing).
- DataFrame API optimizes lazily during execution (good for step-by-step debugging).
Let's compare performance using `explain(True)` on a sample dataset for both the PySpark SQL API and the PySpark DataFrame API.
🔹 Step 1: Create a Sample DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize Spark Session
spark = SparkSession.builder.appName("PySpark_Optimization").getOrCreate()
# Sample Data
data = [(1, "A", "active", 100),
(2, "B", "inactive", 200),
(3, "A", "active", 150),
(4, "C", "active", 120),
(5, "B", "inactive", 300),
(6, "A", "active", 180)]
columns = ["id", "category", "status", "amount"]
df = spark.createDataFrame(data, columns)
🔹 Step 2: SQL API Optimization (`spark.sql()`)
# Register temp view
df.createOrReplaceTempView("source_data")
# SQL Query with Filtering and Aggregation
query = """
SELECT category, SUM(amount) AS total_amount
FROM source_data
WHERE status = 'active'
GROUP BY category
"""
# Execute SQL Query
sql_df = spark.sql(query)
# Explain Execution Plan
sql_df.explain(True)
🔹 SQL API Execution Plan Output (Sample)
== Optimized Logical Plan ==
Aggregate [category#32], [category#32, sum(amount#34) AS total_amount#50]
+- Filter (isnotnull(status#33) && (status#33 = active))
+- Relation[id#31,category#32,status#33,amount#34] parquet
✅ Optimizations Applied:
- Filter Pushdown: `WHERE status = 'active'` is applied before aggregation.
- Column Pruning: only `category` and `amount` are selected.
- Aggregate Optimization: `SUM(amount)` is grouped efficiently.
🔹 Step 3: DataFrame API Optimization (`df.filter().groupBy()`)
# DataFrame API Approach
df_filtered = df.filter(col("status") == "active") \
.groupBy("category") \
.sum("amount") \
.withColumnRenamed("sum(amount)", "total_amount")
# Explain Execution Plan
df_filtered.explain(True)
🔹 DataFrame API Execution Plan Output (Sample)
== Physical Plan ==
*(2) HashAggregate(keys=[category#32], functions=[sum(amount#34)])
+- *(2) HashAggregate(keys=[category#32], functions=[partial_sum(amount#34)])
+- *(1) Project [category#32, amount#34]
+- *(1) Filter (isnotnull(status#33) && (status#33 = active))
+- Scan parquet [id#31,category#32,status#33,amount#34]
✅ Optimizations Applied:
- Filter Pushdown
- Column Pruning
- Partial Aggregation for Efficiency (`partial_sum`)
🔹 Step 4: Performance Comparison
Feature | SQL API (`spark.sql()`) | DataFrame API (`df.filter()`) |
---|---|---|
Execution Plan Type | Logical SQL to DataFrame Plan | Direct Logical DataFrame Plan |
Optimization Applied | Predicate Pushdown, Column Pruning, Aggregation Optimization | Same optimizations, applied step-wise |
Performance | ✅ Optimized before execution (batch processing) | ✅ Optimized lazily (step-by-step execution) |
Use Case | Best for complex SQL transformations & analytics | Best for incremental processing & iterative transformations |
🔹 Key Takeaways
1️⃣ Both the SQL API & DataFrame API get optimized by the Catalyst Optimizer.
2️⃣ Execution plans are similar (both use filter pushdown, column pruning, and aggregation optimization).
3️⃣ SQL API pre-optimizes everything before execution, while the DataFrame API optimizes lazily.
4️⃣ SQL API is best for batch processing, while the DataFrame API is better for debugging & step-by-step transformations.
Both PySpark SQL API and DataFrame API use Catalyst Optimizer, and in the end, SQL queries are converted into DataFrame operations before execution. However, the key difference lies in how and when optimization happens in each approach.
📌 1. SQL API Optimization (Pre-Optimized Before Execution)
What Happens?
- When you write `spark.sql("SELECT ... FROM table")`, PySpark immediately parses the query.
- The Catalyst Optimizer applies logical optimizations (e.g., filter pushdown, constant folding).
- The optimized query plan is created before execution.
- Then, it is translated into DataFrame operations, and lazy execution kicks in.
Example: SQL API Execution Flow
df.createOrReplaceTempView("source_data")
query = """
SELECT category, SUM(amount) AS total_amount
FROM source_data
WHERE status = 'active'
GROUP BY category
"""
final_df = spark.sql(query)
final_df.show()
📌 Steps in SQL API Execution:
- Parsing: the SQL query is parsed into an unoptimized logical plan.
- Optimization: Catalyst applies logical optimizations before execution.
- Conversion: the optimized SQL is converted into a DataFrame execution plan.
- Execution: execution happens only when `.show()` (or another action) is called.
✅ Key Insight:
- Optimization happens before DataFrame API conversion, so SQL API sends a pre-optimized plan to execution.
- The optimizer has a full view of the query upfront, making multi-step optimizations easier.
📌 2. DataFrame API Optimization (Optimized Lazily During Execution)
What Happens?
- When you chain DataFrame transformations (`.select()`, `.filter()`, etc.), each transformation adds to the logical execution plan.
- No execution happens until an action (`.show()`, `.collect()`) is triggered.
- The Catalyst Optimizer optimizes the entire execution plan just before execution.
Example: DataFrame API Execution Flow
filtered_df = df.filter(df.status == "active")
aggregated_df = filtered_df.groupBy("category").sum("amount")
aggregated_df.show()
📌 Steps in DataFrame API Execution:
- Transformation Building: each `.filter()`, `.groupBy()` adds a step to the logical execution plan.
- Lazy Optimization: no optimization happens yet.
- Triggering Execution: when `.show()` is called, the entire plan is optimized just before execution.
- Execution: Spark runs the optimized execution plan.
✅ Key Insight:
- Optimization happens at the last step before execution.
- Spark does not have full query context until execution is triggered, which may limit certain optimizations.
🔥 Core Differences Between SQL API & DataFrame API Optimization
Feature | SQL API (`spark.sql()`) | DataFrame API (`df.select()`, `df.filter()`) |
---|---|---|
When Optimization Happens | Before execution (on query parsing) | Just before execution (lazy) |
Execution Plan Generation | Optimized upfront before DataFrame conversion | Built step-by-step, optimized at the end |
Handling of Complex Queries | Full query view allows better optimizations | Step-wise transformations may limit some optimizations |
Best Use Case | Multi-step SQL queries, joins, batch processing | Iterative processing, ML pipelines, debugging |
📌 When to Use Which?
✅ Use SQL API When:
- You have multi-step transformations that need global query optimization.
- You prefer writing complex logic in SQL (better readability for SQL-heavy workloads).
- You want Catalyst to optimize the entire query upfront before execution.
✅ Use DataFrame API When:
- You need iterative, flexible transformations (easier debugging).
- Your workflow involves dynamic logic (e.g., using variables and conditions).
- You work with ML pipelines, UDFs, or Python-specific transformations.
🔬 Example: SQL API vs DataFrame API Optimization Difference
Scenario: Filtering, grouping, and aggregation on a dataset.
🔹 SQL API Approach (Optimized Before Execution)
df.createOrReplaceTempView("source_data")
query = """
SELECT category, SUM(amount) AS total_amount
FROM source_data
WHERE status = 'active'
GROUP BY category
"""
final_df = spark.sql(query)
final_df.explain(True) # Show execution plan
✅ Optimization Advantage:
- Predicate Pushdown: `WHERE status = 'active'` is applied before aggregation.
- Better Join Order (if applicable): joins get pre-optimized.
🔹 DataFrame API Approach (Optimized Lazily)
filtered_df = df.filter(df.status == "active")
aggregated_df = filtered_df.groupBy("category").agg({"amount": "sum"})
aggregated_df.explain(True) # Show execution plan
✅ Optimization Happens Later:
- Transformations are built step-by-step.
- Catalyst does not optimize until `.show()` is called.
🧠 Key Takeaway
Both SQL API and DataFrame API are optimized by Catalyst, but:
- SQL API pre-optimizes the entire query before converting it into DataFrame operations.
- DataFrame API builds the execution plan step-by-step and optimizes only when an action is triggered.
🔥 Catalyst Optimizer & Tungsten Execution Engine in PySpark
PySpark uses two main components for optimization and execution:
- Catalyst Optimizer → responsible for query optimization (logical & physical plans).
- Tungsten Execution Engine → handles low-level execution optimizations (CPU, memory, code generation).
Let's break down both in detail.
📌 1. Catalyst Optimizer (Logical & Physical Query Optimization)
What it Does
Catalyst Optimizer is a rule-based and cost-based optimizer in Spark that optimizes queries before execution. It transforms SQL queries and DataFrame operations into the most efficient execution plan.
Catalyst Workflow (4 Steps)
When you run a DataFrame operation or an SQL query, Catalyst goes through 4 phases:
1️⃣ Parse SQL Query / Convert DataFrame to Logical Plan
- If using SQL: The SQL string is parsed into an Unresolved Logical Plan.
- If using DataFrame API: Spark directly creates an Unresolved Logical Plan.
2️⃣ Analyze: Resolve Column Names & Types
- Checks whether tables, columns, and functions exist.
- Resolves column data types from the schema.
3️⃣ Optimize: Apply Logical Optimizations (Rule-Based Optimizations)
- Filter Pushdown: move `WHERE` conditions close to the data source.
- Constant Folding: precompute constant expressions (e.g., `2+3` → `5`).
- Column Pruning: remove unused columns.
- Predicate Simplification: convert complex conditions into simpler ones.
- Join Reordering: choose the best join order.
4️⃣ Generate Physical Plan (Execution Plan Selection)
- Decides execution strategy (e.g., SortMergeJoin vs BroadcastJoin).
- Generates RDD transformations (Resilient Distributed Datasets).
- This optimized plan is sent to the Tungsten execution engine.
Example: Catalyst Optimization in Action
🔹 SQL Query
df.createOrReplaceTempView("transactions")
query = "SELECT category, SUM(amount) FROM transactions WHERE status = 'active' GROUP BY category"
optimized_df = spark.sql(query)
optimized_df.explain(True) # Shows Catalyst Optimized Execution Plan
🔹 DataFrame API
optimized_df = df.filter(df.status == "active").groupBy("category").agg({"amount": "sum"})
optimized_df.explain(True) # Shows Catalyst Optimized Execution Plan
📌 What Catalyst Does Here
- Moves `WHERE status = 'active'` before aggregation (Predicate Pushdown).
- Keeps only the `category` and `amount` columns (Column Pruning).
- Selects the best join strategy if multiple tables are involved (Join Reordering).
⚡ 2. Tungsten Execution Engine (Physical Execution Optimizations)
What it Does
Once Catalyst generates the optimized execution plan, Tungsten takes over and optimizes at a lower level (CPU & memory management).
Tungsten Optimizations (3 Key Areas)
1️⃣ Memory Management & Binary Processing
- Uses off-heap memory (bypasses JVM garbage collection).
- Avoids unnecessary memory allocations.
2️⃣ Code Generation (Whole-Stage Codegen)
- Generates low-level Java bytecode for faster execution.
- Converts high-level Spark plans into optimized machine code.
3️⃣ Efficient Data Structures & Algorithms
- Uses compressed columnar storage for faster processing.
- Vectorized Execution: processes multiple rows in parallel (SIMD operations).
Example: How Tungsten Optimizes Execution
df.groupBy("category").sum("amount").explain(True)
📌 What Tungsten Does Here
- Whole-Stage Code Generation: converts `.groupBy().sum()` into optimized Java bytecode.
- Columnar Memory Layout: reduces memory overhead.
- SIMD & Cache-Aware Execution: processes batches of rows instead of row-by-row.
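To poke at these pieces yourself, the sketch below builds a session with off-heap memory enabled (`spark.memory.offHeap.enabled` / `spark.memory.offHeap.size` are standard Spark settings; the `1g` size is just an illustrative value) and prints the whole-stage generated code with `explain(mode="codegen")`, available in Spark 3.0+:

```python
from pyspark.sql import SparkSession

# Session with off-heap memory enabled (the 1g size is illustrative, not a recommendation)
spark = (
    SparkSession.builder
    .appName("TungstenDemo")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "1g")
    .getOrCreate()
)

demo_df = spark.createDataFrame(
    [(1, "A", 100), (2, "B", 200), (3, "A", 150)],
    ["id", "category", "amount"],
)

# Print the whole-stage generated Java code for the aggregation (Spark 3.0+)
demo_df.groupBy("category").sum("amount").explain(mode="codegen")
```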
🧠 Catalyst vs Tungsten: How They Work Together
Feature | Catalyst Optimizer | Tungsten Execution Engine |
---|---|---|
Optimization Level | Query-level optimization (logical & physical) | Execution-level optimization (CPU & memory) |
Key Responsibilities | Optimizing query plans (filter pushdown, column pruning, join reordering) | Managing memory, generating optimized bytecode, using efficient data structures |
When It Runs? | Before execution (query transformation) | During execution (hardware-level optimizations) |
Goal | Minimize data movement & optimize transformations | Maximize execution speed & minimize CPU/memory overhead |
📌 Final Takeaways
- Catalyst Optimizer improves query plans before execution.
- Tungsten Execution Engine boosts runtime performance using efficient memory management & code generation.
- Together, they make PySpark fast & scalable 🚀.
🚀 PySpark Optimizations, Configurations & DAG Explained
Now that you understand the Catalyst Optimizer and the Tungsten Execution Engine, let's explore other key optimizations and configurations to improve PySpark execution. We'll also dive into the DAG (Directed Acyclic Graph) and how Spark uses it for execution.
🔥 1. Optimization Methods & Configurations in PySpark
PySpark optimizations can be categorized into 4 main areas:
Category | Optimization Technique |
---|---|
Query Optimization | Predicate Pushdown, Column Pruning, Join Optimization |
Execution Optimization | Caching, Broadcast Joins, Data Skew Handling |
Shuffle Optimization | Repartitioning, Coalesce, ReduceByKey |
Memory & Performance | Serialization, GC Tuning, Adaptive Query Execution (AQE) |
📌 A. Query Optimization Techniques
These optimizations reduce data movement and minimize processing time.
✅ 1. Predicate Pushdown (Filter Early)
📌 Move filters as close as possible to the data source.
- Catalyst usually does this automatically, but an explicit `.filter()` improves readability.
# ✅ Best practice: filter first, then process
df_filtered = df.filter(df.status == "active").select("category", "amount")
✅ 2. Column Pruning (Select Only Required Columns)
📌 Avoid selecting unnecessary columns to reduce data transfer.
# ✅ Best practice: select only the required columns
df = df.select("category", "amount")
✅ 3. Join Optimization (Broadcast Joins for Small Tables)
📌 Use a Broadcast Join when one table is small (≤ 10 MB, the default spark.sql.autoBroadcastJoinThreshold).
from pyspark.sql.functions import broadcast
df_large = spark.read.parquet("large_table.parquet")
df_small = spark.read.parquet("small_lookup.parquet")
# ✅ Best practice: broadcast the small table to avoid a shuffle
df_result = df_large.join(broadcast(df_small), "common_key")
🔹 Why? Normal joins trigger shuffles, but broadcast joins send the small table to all nodes.
⚡ B. Execution Optimizations
These optimizations improve PySpark job execution by reducing processing overhead.
✅ 4. Caching & Persisting
📌 Cache DataFrames that are reused multiple times.
df.cache() # Keeps data in memory for faster access
df.show()
🔹 Use `.persist(StorageLevel.MEMORY_AND_DISK)` if the data is too large to fit in memory.
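For reference, a persist call with an explicit storage level looks like this (`StorageLevel` comes from the top-level `pyspark` package):

```python
from pyspark import StorageLevel

# Spill cached partitions to disk when they do not fit in memory
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()      # first action materializes the cache
df.unpersist()  # release the cache when it is no longer needed
```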
✅ 5. Handling Data Skew (Salting Technique)
📌 If one key holds most of the data, Spark ends up with imbalanced partitions.
- Add a salt column to distribute the data evenly across partitions.
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn("salt", (monotonically_increasing_id() % 5))
df_skew_fixed = df.repartition("common_key", "salt") # Distributes load
🔹 Why? This prevents one partition from handling most of the data, avoiding slow execution.
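For skewed joins specifically, the usual pattern is to salt the large side and replicate the small side so every salted key still finds its match. A rough sketch, reusing the `df_large` / `df_small` / `common_key` names from the broadcast-join example (all illustrative):

```python
from pyspark.sql import functions as F

NUM_SALTS = 5  # illustrative; tune to the observed skew

# Large (skewed) side: add a random salt per row
large_salted = df_large.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))

# Small side: replicate every row once per salt value
small_salted = df_small.withColumn(
    "salt", F.explode(F.array([F.lit(i) for i in range(NUM_SALTS)]))
)

# Join on the original key plus the salt, then drop the helper column
joined = large_salted.join(small_salted, ["common_key", "salt"]).drop("salt")
```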
📌 C. Shuffle Optimizations (Partitioning & Repartitioning)
📌 Shuffling is expensive (network & disk I/O). Optimize with smart partitioning.
✅ 6. Repartition vs Coalesce
Operation | Use Case |
---|---|
repartition(n) | Increases partitions (full shuffle, used for balancing) |
coalesce(n) | Reduces partitions (avoids full shuffle, best for minimizing data movement) |
df_repartitioned = df.repartition(10) # Full shuffle
df_coalesced = df.coalesce(2) # Merges partitions (faster for output writing)
✅ 7. Use reduceByKey() Instead of groupByKey() (RDD API)
📌 `reduceByKey()` combines values locally before the shuffle, so far less intermediate data moves across the network.
# ❌ groupByKey()-style aggregation shuffles all raw records
# (note: DataFrame groupBy().agg() already performs partial aggregation, as the earlier plan showed)
df.groupBy("category").agg({"amount": "sum"})
# ✅ On RDDs, use reduceByKey() for aggregation
rdd = df.rdd.map(lambda x: (x["category"], x["amount"]))
rdd_reduced = rdd.reduceByKey(lambda a, b: a + b)
🔹 Why? `reduceByKey()` combines values locally before shuffling, reducing data transfer.
📌 D. Memory & Performance Optimizations
✅ 8. Kryo Serialization (Faster Object Serialization)
📌 Use Kryo instead of the default Java serialization for better performance.
spark = SparkSession.builder.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()
🔹 Why? Kryo is typically 2-3x faster than the default Java serialization.
✅ 9. Adaptive Query Execution (AQE)
📌 Dynamically optimizes joins & partitions at runtime (introduced in Spark 3.0; enabled by default since Spark 3.2).
spark.conf.set("spark.sql.adaptive.enabled", True)
🔹 Why? Adjusts join strategies & shuffle partition counts dynamically based on actual data sizes at runtime.
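Two commonly referenced companion settings (standard Spark 3.x configuration keys) are shown below; both refine how AQE handles shuffle partitions and skewed joins:

```python
# Let AQE merge small shuffle partitions after the shuffle
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)

# Let AQE split skewed partitions during sort-merge joins
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
```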
📌 2. What is a DAG (Directed Acyclic Graph) in PySpark?
DAG = Execution Plan for a PySpark Job
When you execute a PySpark job, Spark creates a DAG (Directed Acyclic Graph) that represents all transformations & actions.
How DAG Works
1️⃣ Transforms DataFrames into a DAG
- Each `.select()`, `.filter()`, `.join()` adds a node to the DAG.
- No execution happens yet (lazy evaluation).
2️⃣ Splits the DAG into Stages
- Stage boundaries are shuffle boundaries.
- If a shuffle is needed (e.g., `.groupBy()`, `.join()`), Spark creates a new stage.
3️⃣ Converts the DAG to RDDs & Executes in Stages
- RDD (Resilient Distributed Dataset) execution happens in parallel across worker nodes.
- Tasks are scheduled & executed stage-by-stage.
🔥 Example: DAG in PySpark
df_filtered = df.filter(df.status == "active")
df_grouped = df_filtered.groupBy("category").sum("amount")
df_grouped.show()
📌 DAG Breakdown:
- Step 1: `df.filter(df.status == "active")` is a transformation (no execution yet).
- Step 2: `groupBy().sum()` creates a new stage (shuffle required).
- Step 3: `.show()` triggers DAG execution (final stage).
📌 Visualizing the DAG
df_grouped.explain(mode="formatted")
📌 Final Takeaways
Optimization Area | Best Practices |
---|---|
Query Optimization | Pushdown filters, column pruning, broadcast joins |
Execution Optimization | Caching, handling skew, adaptive query execution (AQE) |
Shuffle Optimization | Reduce partitions with coalesce, prefer reduceByKey over groupBy |
Memory & Performance | Kryo serialization, vectorized execution |
Understanding DAG
- DAG represents the execution flow of a PySpark job.
- Splits execution into stages (shuffle boundaries).
- Ensures parallel execution & fault tolerance.
---
### **📌 PySpark Performance Comparison Script**
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
# Initialize Spark Session
spark = (
    SparkSession.builder
    .appName("PySpark Optimization Comparison")
    .config("spark.sql.adaptive.enabled", True)                                 # Enable AQE (Adaptive Query Execution)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  # Use Kryo serialization
    .getOrCreate()
)
# Load Sample Data
df_large = spark.read.parquet("large_dataset.parquet")
df_small = spark.read.parquet("small_lookup.parquet")
# --------------------------------------------------
# 📌 1. Filter Pushdown Optimization
# --------------------------------------------------
# ❌ Without filter pushdown (inefficient)
df_no_pushdown = df_large.select("*").filter(df_large.status == "active")
df_no_pushdown.explain(mode="formatted")
# ✅ With filter pushdown (optimized)
df_pushdown = df_large.filter("status = 'active'")
df_pushdown.explain(mode="formatted")
# --------------------------------------------------
# 📌 2. Column Pruning Optimization
# --------------------------------------------------
# ❌ Selecting all columns (inefficient)
df_all_columns = df_large.select("*")
df_all_columns.explain(mode="formatted")
# ✅ Selecting only the required columns (optimized)
df_pruned_columns = df_large.select("id", "category", "amount")
df_pruned_columns.explain(mode="formatted")
# --------------------------------------------------
# 📌 3. Broadcast Join Optimization
# --------------------------------------------------
# ❌ Normal join (causes an expensive shuffle)
df_normal_join = df_large.join(df_small, "common_key")
df_normal_join.explain(mode="formatted")
# ✅ Broadcast join (optimized for small tables)
df_broadcast_join = df_large.join(broadcast(df_small), "common_key")
df_broadcast_join.explain(mode="formatted")
# --------------------------------------------------
# 📌 4. Repartition vs Coalesce Optimization
# --------------------------------------------------
# ❌ Using repartition (forces a full shuffle)
df_repartitioned = df_large.repartition(10)
df_repartitioned.explain(mode="formatted")
# ✅ Using coalesce (minimizes data movement)
df_coalesced = df_large.coalesce(2)
df_coalesced.explain(mode="formatted")
# --------------------------------------------------
# 📌 5. groupBy() vs reduceByKey() Optimization
# --------------------------------------------------
# ❌ Using groupBy() (expensive shuffle)
df_grouped = df_large.groupBy("category").agg({"amount": "sum"})
df_grouped.explain(mode="formatted")
# ✅ Using reduceByKey() (optimized for aggregations)
rdd = df_large.rdd.map(lambda x: (x["category"], x["amount"]))
rdd_reduced = rdd.reduceByKey(lambda a, b: a + b)
rdd_reduced_df = rdd_reduced.toDF(["category", "total_amount"])
rdd_reduced_df.explain(mode="formatted")
# --------------------------------------------------
# 📌 6. Adaptive Query Execution (AQE) Impact
# --------------------------------------------------
# ✅ Enabling AQE allows Spark to dynamically optimize joins & partitions
spark.conf.set("spark.sql.adaptive.enabled", True)
df_aqe_test = df_large.join(df_small, "common_key")
df_aqe_test.explain(mode="formatted")
```
You can generate a real DAG (Directed Acyclic Graph) visualization using the Spark UI. Here's how to do it step by step:
📌 Steps to Generate a DAG in the Spark UI
1️⃣ Start Your PySpark Session with the Spark UI Enabled
Run the following in your PySpark environment (local or cluster):
from pyspark.sql import SparkSession
# Start Spark session with UI enabled
spark = (
    SparkSession.builder
    .appName("DAG_Visualization")
    .config("spark.ui.port", "4040")  # Run the Spark UI on port 4040
    .getOrCreate()
)
🔹 By default, the Spark UI runs on localhost:4040.
🔹 Open http://localhost:4040 in your browser to view DAGs.
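If another application already grabbed port 4040, Spark silently falls back to the next free port (4041, 4042, ...); you can ask the running session where its UI actually lives:

```python
# Prints the actual Spark UI address for this session
print(spark.sparkContext.uiWebUrl)
```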
2️⃣ Run a Spark Job to Generate a DAG
Now, execute a simple transformation to create a DAG visualization:
df_large = spark.read.parquet("large_dataset.parquet")
df_small = spark.read.parquet("small_lookup.parquet")
# Perform transformations
df_filtered = df_large.filter("status = 'active'")
df_joined = df_filtered.join(df_small, "common_key")
df_result = df_joined.groupBy("category").agg({"amount": "sum"})
# Trigger an action (forces DAG execution)
df_result.show()
🔹 The DAG (Directed Acyclic Graph) will appear in the Spark UI under the "Jobs" tab.
3️⃣ View the DAG in the Spark UI
- Open http://localhost:4040 in your browser.
- Navigate to the "Jobs" section.
- Click on your job to see the DAG visualization.
- You can also check the Stages, Executors, SQL, and Storage tabs to analyze execution details.
4️⃣ Save the DAG as an Image (Optional)
If you want to export the DAG, the simplest option is to take a screenshot; the DAG is rendered as an SVG on the job/stage page, so you can also save that SVG element directly from your browser.