What is the difference between row_number(), rank(), and dense_rank() in SQL?
The functions row_number(), rank(), and dense_rank() are all window functions in SQL that assign a number to each row within a window (or partition). However, they differ in how they handle ties (rows with identical ordering values).
1. row_number()
- Assigns a unique sequential number to each row within a window or partition.
- Does not handle ties: each row gets a unique number regardless of duplicate values in the ordering column.
Example:
| EmployeeID | Salary | row_number() (ORDER BY Salary DESC) |
|---|---|---|
| 1 | 9000 | 1 |
| 2 | 9000 | 2 |
| 3 | 8000 | 3 |
| 4 | 7000 | 4 |
- Tied rows (same Salary) still receive distinct numbers, in the order they appear within the window.
2. rank()
- Assigns a rank to each row based on the specified order.
- Rows with the same ordering value (ties) get the same rank, and the next rank skips by the number of tied rows.
Example:
| EmployeeID | Salary | rank() (ORDER BY Salary DESC) |
|---|---|---|
| 1 | 9000 | 1 |
| 2 | 9000 | 1 |
| 3 | 8000 | 3 |
| 4 | 7000 | 4 |
- Rows 1 and 2 tie for first place, so they both get rank 1, but the next rank starts at 3.
3. dense_rank()
- Similar to rank(), but does not skip ranks after ties. Tied rows get the same rank, and the next rank is incremented by 1.
Example:
| EmployeeID | Salary | dense_rank() (ORDER BY Salary DESC) |
|---|---|---|
| 1 | 9000 | 1 |
| 2 | 9000 | 1 |
| 3 | 8000 | 2 |
| 4 | 7000 | 3 |
- Rows 1 and 2 tie for first place, so they both get rank 1, but the next rank starts at 2, with no gaps.
Key Differences:
| Feature | row_number() | rank() | dense_rank() |
|---|---|---|---|
| Handling ties | Does not handle ties | Same rank for ties, skips ranks | Same rank for ties, no skipped ranks |
| Uniqueness | Always unique | May repeat for ties | May repeat for ties |
| Rank gaps | No gaps in numbering | Gaps after ties | No gaps |
Use Cases:
- row_number(): When you need a unique identifier for each row in a partition. Commonly used to find the first N rows in each partition (see the sketch right after this list).
- rank(): When ranking with acknowledgment of ties is important and gaps are acceptable.
- dense_rank(): When ranking with acknowledgment of ties is important but gaps are not acceptable.
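To make the top-N use case concrete, here is a minimal PySpark sketch that keeps the two highest-paid rows per department. The employees_df DataFrame and its Department and Salary columns are assumed names for illustration; row_number() is the right choice here because it returns exactly N rows per partition even when salaries tie.
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Hypothetical employees_df with Department and Salary columns
dept_window = Window.partitionBy("Department").orderBy(col("Salary").desc())

top2_per_dept = (
    employees_df
    .withColumn("rn", row_number().over(dept_window))
    .filter(col("rn") <= 2)  # keep the 2 highest-paid rows per department
    .drop("rn")
)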
PySpark Example:
Sample Data:
data = [(1, 9000), (2, 9000), (3, 8000), (4, 7000)]
columns = ["EmployeeID", "Salary"]
df = spark.createDataFrame(data, columns)
Code:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank
window_spec = Window.orderBy(col("Salary").desc())
df.select(
col("EmployeeID"),
col("Salary"),
row_number().over(window_spec).alias("row_number"),
rank().over(window_spec).alias("rank"),
dense_rank().over(window_spec).alias("dense_rank")
).show()
Output:
| EmployeeID | Salary | row_number | rank | dense_rank |
|---|---|---|---|---|
| 1 | 9000 | 1 | 1 | 1 |
| 2 | 9000 | 2 | 1 | 1 |
| 3 | 8000 | 3 | 3 | 2 |
| 4 | 7000 | 4 | 4 | 3 |
How would you find customers who have bought a product at least 3 times consecutively?
– Table name: Orders
– Columns: CustomerID, ProductID, OrderDate
Please provide the solution in Spark SQL and in PySpark.
The question is a bit vague: many will read it as 3 orders on consecutive days, but since no date constraint is mentioned, I will first treat it as 3 consecutive orders, even if some of them happened on the same day.
We will use a window function, but which one? The question asks for consecutive orders with no constraint on the date, and since several orders may fall on the same day, dense_rank() is the appropriate choice.
Here are the solutions in Spark SQL and PySpark:
Sample Data
Python
data = [
(1, 101, "2022-01-01"),
(1, 101, "2022-01-15"),
(1, 101, "2022-02-01"),
(1, 102, "2022-03-01"),
(2, 101, "2022-01-01"),
(2, 101, "2022-01-20"),
(3, 101, "2022-01-01"),
(3, 101, "2022-01-10"),
(3, 101, "2022-01-20"),
(3, 101, "2022-02-01")
]
columns = ["CustomerID", "ProductID", "OrderDate"]
df = spark.createDataFrame(data).toDF(*columns)
Spark SQL Solution
WITH RankedOrders AS (
SELECT CustomerID, ProductID, OrderDate,
DENSE_RANK() OVER (PARTITION BY CustomerID, ProductID ORDER BY OrderDate) AS OrderRank
FROM Orders
)
SELECT DISTINCT CustomerID
FROM RankedOrders
WHERE OrderRank >= 3
PySpark Solution
from pyspark.sql import Window
from pyspark.sql.functions import dense_rank, col
window = Window.partitionBy("CustomerID", "ProductID").orderBy("OrderDate")
ranked_orders = df.withColumn("OrderRank", dense_rank().over(window))
consecutive_customers = ranked_orders.filter(col("OrderRank") >= 3).select("CustomerID").distinct()
In both solutions, we use a window function to rank the orders for each customer and product. We then filter the results to find customers who have placed at least 3 consecutive orders.
Note that the Spark SQL solution uses the DENSE_RANK function, while the PySpark solution uses the dense_rank function from the pyspark.sql.functions module. Also, in the PySpark solution, we use the withColumn method to add the OrderRank column to the DataFrame.
Output
The output of both solutions will be:
+----------+
|CustomerID|
+----------+
|         1|
|         3|
+----------+
But there is a more interesting version of the code if we have to consider orders on consecutive days.
CREATE TABLE Orders (
CustomerID INT,
ProductID INT,
OrderDate DATE
);
INSERT INTO Orders (CustomerID, ProductID, OrderDate)
VALUES
(1, 101, '2022-01-01'),
(1, 101, '2022-01-02'),
(1, 101, '2022-01-03'),
(1, 102, '2022-01-10'),
(2, 101, '2022-01-01'),
(2, 101, '2022-01-05'),
(3, 101, '2022-01-01'),
(3, 101, '2022-01-02'),
(3, 101, '2022-01-03'),
(3, 101, '2022-01-04');
Query
SQL
WITH OrderedOrders AS (
SELECT CustomerID, ProductID, OrderDate,
DENSE_RANK() OVER (PARTITION BY CustomerID, ProductID ORDER BY OrderDate) AS DenseRank,
LAG(OrderDate, 1) OVER (PARTITION BY CustomerID, ProductID ORDER BY OrderDate) AS PrevOrderDate,
LAG(OrderDate, 2) OVER (PARTITION BY CustomerID, ProductID ORDER BY OrderDate) AS PrevPrevOrderDate
FROM Orders
)
SELECT *
FROM OrderedOrders
ORDER BY CustomerID, ProductID, OrderDate;
Result
| CustomerID | ProductID | OrderDate | DenseRank | PrevOrderDate | PrevPrevOrderDate |
|---|---|---|---|---|---|
| 1 | 101 | 2022-01-01 | 1 | NULL | NULL |
| 1 | 101 | 2022-01-02 | 2 | 2022-01-01 | NULL |
| 1 | 101 | 2022-01-03 | 3 | 2022-01-02 | 2022-01-01 |
| 1 | 102 | 2022-01-10 | 1 | NULL | NULL |
| 2 | 101 | 2022-01-01 | 1 | NULL | NULL |
| 2 | 101 | 2022-01-05 | 2 | 2022-01-01 | NULL |
| 3 | 101 | 2022-01-01 | 1 | NULL | NULL |
| 3 | 101 | 2022-01-02 | 2 | 2022-01-01 | NULL |
| 3 | 101 | 2022-01-03 | 3 | 2022-01-02 | 2022-01-01 |
| 3 | 101 | 2022-01-04 | 4 | 2022-01-03 | 2022-01-02 |
Note that:
- The DENSE_RANK function assigns a rank to each row within a partition of a result set.
- The LAG function returns the value of a previous row within a partition of a result set.
- The PARTITION BY clause divides the result set into partitions to which the function is applied.
- The ORDER BY clause specifies the order of the rows within each partition.
Python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
# Create a SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()
# Create a DataFrame
data = [
(1, 101, "2022-01-01"),
(1, 101, "2022-01-02"),
(1, 101, "2022-01-03"),
(1, 102, "2022-01-10"),
(2, 101, "2022-01-01"),
(2, 101, "2022-01-05"),
(3, 101, "2022-01-01"),
(3, 101, "2022-01-02"),
(3, 101, "2022-01-03"),
(3, 101, "2022-01-04")
]
columns = ["CustomerID", "ProductID", "OrderDate"]
df = spark.createDataFrame(data).toDF(*columns)
# Define the window
window = Window.partitionBy("CustomerID", "ProductID").orderBy("OrderDate")
# Calculate the dense rank, lag, and date differences
df = df.withColumn("DenseRank", F.dense_rank().over(window))
df = df.withColumn("PrevOrderDate", F.lag("OrderDate", 1).over(window))
df = df.withColumn("PrevPrevOrderDate", F.lag("OrderDate", 2).over(window))
df = df.withColumn("OrderDate_PrevOrderDate", F.datediff("OrderDate", F.col("PrevOrderDate")))
df = df.withColumn("OrderDate_PrevPrevOrderDate", F.datediff("OrderDate", F.col("PrevPrevOrderDate")))
# Show the results
df.show()
Here are the solutions in Spark SQL and PySpark:
Spark SQL Solution
SQL
WITH OrderedOrders AS (
SELECT CustomerID, ProductID, OrderDate,
LAG(OrderDate, 1) OVER (PARTITION BY CustomerID, ProductID ORDER BY OrderDate) AS PrevOrderDate,
LAG(OrderDate, 2) OVER (PARTITION BY CustomerID, ProductID ORDER BY OrderDate) AS PrevPrevOrderDate
FROM Orders
)
SELECT DISTINCT CustomerID
FROM OrderedOrders
WHERE DATEDIFF(OrderDate, PrevOrderDate) <= 1
  AND DATEDIFF(OrderDate, PrevPrevOrderDate) <= 2
PySpark Solution
Python
from pyspark.sql import Window
from pyspark.sql.functions import lag, col, datediff
window = Window.partitionBy("CustomerID", "ProductID").orderBy("OrderDate")
ordered_orders = df.withColumn("PrevOrderDate", lag("OrderDate", 1).over(window)) \
.withColumn("PrevPrevOrderDate", lag("OrderDate", 2).over(window))
consecutive_orders = ordered_orders.filter((col("OrderDate") - col("PrevOrderDate")).cast("long") <= 86400) \
.filter((col("OrderDate") - col("PrevPrevOrderDate")).cast("long") <= 172800)
consecutive_customers = consecutive_orders.select("CustomerID").distinct()
In both solutions, we use a window function to lag the OrderDate column by 1 and 2 rows, partitioned by CustomerID and ProductID. We then filter the results to find customers who have placed orders at least 3 times consecutively.
Note that the Spark SQL solution uses the LAG function, while the PySpark solution uses the lag function from the pyspark.sql.functions module.
Also, in both solutions we compare day-level differences with DATEDIFF/datediff: at most 1 day between the current order and the previous one, and at most 2 days between the current order and the one two rows before it.
How to find employees whose salary is greater than the average salary of employees in their respective location?
Table Name: Employee
Column Names: EmpID (Employee ID), Emp_name (Employee Name), Manager_id (Manager ID), Salary (Employee Salary), Location (Employee Location)
using spark sql or pyspark
Complete PySpark Solution with Sample Data
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import col, avg
# Initialize Spark session
spark = SparkSession.builder.master("local").appName("EmployeeSalaryExample").getOrCreate()
# Sample data
data = [
(1, "Alice", 10, 8000, "New York"),
(2, "Bob", 11, 12000, "New York"),
(3, "Charlie", 10, 7000, "Chicago"),
(4, "David", 12, 11000, "New York"),
(5, "Eve", 13, 6000, "Chicago")
]
# Define schema
columns = ["EmpID", "Emp_name", "Manager_id", "Salary", "Location"]
# Create DataFrame
employee_df = spark.createDataFrame(data, columns)
# Define a window partitioned by location
location_window = Window.partitionBy("Location")
# Step 1: Calculate the average salary for each location
employee_with_avg_salary = employee_df.withColumn(
"avg_salary", avg("Salary").over(location_window)
)
# Step 2: Filter employees whose salary is greater than the average salary
result = employee_with_avg_salary.filter(col("Salary") > col("avg_salary"))
# Show the result
result.select("EmpID", "Emp_name", "Manager_id", "Salary", "Location", "avg_salary").show()
Explanation of the Process:
- Spark Session: The Spark session is initialized with SparkSession.builder.master("local").appName("EmployeeSalaryExample").getOrCreate().
- Sample Data: A list of tuples is created where each tuple represents an employee and contains EmpID (Employee ID), Emp_name (Employee Name), Manager_id (Manager ID), Salary (Employee Salary), and Location (Employee Location).
- Schema: The column names are defined, and a DataFrame employee_df is created using the sample data and schema.
- Window Function: The window is defined by partitioning the data by Location so that the average salary can be computed for each location.
- Filter: We keep only the employees whose salary is greater than the calculated average salary for their respective location.
- Show Result: Finally, the result is displayed showing EmpID, Emp_name, Manager_id, Salary, Location, and avg_salary.
Example Output:
+-----+--------+-----------+------+--------+----------+
|EmpID|Emp_name|Manager_id |Salary|Location|avg_salary|
+-----+--------+-----------+------+--------+----------+
| 2| Bob| 11| 12000| New York| 10000|
| 4| David| 12| 11000| New York| 10000|
+-----+--------+-----------+------+--------+----------+
Spark SQL Version
If you’d prefer to use Spark SQL, the following steps will also work. You can first register the DataFrame as a temporary table and then run the SQL query.
# Register the DataFrame as a temporary SQL table
employee_df.createOrReplaceTempView("Employee")
# Spark SQL to find employees with salary greater than average salary in their location
query = """
WITH LocationAvgSalary AS (
SELECT
EmpID,
Emp_name,
Manager_id,
Salary,
Location,
AVG(Salary) OVER (PARTITION BY Location) AS avg_salary
FROM Employee
)
SELECT
EmpID,
Emp_name,
Manager_id,
Salary,
Location,
avg_salary
FROM LocationAvgSalary
WHERE Salary > avg_salary
"""
# Run the SQL query and show the results
result_sql = spark.sql(query)
result_sql.show()
Both methods (PySpark with Window functions and Spark SQL) will provide the same result.
Aggregate functions can also be applied with window functions in PySpark. These functions compute aggregate values over a defined window or partition, and they do not require explicit grouping.
Supported Aggregate Functions for Window Operations
Some commonly used aggregate functions include:
- avg: Computes the average value.
- sum: Computes the sum of values.
- count: Counts the number of values.
- min: Finds the minimum value.
- max: Finds the maximum value.
- stddev: Computes the standard deviation.
- first: Gets the first value in the window.
- last: Gets the last value in the window.
- collect_list: Collects all values in the window as a list.
- collect_set: Collects all distinct values in the window as a set.
A few of the less common ones (first, last, collect_list) are sketched right after this list.
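As a quick, hedged illustration of first, last, and collect_list over a window: the DataFrame df and its Department and Salary columns are assumed to match the sample data used in the examples below, and the explicit rowsBetween frame is what lets last see the whole partition rather than the default running frame implied by orderBy.
from pyspark.sql.window import Window
from pyspark.sql.functions import first, last, collect_list

# Assumed: a DataFrame `df` with Department and Salary columns (as in the examples below).
# The explicit frame covers the entire partition; with only orderBy, the default frame
# runs from the first row up to the current row, so `last` would just return the current value.
full_dept_window = (
    Window.partitionBy("Department")
    .orderBy("Salary")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

df_with_extras = (
    df.withColumn("lowest_salary", first("Salary").over(full_dept_window))
      .withColumn("highest_salary", last("Salary").over(full_dept_window))
      .withColumn("all_salaries", collect_list("Salary").over(full_dept_window))
)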
General Syntax for Window Aggregate Functions
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, sum, count, min, max
# Define a window
window_spec = Window.partitionBy("partition_column").orderBy("order_column")
# Apply aggregate function over the window
df.withColumn("new_column", aggregate_function("column").over(window_spec))
Examples
1. Average Salary by Department
Compute the average salary for each department and append it as a new column.
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg
# Initialize Spark session
spark = SparkSession.builder.master("local").appName("WindowFunctionsExample").getOrCreate()
# Sample data
data = [
(1, "Alice", "HR", 6000),
(2, "Bob", "HR", 7000),
(3, "Charlie", "IT", 8000),
(4, "David", "IT", 12000),
(5, "Eve", "Finance", 9000)
]
columns = ["EmpID", "Emp_name", "Department", "Salary"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
# Define the window specification
window_spec = Window.partitionBy("Department")
# Add a column with the average salary per department
df = df.withColumn("avg_salary", avg("Salary").over(window_spec))
df.show()
Output:
+-----+--------+----------+------+----------+
|EmpID|Emp_name|Department|Salary|avg_salary|
+-----+--------+----------+------+----------+
| 1| Alice| HR| 6000| 6500.0|
| 2| Bob| HR| 7000| 6500.0|
| 3| Charlie| IT| 8000| 10000.0|
| 4| David| IT| 12000| 10000.0|
| 5| Eve| Finance| 9000| 9000.0|
+-----+--------+----------+------+----------+
2. Rank Employees by Salary Within Departments
Calculate rank based on salary within each department.
from pyspark.sql.functions import rank, col
# Define a window specification
window_spec = Window.partitionBy("Department").orderBy(col("Salary").desc())
# Add a rank column
df = df.withColumn("rank", rank().over(window_spec))
df.show()
Output:
+-----+--------+----------+------+----------+----+
|EmpID|Emp_name|Department|Salary|avg_salary|rank|
+-----+--------+----------+------+----------+----+
| 2| Bob| HR| 7000| 6500.0| 1|
| 1| Alice| HR| 6000| 6500.0| 2|
| 4| David| IT| 12000| 10000.0| 1|
| 3| Charlie| IT| 8000| 10000.0| 2|
| 5| Eve| Finance| 9000| 9000.0| 1|
+-----+--------+----------+------+----------+----+
3. Total Salary by Department
Calculate the total salary for each department.
from pyspark.sql.functions import sum
# Redefine the window with partitioning only (no ordering) so the sum covers the whole department
window_spec = Window.partitionBy("Department")
# Add a column with the total salary per department
df = df.withColumn("total_salary", sum("Salary").over(window_spec))
df.show()
Output:
+-----+--------+----------+------+----------+----+------------+
|EmpID|Emp_name|Department|Salary|avg_salary|rank|total_salary|
+-----+--------+----------+------+----------+----+------------+
| 2| Bob| HR| 7000| 6500.0| 1| 13000|
| 1| Alice| HR| 6000| 6500.0| 2| 13000|
| 4| David| IT| 12000| 10000.0| 1| 20000|
| 3| Charlie| IT| 8000| 10000.0| 2| 20000|
| 5| Eve| Finance| 9000| 9000.0| 1| 9000|
+-----+--------+----------+------+----------+----+------------+
Points to Remember
- Window Specification: You can define partitioning (partitionBy) and ordering (orderBy) for your window.
- Partition vs Aggregate: Without a window, aggregates work on the entire DataFrame; with a window, aggregates are calculated for each partition (see the sketch below).
- Aggregate Functions: These work seamlessly with window specifications and are highly optimized in PySpark.
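A small sketch of the partition-vs-whole-DataFrame point, reusing the df, Department, and Salary names from the examples above: groupBy collapses rows, while the windowed aggregate keeps every row and attaches the per-partition value.
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

# Without a window: groupBy collapses each department to a single aggregated row.
dept_avg = df.groupBy("Department").agg(avg("Salary").alias("avg_salary"))

# With a window: the same aggregate keeps every row and adds the
# per-department average as an extra column.
dept_window = Window.partitionBy("Department")
df_with_avg = df.withColumn("avg_salary", avg("Salary").over(dept_window))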
How to calculate the percentage of successful payments for each driver. A payment is considered successful if its status is ‘Completed’.
Table Name: Rides
Column Names: ride_id (Ride ID), driver_id (Driver ID), fare_amount (Fare Amount), driver_rating (Driver Rating), start_time (Start Time)
Table Name: Payments
Column Names: payment_id (Payment ID), ride_id (Ride ID), payment_status (Payment Status)
Step-by-Step Solution to Calculate the Percentage of Successful Payments
We will:
- Create sample data for the Rides and Payments tables.
- Join the tables on ride_id to associate rides with payment statuses.
- Calculate the percentage of successful payments (Completed status) for each driver.
Spark SQL Solution
Sample Data Creation
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize Spark session
spark = SparkSession.builder.appName("PercentageSuccess").getOrCreate()
# Sample data for Rides table
rides_data = [
(1, 'D1', 100, 4.5, '2024-01-01 10:00:00'),
(2, 'D1', 150, 4.7, '2024-01-02 12:00:00'),
(3, 'D2', 200, 4.8, '2024-01-03 14:00:00'),
(4, 'D2', 180, 4.6, '2024-01-04 16:00:00'),
(5, 'D3', 120, 4.3, '2024-01-05 18:00:00'),
]
rides_columns = ["ride_id", "driver_id", "fare_amount", "driver_rating", "start_time"]
rides_df = spark.createDataFrame(rides_data, rides_columns)
rides_df.createOrReplaceTempView("Rides")
# Sample data for Payments table
payments_data = [
(101, 1, 'Completed'),
(102, 2, 'Failed'),
(103, 3, 'Completed'),
(104, 4, 'Completed'),
(105, 5, 'Failed'),
]
payments_columns = ["payment_id", "ride_id", "payment_status"]
payments_df = spark.createDataFrame(payments_data, payments_columns)
payments_df.createOrReplaceTempView("Payments")
Query in Spark SQL
WITH DriverPayments AS (
SELECT
R.driver_id,
COUNT(*) AS total_rides,
SUM(CASE WHEN P.payment_status = 'Completed' THEN 1 ELSE 0 END) AS successful_payments
FROM Rides R
JOIN Payments P ON R.ride_id = P.ride_id
GROUP BY R.driver_id
),
DriverSuccessRate AS (
SELECT
driver_id,
total_rides,
successful_payments,
ROUND((successful_payments * 100.0) / total_rides, 2) AS success_rate
FROM DriverPayments
)
SELECT * FROM DriverSuccessRate;
PySpark Solution
from pyspark.sql.functions import count, sum, when, col, round
# Join the two DataFrames
joined_df = rides_df.join(payments_df, "ride_id")
# Calculate total rides and successful payments
result_df = joined_df.groupBy("driver_id") \
.agg(
count("*").alias("total_rides"),
sum(when(col("payment_status") == "Completed", 1).otherwise(0)).alias("successful_payments")
) \
.withColumn("success_rate", round((col("successful_payments") * 100.0) / col("total_rides"), 2))
result_df.show()
Expected Output
For the given sample data:
| driver_id | total_rides | successful_payments | success_rate |
|---|---|---|---|
| D1 | 2 | 1 | 50.00 |
| D2 | 2 | 2 | 100.00 |
| D3 | 1 | 0 | 0.00 |
Key Points:
- Spark SQL uses SUM(CASE WHEN ...) for conditional aggregation.
- PySpark uses when().otherwise() to create conditional expressions.
- Both methods compute the same result efficiently for large datasets.
How to calculate the percentage of menu items sold for each restaurant.
Table Name: Items
Column Names: item_id (Item ID), rest_id (Restaurant ID)
Table Name: Orders
Column Names: order_id (Order ID), item_id (Item ID), quantity (Quantity), is_offer (Is Offer), client_id (Client ID), Date_Timestamp (Date Timestamp)
Calculate the Percentage of Menu Items Sold for Each Restaurant
We need to:
- Determine the total unique menu items available (Items table) for each restaurant.
- Find the number of unique menu items sold (Orders table) for each restaurant.
- Calculate the percentage of menu items sold for each restaurant.
Spark SQL Solution
Sample Data Creation
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("MenuItemsPercentage").getOrCreate()
# Sample data for Items table
items_data = [
(1, 'R1'),
(2, 'R1'),
(3, 'R1'),
(4, 'R2'),
(5, 'R2'),
(6, 'R3'),
]
items_columns = ["item_id", "rest_id"]
items_df = spark.createDataFrame(items_data, items_columns)
items_df.createOrReplaceTempView("Items")
# Sample data for Orders table
orders_data = [
(101, 1, 2, False, 'C1', '2024-01-01 10:00:00'),
(102, 2, 1, True, 'C2', '2024-01-02 12:00:00'),
(103, 3, 3, False, 'C1', '2024-01-03 14:00:00'),
(104, 4, 1, True, 'C3', '2024-01-04 16:00:00'),
(105, 6, 2, False, 'C4', '2024-01-05 18:00:00'),
]
orders_columns = ["order_id", "item_id", "quantity", "is_offer", "client_id", "date_timestamp"]
orders_df = spark.createDataFrame(orders_data, orders_columns)
orders_df.createOrReplaceTempView("Orders")
Query in Spark SQL
WITH TotalMenuItems AS (
SELECT
rest_id,
COUNT(DISTINCT item_id) AS total_menu_items
FROM Items
GROUP BY rest_id
),
SoldMenuItems AS (
SELECT
I.rest_id,
COUNT(DISTINCT O.item_id) AS sold_menu_items
FROM Orders O
JOIN Items I ON O.item_id = I.item_id
GROUP BY I.rest_id
),
MenuPercentage AS (
SELECT
T.rest_id,
T.total_menu_items,
S.sold_menu_items,
ROUND((S.sold_menu_items * 100.0) / T.total_menu_items, 2) AS percentage_sold
FROM TotalMenuItems T
LEFT JOIN SoldMenuItems S ON T.rest_id = S.rest_id
)
SELECT * FROM MenuPercentage;
PySpark Solution
from pyspark.sql.functions import countDistinct, col, round
# Calculate total menu items for each restaurant
total_menu_items_df = items_df.groupBy("rest_id") \
.agg(countDistinct("item_id").alias("total_menu_items"))
# Calculate sold menu items for each restaurant
sold_menu_items_df = orders_df.join(items_df, "item_id") \
.groupBy("rest_id") \
.agg(countDistinct("item_id").alias("sold_menu_items"))
# Join the two DataFrames and calculate the percentage
result_df = total_menu_items_df.join(sold_menu_items_df, "rest_id", "left_outer") \
.withColumn("percentage_sold", round((col("sold_menu_items") * 100.0) / col("total_menu_items"), 2))
result_df.show()
Expected Output
For the given sample data:
| rest_id | total_menu_items | sold_menu_items | percentage_sold |
|---|---|---|---|
| R1 | 3 | 3 | 100.00 |
| R2 | 2 | 1 | 50.00 |
| R3 | 1 | 1 | 100.00 |
Key Points:
COUNT(DISTINCT column)
is used to count unique items.- The
LEFT JOIN
ensures that restaurants with no sold items are included in the results. - PySpark and Spark SQL yield the same result, suitable for large datasets.
How to compare the time taken for clients who placed their first order with an offer versus those without an offer to make their next order?
Table Name: Orders
Column Names: order_id (Order ID), user_id (User ID), is_offer (Is Offer), Date_Timestamp (Date Timestamp)
Compare the time taken for clients who placed their first order with an offer versus those without an offer to make their next order. This involves calculating the time difference between the first and second orders for each user, categorized by whether the first order was placed with an offer.
Steps to Solve
- Identify each user’s first order.
- Identify each user’s second order.
- Calculate the time difference between the first and second orders.
- Categorize the users based on whether their first order was placed with an offer.
- Compare the average time taken for the next order between the two groups.
Sample Data Creation
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, lead, datediff
from pyspark.sql.window import Window
# Initialize Spark session
spark = SparkSession.builder.appName("FirstOrderAnalysis").getOrCreate()
# Sample data
data = [
(1, 'U1', True, '2024-01-01 10:00:00'),
(2, 'U1', False, '2024-01-05 14:00:00'),
(3, 'U2', True, '2024-01-02 12:00:00'),
(4, 'U2', False, '2024-01-10 16:00:00'),
(5, 'U3', False, '2024-01-03 08:00:00'),
(6, 'U3', True, '2024-01-12 20:00:00'),
]
columns = ["order_id", "user_id", "is_offer", "Date_Timestamp"]
orders_df = spark.createDataFrame(data, columns)
orders_df = orders_df.withColumn("Date_Timestamp", col("Date_Timestamp").cast("timestamp"))
orders_df.show()
PySpark Solution
1. Identify First and Second Orders for Each User
Use a window function to rank orders based on the timestamp for each user.
from pyspark.sql.functions import unix_timestamp, row_number
# Define a window partitioned by user_id and ordered by Date_Timestamp
window_spec = Window.partitionBy("user_id").orderBy("Date_Timestamp")
# Add a rank column to identify the first and second orders
ranked_orders = orders_df.withColumn("order_rank", row_number().over(window_spec))
# Separate first and second orders
first_orders = ranked_orders.filter(col("order_rank") == 1) \
.select("user_id", "is_offer", "Date_Timestamp")
second_orders = ranked_orders.filter(col("order_rank") == 2) \
.select("user_id", col("Date_Timestamp").alias("second_order_timestamp"))
2. Calculate the Time Difference Between First and Second Orders
Join the first and second orders and calculate the time difference.
# Join first and second orders
joined_orders = first_orders.join(second_orders, "user_id", "inner")
# Calculate time difference in days
result_df = joined_orders.withColumn(
"days_to_next_order",
(unix_timestamp("second_order_timestamp") - unix_timestamp("Date_Timestamp")) / 86400
)
result_df.show()
3. Group by is_offer and Calculate Average Time
Group the results by whether the first order used an offer and calculate the average time difference.
from pyspark.sql.functions import avg
# Group by is_offer and calculate average time
average_time_df = result_df.groupBy("is_offer") \
.agg(avg("days_to_next_order").alias("avg_days_to_next_order"))
average_time_df.show()
Expected Output
For the given sample data:
Final Result
| is_offer | avg_days_to_next_order |
|---|---|
| True | ~6.17 |
| False | 9.5 |
(The PySpark version computes fractional days from the timestamps; the Spark SQL version below uses DATEDIFF, which counts whole days and therefore returns 6.0 and 9.0.)
Key Concepts Used
- Window Functions: partitionBy and orderBy to rank the orders for each user.
- Date Calculations: Using unix_timestamp to calculate the difference between timestamps.
- Grouping and Aggregation: groupBy to calculate the average days to the next order based on the is_offer column.
Spark SQL Solution
Query
WITH RankedOrders AS (
SELECT
user_id,
is_offer,
Date_Timestamp,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY Date_Timestamp) AS order_rank
FROM Orders
),
FirstOrders AS (
SELECT
user_id,
is_offer,
Date_Timestamp AS first_order_timestamp
FROM RankedOrders
WHERE order_rank = 1
),
SecondOrders AS (
SELECT
user_id,
Date_Timestamp AS second_order_timestamp
FROM RankedOrders
WHERE order_rank = 2
),
JoinedOrders AS (
SELECT
F.user_id,
F.is_offer,
F.first_order_timestamp,
S.second_order_timestamp,
DATEDIFF(S.second_order_timestamp, F.first_order_timestamp) AS days_to_next_order
FROM FirstOrders F
JOIN SecondOrders S ON F.user_id = S.user_id
)
SELECT
is_offer,
AVG(days_to_next_order) AS avg_days_to_next_order
FROM JoinedOrders
GROUP BY is_offer;
This query achieves the same result as the PySpark solution but directly in SQL.
How to find the length of the longest sequence of consecutive numbers in the table.
Table Name: Consecutive
Column Names: number (Number)
Sample Table –
Number
1
2
3
4
10
11
20
21
22
23
24
30
To find the length of the longest sequence of consecutive numbers in the table Consecutive, we can use a difference-based grouping strategy in both Spark SQL and PySpark. The idea is to assign a group identifier to each sequence of consecutive numbers by leveraging the difference between the number and its row index. For example, the run 20, 21, 22, 23, 24 sits at row indices 7 through 11, so number - row_index equals 13 for all five rows, while 30 at row index 12 yields 18; each consecutive run therefore collapses to a single group identifier.
Spark SQL Solution
Query:
WITH RankedNumbers AS (
SELECT
number,
ROW_NUMBER() OVER (ORDER BY number) AS row_index
FROM Consecutive
),
GroupedNumbers AS (
SELECT
number,
row_index,
number - row_index AS group_id
FROM RankedNumbers
),
GroupLengths AS (
SELECT
group_id,
COUNT(*) AS sequence_length
FROM GroupedNumbers
GROUP BY group_id
)
SELECT
MAX(sequence_length) AS longest_consecutive_sequence
FROM GroupLengths;
Explanation:
- RankedNumbers: Assign a ROW_NUMBER to each number ordered by its value.
- GroupedNumbers: Calculate group_id as number - row_index. This creates a unique identifier for each sequence of consecutive numbers.
- GroupLengths: Count the number of elements in each group.
- Final Query: Find the maximum sequence length.
PySpark Solution
Sample Data:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, expr
from pyspark.sql.window import Window
# Initialize Spark session
spark = SparkSession.builder.appName("LongestConsecutiveSequence").getOrCreate()
# Sample data
data = [(1,), (2,), (3,), (4,), (10,), (11,), (20,), (21,), (22,), (23,), (24,), (30,)]
columns = ["number"]
consecutive_df = spark.createDataFrame(data, columns)
PySpark Implementation:
# Assign row numbers based on the order of numbers
window_spec = Window.orderBy("number")
ranked_df = consecutive_df.withColumn("row_index", row_number().over(window_spec))
# Create group_id based on the difference between number and row_index
grouped_df = ranked_df.withColumn("group_id", col("number") - col("row_index"))
# Count the size of each group
group_lengths = grouped_df.groupBy("group_id").count()
# Find the maximum sequence length
longest_sequence = group_lengths.agg({"count": "max"}).withColumnRenamed("max(count)", "longest_sequence")
longest_sequence.show()
Explanation:
- Ranked DataFrame: Compute a row_index for each number using row_number ordered by number.
- Group Identification: Calculate group_id as number - row_index.
- Group Lengths: Group by group_id and count the number of rows in each group.
- Find Maximum: Aggregate to find the maximum count.
Expected Output
For the sample data:
| longest_sequence |
|---|
| 5 |
- Sequences: [1, 2, 3, 4], [10, 11], [20, 21, 22, 23, 24], [30]
- Longest Sequence: 5 ([20, 21, 22, 23, 24])
How can you find employees whose salary is greater than their manager’s salary?
– Table name: Employees
– Columns: EmployeeID, Salary, ManagerID
To find employees whose salary is greater than their manager’s salary, you can use a self-join in Spark SQL or PySpark. Here’s how:
Spark SQL Solution
Query:
SELECT
e1.EmployeeID AS EmployeeID,
e1.Salary AS EmployeeSalary,
e2.Salary AS ManagerSalary,
e1.ManagerID
FROM Employees e1
INNER JOIN Employees e2
ON e1.ManagerID = e2.EmployeeID
WHERE e1.Salary > e2.Salary;
Explanation:
- Perform a self-join on the Employees table: join e1 (employees) with e2 (managers) on e1.ManagerID = e2.EmployeeID.
- Filter the results where the employee’s salary (e1.Salary) is greater than their manager’s salary (e2.Salary).
- Select relevant columns like employee ID, employee salary, manager salary, and manager ID.
PySpark Solution
Sample Data:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize Spark session
spark = SparkSession.builder.appName("EmployeesSalaryComparison").getOrCreate()
# Sample data
data = [
(1, 80000, 3), # Employee 1 with Manager 3
(2, 60000, 3), # Employee 2 with Manager 3
(3, 100000, None), # Employee 3 (Manager) with no manager
(4, 120000, 3), # Employee 4 with Manager 3
(5, 70000, 4) # Employee 5 with Manager 4
]
columns = ["EmployeeID", "Salary", "ManagerID"]
employees_df = spark.createDataFrame(data, columns)
PySpark Implementation:
# Perform self-join on ManagerID and EmployeeID
joined_df = employees_df.alias("e1").join(
employees_df.alias("e2"),
col("e1.ManagerID") == col("e2.EmployeeID"),
"inner"
).select(
col("e1.EmployeeID").alias("EmployeeID"),
col("e1.Salary").alias("EmployeeSalary"),
col("e2.Salary").alias("ManagerSalary"),
col("e1.ManagerID").alias("ManagerID")
)
# Filter employees whose salary is greater than their manager's salary
result_df = joined_df.filter(col("EmployeeSalary") > col("ManagerSalary"))
# Show results
result_df.show()
Explanation:
- Self-Join: Alias employees_df as e1 and e2, and join on e1.ManagerID = e2.EmployeeID.
- Filter Condition: Keep rows where the employee’s salary (e1.Salary) is greater than the manager’s salary (e2.Salary).
- Select Relevant Columns: Include employee ID, employee salary, manager salary, and manager ID for output.
Expected Output
For the sample data:
| EmployeeID | EmployeeSalary | ManagerSalary | ManagerID |
|---|---|---|---|
| 4 | 120000 | 100000 | 3 |
- Employee 4 has a salary of 120,000, which is greater than their manager’s salary of 100,000.
If you want to find employees whose salary is greater than their manager’s salary without using a self-join, you can try alternative approaches such as broadcast hash join, window functions, or aggregate filtering. Here’s how:
1. Using Window Functions
Instead of joining the table, you can use a window function that takes the highest salary within each manager’s group of reports as a stand-in for the manager’s salary, avoiding a self-join.
PySpark Solution:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, max
# Create a window to find the manager's salary for each employee
window_spec = Window.partitionBy("ManagerID")
# Calculate manager's salary using window function
employees_with_manager_salary = employees_df.withColumn(
"ManagerSalary", max("Salary").over(window_spec)
)
# Filter employees whose salary is greater than their manager's salary
result = employees_with_manager_salary.filter(
(col("ManagerSalary").isNotNull()) & (col("Salary") > col("ManagerSalary"))
).select("EmployeeID", "Salary", "ManagerID", "ManagerSalary")
result.show()
Explanation:
- Window.partitionBy("ManagerID") groups rows by ManagerID.
- max("Salary").over(window_spec) computes the highest salary among the employees reporting to each manager, which is used here as a stand-in for the manager’s salary.
- Filter rows where Salary > ManagerSalary.
- Caveat: because every employee’s own row is part of its partition, a salary can never exceed this partition maximum unless the manager’s own row shares the same ManagerID value, so the join-based approaches below are the more reliable way to compare against the manager’s actual salary.
2. Using Aggregation and Lookup
Aggregate the table to find manager salaries, then join it back using a lookup table without a full self-join.
PySpark Solution:
# Aggregate to get manager salaries
manager_salaries = employees_df.groupBy("EmployeeID").agg(
max("Salary").alias("ManagerSalary")
).withColumnRenamed("EmployeeID", "ManagerID")
# Join employees with aggregated manager salaries
result = employees_df.join(
manager_salaries,
on="ManagerID",
how="left"
).filter(col("Salary") > col("ManagerSalary"))
result.show()
Explanation:
- Aggregate the
Salary
column to calculate each manager’s salary. - Rename
EmployeeID
toManagerID
for an easy lookup. - Perform a simple lookup join instead of a full self-join.
3. Using Cached Manager Lookup
Create a separate lookup table of managers and cache it for efficient repeated lookups.
PySpark Solution:
# Create a manager lookup DataFrame (columns renamed to avoid ambiguous references after the join)
managers = employees_df.select(col("EmployeeID").alias("ManagerID"), col("Salary").alias("ManagerSalary"))
# Cache the lookup table for better performance
managers.cache()
# Join employees with the manager lookup table
result = employees_df.join(
    managers,
    on="ManagerID",
    how="inner"
).filter(col("Salary") > col("ManagerSalary"))
result.show()
Explanation:
- Select manager data as a separate lookup table.
- Cache the lookup table for performance, reducing shuffles during joins.
4. Using RDD Transformation (for large datasets)
If the data is massive, you can use RDD transformations for better scalability (though less intuitive).
PySpark RDD Example:
# Convert the DataFrame to an RDD
rdd = employees_df.rdd.map(lambda row: (row.EmployeeID, row.ManagerID, row.Salary))
# Create a dictionary of manager salaries
manager_salaries = rdd.map(lambda x: (x[0], x[2])).collectAsMap()
# Filter employees with salary greater than their manager's salary
result_rdd = rdd.filter(lambda x: x[1] in manager_salaries and x[2] > manager_salaries[x[1]])
# Convert RDD back to DataFrame
result = result_rdd.map(lambda x: (x[0], x[1], x[2], manager_salaries[x[1]])).toDF(["EmployeeID", "ManagerID", "EmployeeSalary", "ManagerSalary"])
result.show()
Explanation:
- Use collectAsMap() to create a dictionary of manager salaries.
- Filter employees directly using the dictionary lookup.
- Avoid shuffles or joins for faster computation.
Which Method is Better?
| Method | Advantages | Use Case |
|---|---|---|
| Window Functions | Avoids join, leverages Spark optimizations | When datasets are moderately large. |
| Aggregation Lookup | Simple logic, fewer shuffles | When manager counts are smaller. |
| Cached Lookup Table | Efficient for multiple reuse | When repeated queries are needed. |
| RDD Transformations | Avoids DataFrame overheads | For very large datasets. |
Each approach has trade-offs; window functions are usually the most elegant way to avoid costly joins, while the lookup-join approaches are the most direct way to compare against the manager’s actual salary here.