Common Table Expressions (CTEs) are a useful feature in SQL for simplifying complex queries and improving readability. Both Oracle PL/SQL and Apache Hive support CTEs, although there may be slight differences in their syntax and usage.
Common Table Expressions in Oracle PL/SQL
In Oracle, CTEs are defined using the WITH
clause. They are used to create a temporary result set that can be referenced within the main query.
Syntax:
WITH cte_name AS (
SELECT query
)
SELECT columns
FROM cte_name
WHERE condition;
Example Usage:
WITH department_totals AS (
SELECT department_id, SUM(salary) AS total_salary
FROM employees
GROUP BY department_id
)
SELECT d.department_id, d.department_name, t.total_salary
FROM departments d
JOIN department_totals t ON d.department_id = t.department_id;
In this example:
- The CTE
department_totals
calculates the total salary for each department. - The main query then joins the departments table with the
department_totals
CTE to display the department name and total salary.
Common Table Expressions in Apache Hive
In Hive, CTEs are also defined using the WITH
clause. They function similarly to those in Oracle, allowing you to create temporary result sets for use within a larger query.
Syntax:
WITH cte_name AS (
SELECT query
)
SELECT columns
FROM cte_name
WHERE condition;
Example Usage:
WITH department_totals AS (
SELECT department_id, SUM(salary) AS total_salary
FROM employees
GROUP BY department_id
)
SELECT d.department_id, d.department_name, t.total_salary
FROM departments d
JOIN department_totals t ON d.department_id = t.department_id;
In this example:
- The syntax and functionality are essentially the same as in Oracle PL/SQL.
- The CTE
department_totals
is used to calculate the total salary for each department. - The main query joins the departments table with the
department_totals
CTE to display the department name and total salary.
Key Points
- Scope: CTEs in both Oracle and Hive are limited to the scope of the query they are defined in. Once the query execution is complete, the CTEs are discarded.
- Multiple CTEs: You can define multiple CTEs in a single query by separating them with commas.
- Recursion: Oracle supports recursive CTEs, which allow a CTE to refer to itself. Hive also supports recursive CTEs starting from version 3.1.0.
Example of Multiple CTEs:
WITH
cte1 AS (
SELECT department_id, SUM(salary) AS total_salary
FROM employees
GROUP BY department_id
),
cte2 AS (
SELECT department_id, COUNT(*) AS employee_count
FROM employees
GROUP BY department_id
)
SELECT c1.department_id, c1.total_salary, c2.employee_count
FROM cte1 c1
JOIN cte2 c2 ON c1.department_id = c2.department_id;
In both Oracle and Hive, this example:
- Defines two CTEs:
cte1
andcte2
. - The main query joins these two CTEs to display the department ID, total salary, and employee count for each department.
CTEs are a powerful tool in both Oracle PL/SQL and Hive for breaking down complex queries, making them easier to read and maintain.
Here’s an example of a complex query using multiple CTEs and embedded CTEs in a single WITH
statement. This query is designed to work in both MySQL and Spark SQL. It involves six layers of CTEs to illustrate a comprehensive use case.
Scenario:
- You have two tables:
products
with columns (product_id, product_name, category_id, price) andcategories
with columns (category_id, category_name). - You want to perform various aggregations and calculations to ultimately determine the price difference between each product and the next most expensive product in the same category, while including additional metrics such as category-level statistics and rank.
SQL Query with Six Layers of CTEs
WITH
-- Layer 1: CTE to join products and categories
product_categories AS (
SELECT
p.product_id,
p.product_name,
p.category_id,
p.price,
c.category_name
FROM
products p
JOIN
categories c ON p.category_id = c.category_id
),
-- Layer 2: CTE to calculate the minimum and maximum price per category
category_min_max_prices AS (
SELECT
category_id,
MIN(price) AS min_price,
MAX(price) AS max_price
FROM
product_categories
GROUP BY
category_id
),
-- Layer 3: CTE to rank products within each category by price
ranked_products AS (
SELECT
pc.product_id,
pc.product_name,
pc.category_id,
pc.price,
pc.category_name,
ROW_NUMBER() OVER (PARTITION BY pc.category_id ORDER BY pc.price DESC) AS price_rank
FROM
product_categories pc
),
-- Layer 4: CTE to calculate the next higher price for each product
next_higher_prices AS (
SELECT
rp.product_id,
rp.product_name,
rp.category_id,
rp.price,
rp.category_name,
rp.price_rank,
LEAD(rp.price) OVER (PARTITION BY rp.category_id ORDER BY rp.price DESC) AS next_higher_price
FROM
ranked_products rp
),
-- Layer 5: CTE to calculate the price difference with the next higher price
price_differences AS (
SELECT
nhp.product_id,
nhp.product_name,
nhp.category_id,
nhp.price,
nhp.category_name,
nhp.price_rank,
nhp.next_higher_price,
nhp.price - nhp.next_higher_price AS price_difference
FROM
next_higher_prices nhp
),
-- Layer 6: CTE to calculate category-level statistics and combine all previous data
category_statistics AS (
SELECT
pd.category_id,
pd.category_name,
AVG(pd.price) AS avg_price,
COUNT(pd.product_id) AS total_products,
cm.min_price,
cm.max_price,
pd.product_id,
pd.product_name,
pd.price,
pd.price_rank,
pd.next_higher_price,
pd.price_difference
FROM
price_differences pd
JOIN
category_min_max_prices cm ON pd.category_id = cm.category_id
GROUP BY
pd.category_id, pd.category_name, cm.min_price, cm.max_price, pd.product_id, pd.product_name, pd.price, pd.price_rank, pd.next_higher_price, pd.price_difference
)
-- Final query to select all relevant columns
SELECT
cs.product_id,
cs.product_name,
cs.category_id,
cs.category_name,
cs.price,
cs.price_rank,
cs.next_higher_price,
cs.price_difference,
cs.avg_price,
cs.total_products,
cs.min_price,
cs.max_price
FROM
category_statistics cs
ORDER BY
cs.category_id, cs.price DESC;
Explanation:
product_categories
CTE:- Joins the
products
andcategories
tables to select relevant columns.
- Joins the
category_min_max_prices
CTE:- Calculates the minimum and maximum price for each category.
ranked_products
CTE:- Ranks products within each category by price.
next_higher_prices
CTE:- Calculates the next higher price for each product within the same category.
price_differences
CTE:- Calculates the price difference between each product and the next higher price.
category_statistics
CTE:- Calculates category-level statistics and combines data from all previous CTEs.
- Final Query:
- Selects the final results, including product details, price difference, and category-level statistics.
This approach demonstrates how to use multiple and embedded CTEs to build a complex query step-by-step, making it easier to understand and maintain.
Let us suppose a scenario in which i have converted six steps of pyspark scripts (Mostly Dataframes ) into a multiple CTEs with ‘single with’ statement in Spark sql Query. which will be performing better?
When deciding whether to use multiple CTEs in a single WITH
statement in Spark SQL versus writing equivalent transformations in PySpark, performance can depend on several factors:
Factors Affecting Performance:
- Execution Plan Optimization:
- Spark SQL: The Catalyst optimizer in Spark SQL can optimize the query plan, potentially reducing the execution time. Spark SQL queries are usually well-optimized because of this.
- PySpark: The DataFrame API also benefits from the Catalyst optimizer. However, PySpark scripts can be more flexible in terms of optimization strategies and custom transformations.
- Readability and Maintainability:
- Spark SQL: Using multiple CTEs can make the query easier to read and maintain, especially for complex transformations. It provides a declarative approach.
- PySpark: While PySpark scripts can be more verbose, they offer more control over the transformations and are often easier to debug step-by-step.
- Lazy Evaluation:
- Both Spark SQL and PySpark follow lazy evaluation principles, meaning transformations are not executed until an action (e.g.,
show()
,count()
,collect()
) is called.
- Both Spark SQL and PySpark follow lazy evaluation principles, meaning transformations are not executed until an action (e.g.,
- Resource Utilization:
- The performance of Spark SQL and PySpark can vary depending on cluster resource configurations, data size, and complexity of transformations.
Example: Converting Multiple Steps of PySpark to CTEs in Spark SQL
PySpark Script Example:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, lead
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("Example").getOrCreate()
# Step 1: Read data
products = spark.read.csv("products.csv", header=True, inferSchema=True)
categories = spark.read.csv("categories.csv", header=True, inferSchema=True)
# Step 2: Calculate total number of products
product_totals = products.groupBy("category_id").count().alias("total_products")
# Step 3: Calculate average price in each category
category_averages = products.groupBy("category_id").agg(avg("price").alias("avg_price"))
# Step 4: Identify products with prices above the average in their category
above_average_products = products.join(category_averages, "category_id").filter(col("price") > col("avg_price"))
# Step 5: Rank products within their categories by price
window_spec = Window.partitionBy("category_id").orderBy(col("price").desc())
ranked_products = products.withColumn("price_rank", rank().over(window_spec))
# Step 6: Calculate the next most expensive product price within each category
next_expensive_prices = products.withColumn("next_higher_price", lead("price").over(window_spec))
# Final Step: Combine all previous steps
final_result = products
.join(product_totals, "category_id")
.join(category_averages, "category_id")
.join(above_average_products, "product_id", "left")
.join(ranked_products, "product_id")
.join(next_expensive_prices, "product_id")
.select(products["*"],
product_totals["total_products"],
category_averages["avg_price"],
above_average_products["price"].alias("above_avg_price"),
ranked_products["price_rank"],
next_expensive_prices["next_higher_price"],
(products["price"] - next_expensive_prices["next_higher_price"]).alias("price_difference"))
final_result.show()
Equivalent Spark SQL with Multiple CTEs:
WITH
product_totals AS (
SELECT
category_id,
COUNT(*) AS total_products
FROM
products
GROUP BY
category_id
),
category_averages AS (
SELECT
category_id,
AVG(price) AS avg_price
FROM
products
GROUP BY
category_id
),
above_average_products AS (
SELECT
p.product_id,
p.product_name,
p.category_id,
p.price,
ca.avg_price
FROM
products p
JOIN
category_averages ca ON p.category_id = ca.category_id
WHERE
p.price > ca.avg_price
),
ranked_products AS (
SELECT
p.product_id,
p.product_name,
p.category_id,
p.price,
RANK() OVER (PARTITION BY p.category_id ORDER BY p.price DESC) AS price_rank
FROM
products p
),
next_expensive_prices AS (
SELECT
p.product_id,
p.product_name,
p.category_id,
p.price,
LEAD(p.price) OVER (PARTITION BY p.category_id ORDER BY p.price DESC) AS next_higher_price
FROM
products p
),
final_result AS (
SELECT
p.product_id,
p.product_name,
p.category_id,
c.category_name,
p.price,
pt.total_products,
ca.avg_price,
ap.price AS above_avg_price,
rp.price_rank,
nep.next_higher_price,
p.price - nep.next_higher_price AS price_difference
FROM
products p
JOIN
categories c ON p.category_id = c.category_id
LEFT JOIN
product_totals pt ON p.category_id = pt.category_id
LEFT JOIN
category_averages ca ON p.category_id = ca.category_id
LEFT JOIN
above_average_products ap ON p.product_id = ap.product_id
LEFT JOIN
ranked_products rp ON p.product_id = rp.product_id
LEFT JOIN
next_expensive_prices nep ON p.product_id = nep.product_id
)
SELECT
product_id,
product_name,
category_name,
price,
total_products,
avg_price,
above_avg_price,
price_rank,
next_higher_price,
price_difference
FROM
final_result
ORDER BY
category_name, price DESC;
Performance Considerations:
- Execution Plan:
- Run
EXPLAIN
orEXPLAIN EXTENDED
on both the Spark SQL and PySpark transformations to analyze the execution plans. This will help you understand how Spark plans to execute each query and transformation.
- Run
- Benchmarking:
- Benchmark both approaches with representative data sizes. Measure the execution time, resource usage (CPU, memory), and shuffle read/write operations. This will give you practical insights into which approach performs better in your specific context.
- Caching Intermediate Results:
- In PySpark, you can cache intermediate DataFrames if they are reused multiple times, which can improve performance. This can be controlled more explicitly compared to Spark SQL.
- Complex Transformations:
- If your transformations are very complex and require custom logic, PySpark may offer more flexibility and control. However, for standard SQL operations, Spark SQL might be more concise and easier to optimize automatically.
In general, both approaches benefit from Spark’s Catalyst optimizer, but the best choice depends on your specific use case, data size, and complexity of transformations. Performing benchmarking and analyzing execution plans will provide the most accurate assessment of performance differences.
Recursive CTE
Recursive Common Table Expressions (CTEs) are powerful tools for handling queries that involve hierarchical or tree-like data structures, and iterative or recursive operations. They allow you to define a base case and then repeatedly apply the recursive logic until the final result is obtained. Recursive CTEs are widely used in various real-world use cases.
Here are some common use cases for recursive CTEs:
1. Employee Hierarchies (Organizational Trees)
Recursive CTEs are often used to represent and query employee-manager relationships, where employees report to managers, and managers report to higher-level executives. This hierarchical structure can be easily modeled and queried using a recursive CTE.
Use Case: Find All Subordinates for a Manager
Given an employee table where each employee has a manager_id
, you may want to find all employees who report directly or indirectly to a specific manager.
Example:
WITH RECURSIVE subordinates AS (
-- Base case: Start with the given manager (e.g., manager_id=1)
SELECT emp_id, emp_name, manager_id
FROM employees
WHERE manager_id = 1
UNION ALL
-- Recursive step: Find employees who report to the employees from the previous step
SELECT e.emp_id, e.emp_name, e.manager_id
FROM employees e
INNER JOIN subordinates s ON e.manager_id = s.emp_id
)
SELECT * FROM subordinates;
Result:
This query will recursively find all subordinates who report directly or indirectly to the manager with manager_id = 1
.
2. Analyzing Bill of Materials (BOM)
In manufacturing, a Bill of Materials (BOM) defines the components required to produce a product, and those components may have their own subcomponents, creating a multi-level hierarchy.
Use Case: Exploding a Bill of Materials
You want to recursively find all the subcomponents required to manufacture a given product.
Example:
WITH RECURSIVE bom AS (
-- Base case: Start with the main product (e.g., product_id=100)
SELECT product_id, component_id, quantity
FROM components
WHERE product_id = 100
UNION ALL
-- Recursive step: Find the subcomponents of the components from the previous step
SELECT c.product_id, c.component_id, c.quantity
FROM components c
INNER JOIN bom b ON c.product_id = b.component_id
)
SELECT * FROM bom;
Result:
This query will return the entire hierarchy of components needed to build the product, including all subcomponents.
3. Pathfinding in Graphs (Shortest Path)
Recursive CTEs can be used to perform graph traversal, such as finding the shortest path between two nodes in a graph. This is commonly used in transportation networks, social networks, and routing algorithms.
Use Case: Finding the Shortest Path Between Two Nodes
Given a table representing connections (edges) between nodes (vertices), you can use a recursive CTE to find the shortest path between two nodes.
Example:
WITH RECURSIVE paths AS (
-- Base case: Start with the source node (e.g., node_id=1)
SELECT node_id, connected_node_id, 1 AS path_length
FROM edges
WHERE node_id = 1
UNION ALL
-- Recursive step: Continue finding connected nodes
SELECT e.node_id, e.connected_node_id, p.path_length + 1
FROM edges e
INNER JOIN paths p ON e.node_id = p.connected_node_id
)
SELECT * FROM paths
WHERE connected_node_id = 5
ORDER BY path_length
LIMIT 1;
Result:
This query will return the shortest path from node 1 to node 5.
4. Working with Family Trees
Recursive CTEs are useful in genealogical applications, where you need to model and query family trees. Each person can have a parent or a child, and you may want to trace ancestry or descendants.
Use Case: Find All Ancestors of a Person
Given a table of family relationships, you can use a recursive CTE to trace the ancestors of a specific person.
Example:
WITH RECURSIVE ancestors AS (
-- Base case: Start with the person (e.g., person_id=100)
SELECT person_id, parent_id
FROM family
WHERE person_id = 100
UNION ALL
-- Recursive step: Find the parents of the ancestors
SELECT f.person_id, f.parent_id
FROM family f
INNER JOIN ancestors a ON f.person_id = a.parent_id
)
SELECT * FROM ancestors;
Result:
This query will return all ancestors of the person with person_id = 100
.
5. Time-Series Data (Running Totals)
Recursive CTEs can also be used to calculate running totals or cumulative sums over time-series data, where you need to recursively aggregate values.
Use Case: Calculate a Running Total of Sales by Date
You want to calculate the cumulative total of sales up to each date.
Example:
WITH RECURSIVE running_totals AS (
-- Base case: Start with the first date
SELECT sale_date, amount, amount AS running_total
FROM sales
WHERE sale_date = (SELECT MIN(sale_date) FROM sales)
UNION ALL
-- Recursive step: Calculate the running total for each subsequent date
SELECT s.sale_date, s.amount, rt.running_total + s.amount AS running_total
FROM sales s
INNER JOIN running_totals rt ON s.sale_date > rt.sale_date
ORDER BY s.sale_date
)
SELECT * FROM running_totals;
Result:
This query will return a running total of sales for each date.
6. Data Lineage and Change Tracking (History)
In data warehousing and ETL processes, Recursive CTEs can be used to track the history of changes in records or tables, allowing you to understand how data has evolved over time.
Use Case: Trace the History of Changes to a Record
You may have a table that records changes made to data over time, and you want to trace how a particular record has changed.
Example:
WITH RECURSIVE history AS (
-- Base case: Start with the current version of the record
SELECT record_id, change_date, old_value, new_value
FROM changes
WHERE record_id = 100
UNION ALL
-- Recursive step: Trace back to the previous version
SELECT c.record_id, c.change_date, c.old_value, c.new_value
FROM changes c
INNER JOIN history h ON c.new_value = h.old_value
)
SELECT * FROM history;
Result:
This query will return the entire change history of the record with record_id = 100
.
7. Recursive Summation of Hierarchical Data (Financial Reporting)
In financial reporting, recursive CTEs are useful for aggregating financial data across hierarchical levels, such as summing revenues or expenses for all subsidiaries in a corporate structure.
Use Case: Summing Revenues for All Subsidiaries
Given a table that lists companies and their parent companies, you can recursively calculate the total revenue for all subsidiaries.
Example:
WITH RECURSIVE revenue_summary AS (
-- Base case: Start with the main company (e.g., parent_company_id=1)
SELECT company_id, revenue
FROM companies
WHERE parent_company_id = 1
UNION ALL
-- Recursive step: Sum the revenues of all subsidiaries
SELECT c.company_id, rs.revenue + c.revenue
FROM companies c
INNER JOIN revenue_summary rs ON c.parent_company_id = rs.company_id
)
SELECT SUM(revenue) FROM revenue_summary;
Result:
This query will return the total revenue for the company and all its subsidiaries.
8. Data Cleanup and De-Duplication
Recursive CTEs can also be used in data cleaning operations, such as de-duplicating data where there are multiple records for the same entity, and you need to recursively eliminate duplicates.
Use Case: Removing Duplicate Entries Based on Conditions
You can use a recursive CTE to keep only the most recent or relevant record for each entity.
Example:
WITH RECURSIVE de_duplication AS (
-- Base case: Select the first occurrence of each entity
SELECT entity_id, data, change_date
FROM records
WHERE entity_id IN (SELECT DISTINCT entity_id FROM records)
UNION ALL
-- Recursive step: Select subsequent occurrences and decide whether to keep them
SELECT r.entity_id, r.data, r.change_date
FROM records r
INNER JOIN de_duplication dd ON r.entity_id = dd.entity_id AND r.change_date > dd.change_date
)
SELECT * FROM de_duplication;
Result:
This query will return a de-duplicated list of records based on the latest
entry for each entity.
Points:
Recursive CTEs are incredibly versatile and can be used in various real-world scenarios such as:
- Hierarchical Data (employee-manager relationships, family trees).
- Graph and Network Traversal (shortest paths, pathfinding).
- Bill of Materials (BOM) and multi-level hierarchies.
- Time-Series Calculations (running totals, cumulative sums).
- Change Tracking (historical data lineage).
- Financial Rollups (aggregating hierarchical financial data).
- Data Cleanup (de-duplication and data correction).
Recursive CTE in PySpark (Simulating Recursion)
PySpark does not directly support Recursive CTEs. However, you can simulate the same using iterative loops or DataFrame joins.
Simulating Recursive CTE Using Iterative Joins in PySpark
Let’s try to mimic the recursive CTE using loops in PySpark to get all subordinates for a manager.
1. Sample Data (Employee Hierarchy in PySpark):
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("RecursiveCTE").getOrCreate()
# Sample data
data = [(1, 'John', None), (2, 'Mary', 1), (3, 'Mike', 1), (4, 'Jane', 2), (5, 'Peter', 3)]
columns = ['emp_id', 'emp_name', 'manager_id']
# Create the DataFrame
employees_df = spark.createDataFrame(data, columns)
# Show the initial data
employees_df.show()
emp_id | emp_name | manager_id |
---|---|---|
1 | John | NULL |
2 | Mary | 1 |
3 | Mike | 1 |
4 | Jane | 2 |
5 | Peter | 3 |
2. Recursive Query to Find Subordinates of a Manager:
In this example, we will simulate recursion by iteratively joining the DataFrame on itself.
from pyspark.sql.functions import col
# Define the starting manager (e.g., John, emp_id=1)
manager_id = 1
# Anchor: Start with the manager
result_df = employees_df.filter(col('emp_id') == manager_id)
# Iteratively find all subordinates (mimicking recursion)
for i in range(5): # Loop to find multiple levels of subordinates
new_subordinates = employees_df.join(result_df, employees_df['manager_id'] == result_df['emp_id'], 'inner')
.select(employees_df['emp_id'], employees_df['emp_name'], employees_df['manager_id'])
result_df = result_df.union(new_subordinates)
# Show the final result (all subordinates)
result_df.show()
Result:
emp_id | emp_name | manager_id |
---|---|---|
1 | John | NULL |
2 | Mary | 1 |
3 | Mike | 1 |
4 | Jane | 2 |
5 | Peter | 3 |
Explanation:
- Anchor (Base case): Start with the base manager (John,
emp_id=1
). - Iterative Join: In each iteration, we join the
employees_df
with the result of the previous iteration to find employees reporting to the current manager or subordinates. - Union: We accumulate the results by unioning the DataFrames.
- Loop: The loop mimics recursion, where we keep expanding the subordinates until no more subordinates are found.
3. Alternative: Using GraphX (for Hierarchical or Graph-like Data)
If you are dealing with hierarchical data such as employee hierarchies, family trees, or network graphs, you may want to consider using GraphX or GraphFrames, which are part of Apache Spark.
Example Using GraphFrames:
from graphframes import GraphFrame
# Create vertices (employees)
vertices = spark.createDataFrame([(1, 'John'), (2, 'Mary'), (3, 'Mike'), (4, 'Jane'), (5, 'Peter')], ["id", "name"])
# Create edges (manager relationships)
edges = spark.createDataFrame([(1, 2), (1, 3), (2, 4), (3, 5)], ["src", "dst"])
# Create the GraphFrame
g = GraphFrame(vertices, edges)
# Run BFS (Breadth First Search) to find all subordinates of a manager (John)
results = g.bfs(fromExpr="id = 1", toExpr="id > 0")
results.show()
This approach can be more efficient when dealing with deeply hierarchical or graph-based data in PySpark.
Summary:
- Recursive CTE is commonly used in SQL databases to handle hierarchical or tree-structured data.
- While Spark SQL doesn’t natively support Recursive CTE, you can simulate it in PySpark using iterative
JOIN
operations. - Alternatively, for graph-related hierarchical data, consider using GraphFrames or GraphX in Spark for more efficient traversal of complex structures.
Discover more from HintsToday
Subscribe to get the latest posts sent to your email.