How to calculate the percentage of successful payments for each driver. A payment is considered successful if its status is ‘Completed’.
Table Name: Rides
Column Names: ride_id (Ride ID), driver_id (Driver ID), fare_amount (Fare Amount), driver_rating (Driver Rating), start_time (Start Time)
Table Name: Payments
Column Names: payment_id (Payment ID), ride_id (Ride ID), payment_status (Payment Status)
Step-by-Step Solution to Calculate the Percentage of Successful Payments
We will:
- Create sample data for the `Rides` and `Payments` tables.
- Join the tables on `ride_id` to associate rides with payment statuses.
- Calculate the percentage of successful payments (`Completed` status) for each driver.
Spark SQL Solution
Sample Data Creation
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize Spark session
spark = SparkSession.builder.appName("PercentageSuccess").getOrCreate()
# Sample data for Rides table
rides_data = [
(1, 'D1', 100, 4.5, '2024-01-01 10:00:00'),
(2, 'D1', 150, 4.7, '2024-01-02 12:00:00'),
(3, 'D2', 200, 4.8, '2024-01-03 14:00:00'),
(4, 'D2', 180, 4.6, '2024-01-04 16:00:00'),
(5, 'D3', 120, 4.3, '2024-01-05 18:00:00'),
]
rides_columns = ["ride_id", "driver_id", "fare_amount", "driver_rating", "start_time"]
rides_df = spark.createDataFrame(rides_data, rides_columns)
rides_df.createOrReplaceTempView("Rides")
# Sample data for Payments table
payments_data = [
(101, 1, 'Completed'),
(102, 2, 'Failed'),
(103, 3, 'Completed'),
(104, 4, 'Completed'),
(105, 5, 'Failed'),
]
payments_columns = ["payment_id", "ride_id", "payment_status"]
payments_df = spark.createDataFrame(payments_data, payments_columns)
payments_df.createOrReplaceTempView("Payments")
Query in Spark SQL
WITH DriverPayments AS (
SELECT
R.driver_id,
COUNT(*) AS total_rides,
SUM(CASE WHEN P.payment_status = 'Completed' THEN 1 ELSE 0 END) AS successful_payments
FROM Rides R
JOIN Payments P ON R.ride_id = P.ride_id
GROUP BY R.driver_id
),
DriverSuccessRate AS (
SELECT
driver_id,
total_rides,
successful_payments,
ROUND((successful_payments * 100.0) / total_rides, 2) AS success_rate
FROM DriverPayments
)
SELECT * FROM DriverSuccessRate;
PySpark Solution
from pyspark.sql.functions import count, sum, when, col, round
# Join the two DataFrames
joined_df = rides_df.join(payments_df, "ride_id")
# Calculate total rides and successful payments
result_df = joined_df.groupBy("driver_id") \
.agg(
count("*").alias("total_rides"),
sum(when(col("payment_status") == "Completed", 1).otherwise(0)).alias("successful_payments")
) \
.withColumn("success_rate", round((col("successful_payments") * 100.0) / col("total_rides"), 2))
result_df.show()
Expected Output
For the given sample data:
driver_id | total_rides | successful_payments | success_rate |
---|---|---|---|
D1 | 2 | 1 | 50.00 |
D2 | 2 | 2 | 100.00 |
D3 | 1 | 0 | 0.00 |
Key Points:
- Spark SQL uses `SUM(CASE WHEN ...)` for conditional aggregation.
- PySpark uses `when().otherwise()` to create conditional expressions.
- Both methods compute the same result efficiently for large datasets.
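One edge case the inner `JOIN` above hides: rides with no matching row in `Payments` drop out of both the numerator and the denominator. If such rides should count as unsuccessful (an assumption, not stated in the problem), a left join handles it — a minimal PySpark sketch reusing `rides_df` and `payments_df` from above:
from pyspark.sql.functions import count, sum, when, col, round
# Left join keeps rides that have no payment record at all
joined_all = rides_df.join(payments_df, "ride_id", "left")
result_all = joined_all.groupBy("driver_id") \
    .agg(
        count("*").alias("total_rides"),
        # A missing payment_status (NULL) falls through to otherwise(0), i.e. counted as not successful
        sum(when(col("payment_status") == "Completed", 1).otherwise(0)).alias("successful_payments")
    ) \
    .withColumn("success_rate", round((col("successful_payments") * 100.0) / col("total_rides"), 2))
result_all.show()
With the sample data above this returns the same numbers, since every ride has a payment row; the difference only shows up when payments are missing.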
How to calculate the percentage of menu items sold for each restaurant.
Table Name: Items
Column Names: item_id (Item ID), rest_id (Restaurant ID)
Table Name: Orders
Column Names: order_id (Order ID), item_id (Item ID), quantity (Quantity), is_offer (Is Offer), client_id (Client ID), Date_Timestamp (Date Timestamp)
Calculate the Percentage of Menu Items Sold for Each Restaurant
We need to:
- Determine the total number of unique menu items available (`Items` table) for each restaurant.
- Find the number of unique menu items sold (`Orders` table) for each restaurant.
- Calculate the percentage of menu items sold for each restaurant.
Spark SQL Solution
Sample Data Creation
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("MenuItemsPercentage").getOrCreate()
# Sample data for Items table
items_data = [
(1, 'R1'),
(2, 'R1'),
(3, 'R1'),
(4, 'R2'),
(5, 'R2'),
(6, 'R3'),
]
items_columns = ["item_id", "rest_id"]
items_df = spark.createDataFrame(items_data, items_columns)
items_df.createOrReplaceTempView("Items")
# Sample data for Orders table
orders_data = [
(101, 1, 2, False, 'C1', '2024-01-01 10:00:00'),
(102, 2, 1, True, 'C2', '2024-01-02 12:00:00'),
(103, 3, 3, False, 'C1', '2024-01-03 14:00:00'),
(104, 4, 1, True, 'C3', '2024-01-04 16:00:00'),
(105, 6, 2, False, 'C4', '2024-01-05 18:00:00'),
]
orders_columns = ["order_id", "item_id", "quantity", "is_offer", "client_id", "date_timestamp"]
orders_df = spark.createDataFrame(orders_data, orders_columns)
orders_df.createOrReplaceTempView("Orders")
Query in Spark SQL
WITH TotalMenuItems AS (
SELECT
rest_id,
COUNT(DISTINCT item_id) AS total_menu_items
FROM Items
GROUP BY rest_id
),
SoldMenuItems AS (
SELECT
I.rest_id,
COUNT(DISTINCT O.item_id) AS sold_menu_items
FROM Orders O
JOIN Items I ON O.item_id = I.item_id
GROUP BY I.rest_id
),
MenuPercentage AS (
SELECT
T.rest_id,
T.total_menu_items,
S.sold_menu_items,
ROUND((S.sold_menu_items * 100.0) / T.total_menu_items, 2) AS percentage_sold
FROM TotalMenuItems T
LEFT JOIN SoldMenuItems S ON T.rest_id = S.rest_id
)
SELECT * FROM MenuPercentage;
PySpark Solution
from pyspark.sql.functions import countDistinct, col, round
# Calculate total menu items for each restaurant
total_menu_items_df = items_df.groupBy("rest_id") \
.agg(countDistinct("item_id").alias("total_menu_items"))
# Calculate sold menu items for each restaurant
sold_menu_items_df = orders_df.join(items_df, "item_id") \
.groupBy("rest_id") \
.agg(countDistinct("item_id").alias("sold_menu_items"))
# Join the two DataFrames and calculate the percentage
result_df = total_menu_items_df.join(sold_menu_items_df, "rest_id", "left_outer") \
.withColumn("percentage_sold", round((col("sold_menu_items") * 100.0) / col("total_menu_items"), 2))
result_df.show()
Expected Output
For the given sample data:
rest_id | total_menu_items | sold_menu_items | percentage_sold |
---|---|---|---|
R1 | 3 | 3 | 100.00 |
R2 | 2 | 1 | 50.00 |
R3 | 1 | 1 | 100.00 |
Key Points:
- `COUNT(DISTINCT column)` is used to count unique items.
- The `LEFT JOIN` ensures that restaurants with no sold items are kept in the result; note that their `sold_menu_items` and `percentage_sold` come back as NULL unless you wrap them in `COALESCE`.
- PySpark and Spark SQL yield the same result and are suitable for large datasets.
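If a zero percentage is preferred over NULL for restaurants with no sales (an assumption about the desired output), the join result can be back-filled — a small PySpark sketch reusing `total_menu_items_df` and `sold_menu_items_df` from above:
from pyspark.sql.functions import coalesce, lit, col, round
# Replace missing sold counts with 0 so the percentage is 0.00 instead of NULL
result_filled_df = total_menu_items_df.join(sold_menu_items_df, "rest_id", "left_outer") \
    .withColumn("sold_menu_items", coalesce(col("sold_menu_items"), lit(0))) \
    .withColumn("percentage_sold", round((col("sold_menu_items") * 100.0) / col("total_menu_items"), 2))
result_filled_df.show()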
How to compare the time taken for clients who placed their first order with an offer versus those without an offer to make their next order.
Table Name: Orders
Column Names: order_id (Order ID), user_id (User ID), is_offer (Is Offer), Date_Timestamp (Date Timestamp)
Compare the time taken for clients who placed their first order with an offer versus those without an offer to make their next order. This involves calculating the time difference between the first and second orders for each user, categorized by whether the first order was placed with an offer.
Steps to Solve
- Identify each user’s first order.
- Identify each user’s second order.
- Calculate the time difference between the first and second orders.
- Categorize the users based on whether their first order was placed with an offer.
- Compare the average time taken for the next order between the two groups.
Sample Data Creation
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.window import Window
# Initialize Spark session
spark = SparkSession.builder.appName("FirstOrderAnalysis").getOrCreate()
# Sample data
data = [
(1, 'U1', True, '2024-01-01 10:00:00'),
(2, 'U1', False, '2024-01-05 14:00:00'),
(3, 'U2', True, '2024-01-02 12:00:00'),
(4, 'U2', False, '2024-01-10 16:00:00'),
(5, 'U3', False, '2024-01-03 08:00:00'),
(6, 'U3', True, '2024-01-12 20:00:00'),
]
columns = ["order_id", "user_id", "is_offer", "Date_Timestamp"]
orders_df = spark.createDataFrame(data, columns)
orders_df = orders_df.withColumn("Date_Timestamp", col("Date_Timestamp").cast("timestamp"))
orders_df.show()
PySpark Solution
1. Identify First and Second Orders for Each User
Use a window function to rank orders based on the timestamp for each user.
from pyspark.sql.functions import row_number, unix_timestamp
# Define a window partitioned by user_id and ordered by Date_Timestamp
window_spec = Window.partitionBy("user_id").orderBy("Date_Timestamp")
# Add a rank column to identify the first and second orders
ranked_orders = orders_df.withColumn("order_rank", row_number().over(window_spec))
# Separate first and second orders
first_orders = ranked_orders.filter(col("order_rank") == 1) \
.select("user_id", "is_offer", "Date_Timestamp")
second_orders = ranked_orders.filter(col("order_rank") == 2) \
.select("user_id", col("Date_Timestamp").alias("second_order_timestamp"))
2. Calculate the Time Difference Between First and Second Orders
Join the first and second orders and calculate the time difference.
# Join first and second orders
joined_orders = first_orders.join(second_orders, "user_id", "inner")
# Calculate time difference in days
result_df = joined_orders.withColumn(
"days_to_next_order",
(unix_timestamp("second_order_timestamp") - unix_timestamp("Date_Timestamp")) / 86400
)
result_df.show()
3. Group by `is_offer` and Calculate Average Time
Group the results by whether the first order used an offer and calculate the average time difference.
from pyspark.sql.functions import avg
# Group by is_offer and calculate average time
average_time_df = result_df.groupBy("is_offer") \
.agg(avg("days_to_next_order").alias("avg_days_to_next_order"))
average_time_df.show()
Expected Output
For the given sample data:
Final Result
is_offer | avg_days_to_next_order |
---|---|
True | 6.17 |
False | 9.5 |
(With the sample data, U1 waits ≈4.17 days and U2 ≈8.17 days after offer-based first orders, averaging ≈6.17; U3 waits 9.5 days after a non-offer first order.)
Key Concepts Used
- Window Functions: `partitionBy` and `orderBy` to rank the orders for each user.
- Date Calculations: `unix_timestamp` to calculate the difference between timestamps.
- Grouping and Aggregation: `groupBy` to calculate the average days to the next order based on the `is_offer` column.
Spark SQL Solution
Query
WITH RankedOrders AS (
SELECT
user_id,
is_offer,
Date_Timestamp,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY Date_Timestamp) AS order_rank
FROM Orders
),
FirstOrders AS (
SELECT
user_id,
is_offer,
Date_Timestamp AS first_order_timestamp
FROM RankedOrders
WHERE order_rank = 1
),
SecondOrders AS (
SELECT
user_id,
Date_Timestamp AS second_order_timestamp
FROM RankedOrders
WHERE order_rank = 2
),
JoinedOrders AS (
SELECT
F.user_id,
F.is_offer,
F.first_order_timestamp,
S.second_order_timestamp,
DATEDIFF(S.second_order_timestamp, F.first_order_timestamp) AS days_to_next_order
FROM FirstOrders F
JOIN SecondOrders S ON F.user_id = S.user_id
)
SELECT
is_offer,
AVG(days_to_next_order) AS avg_days_to_next_order
FROM JoinedOrders
GROUP BY is_offer;
This query mirrors the PySpark solution directly in SQL. The one difference is that `DATEDIFF` returns whole days (so the averages come out as 6.0 and 9.0 on the sample data); to reproduce the fractional values above, replace it with `(unix_timestamp(S.second_order_timestamp) - unix_timestamp(F.first_order_timestamp)) / 86400`.
How to find the length of the longest sequence of consecutive numbers in the table.
Table Name: Consecutive
Column Names: number (Number)
Sample Table – number: 1, 2, 3, 4, 10, 11, 20, 21, 22, 23, 24, 30
To find the length of the longest sequence of consecutive numbers in the `Consecutive` table, we can use a difference-based grouping strategy in both Spark SQL and PySpark: assign a group identifier to each run of consecutive numbers by taking the difference between the number and its row index. As illustrated below, that difference stays constant within a run and changes whenever the sequence breaks.
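For example, working through the first six sample values by hand:
number | row_index | group_id (number - row_index) |
---|---|---|
1 | 1 | 0 |
2 | 2 | 0 |
3 | 3 | 0 |
4 | 4 | 0 |
10 | 5 | 5 |
11 | 6 | 5 |
Rows 1–4 share group_id 0 and rows 10–11 share group_id 5, so counting rows per group_id gives the length of each consecutive run.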
Spark SQL Solution
Query:
WITH RankedNumbers AS (
SELECT
number,
ROW_NUMBER() OVER (ORDER BY number) AS row_index
FROM Consecutive
),
GroupedNumbers AS (
SELECT
number,
row_index,
number - row_index AS group_id
FROM RankedNumbers
),
GroupLengths AS (
SELECT
group_id,
COUNT(*) AS sequence_length
FROM GroupedNumbers
GROUP BY group_id
)
SELECT
MAX(sequence_length) AS longest_consecutive_sequence
FROM GroupLengths;
Explanation:
- RankedNumbers: Assign a `ROW_NUMBER` to each number ordered by its value.
- GroupedNumbers: Calculate `group_id` as `number - row_index`. This creates a unique identifier for each sequence of consecutive numbers.
- GroupLengths: Count the number of elements in each group.
- Final Query: Find the maximum sequence length.
PySpark Solution
Sample Data:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, expr
from pyspark.sql.window import Window
# Initialize Spark session
spark = SparkSession.builder.appName("LongestConsecutiveSequence").getOrCreate()
# Sample data
data = [(1,), (2,), (3,), (4,), (10,), (11,), (20,), (21,), (22,), (23,), (24,), (30,)]
columns = ["number"]
consecutive_df = spark.createDataFrame(data, columns)
PySpark Implementation:
# Assign row numbers based on the order of numbers
window_spec = Window.orderBy("number")
ranked_df = consecutive_df.withColumn("row_index", row_number().over(window_spec))
# Create group_id based on the difference between number and row_index
grouped_df = ranked_df.withColumn("group_id", col("number") - col("row_index"))
# Count the size of each group
group_lengths = grouped_df.groupBy("group_id").count()
# Find the maximum sequence length
longest_sequence = group_lengths.agg({"count": "max"}).withColumnRenamed("max(count)", "longest_sequence")
longest_sequence.show()
Explanation:
- Ranked DataFrame: Compute a `row_index` for each number using `row_number` ordered by `number`.
- Group Identification: Calculate `group_id` as `number - row_index`.
- Group Lengths: Group by `group_id` and count the number of rows in each group.
- Find Maximum: Aggregate to find the maximum count.
Expected Output
For the sample data:
longest_sequence |
---|
5 |
- Sequences: `[1, 2, 3, 4]`, `[10, 11]`, `[20, 21, 22, 23, 24]`, `[30]`
- Longest sequence: 5 (`20, 21, 22, 23, 24`)
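One assumption worth flagging: the `number - row_index` trick relies on the numbers being distinct, because a duplicate advances the row index without advancing the value and breaks the constant difference. If the table may contain duplicates, deduplicate first — a minimal sketch reusing `consecutive_df` and the imports from above:
# Drop duplicate numbers before ranking so repeated values don't split a run
distinct_df = consecutive_df.dropDuplicates(["number"])
ranked_distinct = distinct_df.withColumn("row_index", row_number().over(Window.orderBy("number")))
grouped_distinct = ranked_distinct.withColumn("group_id", col("number") - col("row_index"))
grouped_distinct.groupBy("group_id").count().agg({"count": "max"}).show()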
How can you find employees whose salary is greater than their manager’s salary?
– Table name: Employees
– Columns: EmployeeID, Salary, ManagerID
To find employees whose salary is greater than their manager’s salary, you can use a self-join in Spark SQL or PySpark. Here’s how:
Spark SQL Solution
Query:
SELECT
e1.EmployeeID AS EmployeeID,
e1.Salary AS EmployeeSalary,
e2.Salary AS ManagerSalary,
e1.ManagerID
FROM Employees e1
INNER JOIN Employees e2
ON e1.ManagerID = e2.EmployeeID
WHERE e1.Salary > e2.Salary;
Explanation:
- Perform a self-join on the `Employees` table: join `e1` (employees) with `e2` (managers) on `e1.ManagerID = e2.EmployeeID`.
- Filter the results where the employee’s salary (`e1.Salary`) is greater than their manager’s salary (`e2.Salary`).
- Select the relevant columns: employee ID, employee salary, manager salary, and manager ID.
PySpark Solution
Sample Data:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize Spark session
spark = SparkSession.builder.appName("EmployeesSalaryComparison").getOrCreate()
# Sample data
data = [
(1, 80000, 3), # Employee 1 with Manager 3
(2, 60000, 3), # Employee 2 with Manager 3
(3, 100000, None), # Employee 3 (Manager) with no manager
(4, 120000, 3), # Employee 4 with Manager 3
(5, 70000, 4) # Employee 5 with Manager 4
]
columns = ["EmployeeID", "Salary", "ManagerID"]
employees_df = spark.createDataFrame(data, columns)
PySpark Implementation:
# Perform self-join on ManagerID and EmployeeID
joined_df = employees_df.alias("e1").join(
employees_df.alias("e2"),
col("e1.ManagerID") == col("e2.EmployeeID"),
"inner"
).select(
col("e1.EmployeeID").alias("EmployeeID"),
col("e1.Salary").alias("EmployeeSalary"),
col("e2.Salary").alias("ManagerSalary"),
col("e1.ManagerID").alias("ManagerID")
)
# Filter employees whose salary is greater than their manager's salary
result_df = joined_df.filter(col("EmployeeSalary") > col("ManagerSalary"))
# Show results
result_df.show()
Explanation:
- Self-Join: Alias `employees_df` as `e1` and `e2`, then join on `e1.ManagerID = e2.EmployeeID`.
- Filter Condition: Keep rows where the employee’s salary (`e1.Salary`) is greater than the manager’s salary (`e2.Salary`).
- Select Relevant Columns: Include employee ID, employee salary, manager salary, and manager ID in the output.
Expected Output
For the sample data:
EmployeeID | EmployeeSalary | ManagerSalary | ManagerID |
---|---|---|---|
4 | 120000 | 100000 | 3 |
- Employee 4 has a salary of 120,000, which is greater than their manager’s salary of 100,000.
If you want to find employees whose salary is greater than their manager’s salary without the full self-join above, you can try alternative approaches such as window functions over a unioned view, an aggregated lookup (optionally with a broadcast hash join), a cached lookup table, or a driver-side dictionary lookup via RDDs. Here’s how:
1. Using Window Functions
Instead of joining the table with itself, you can propagate the manager’s salary down to employees with a window function. Note that a window partitioned by `ManagerID` only sees the employees who report to that manager, not the manager’s own row (which is keyed by `EmployeeID`), so the rows first have to be relabeled and unioned so that each manager’s salary lands in the same partition as their reports.
PySpark Solution:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit, max

# Relabel every row twice: as a "manager" record keyed by the employee's own EmployeeID,
# and as an "employee" record keyed by their ManagerID, then union the two views
manager_records = employees_df.select(
    col("EmployeeID").alias("key"), lit(None).cast("long").alias("EmployeeID"),
    col("Salary").alias("ManagerSalary"), lit(None).cast("long").alias("Salary"),
    lit(None).cast("long").alias("ManagerID"))
employee_records = employees_df.select(
    col("ManagerID").alias("key"), col("EmployeeID"),
    lit(None).cast("long").alias("ManagerSalary"), col("Salary"), col("ManagerID"))
unioned = manager_records.unionByName(employee_records)
# Within each key (a manager's EmployeeID), the only non-null ManagerSalary is that manager's own salary
window_spec = Window.partitionBy("key")
with_manager_salary = unioned.withColumn("ManagerSalary", max("ManagerSalary").over(window_spec))
# Keep the employee records whose salary exceeds their manager's salary
result = with_manager_salary.filter(
    (col("EmployeeID").isNotNull()) & (col("Salary") > col("ManagerSalary"))
).select("EmployeeID", "Salary", "ManagerID", "ManagerSalary")
result.show()
Explanation:
- Each employee row is emitted twice: once as a manager record keyed by its own `EmployeeID` and once as an employee record keyed by its `ManagerID`, so a manager and their direct reports land in the same partition.
- `max("ManagerSalary").over(window_spec)` picks up the single non-null manager salary within each partition and attaches it to every employee record.
- Filtering the employee records where `Salary > ManagerSalary` gives the answer; the self-join is traded for a union plus one window shuffle.
2. Using Aggregation and Lookup
Project each potential manager’s salary into a small lookup table, then join employees against that two-column lookup instead of the full table.
PySpark Solution:
# Build a (ManagerID, ManagerSalary) lookup; EmployeeID is unique, so the
# max() aggregation simply carries each potential manager's own salary
manager_salaries = employees_df.groupBy("EmployeeID").agg(
    max("Salary").alias("ManagerSalary")
).withColumnRenamed("EmployeeID", "ManagerID")
# Join employees with the aggregated manager salaries
result = employees_df.join(
    manager_salaries,
    on="ManagerID",
    how="left"
).filter(col("Salary") > col("ManagerSalary"))
result.show()
Explanation:
- Aggregate the `Salary` column keyed by `EmployeeID` to get each potential manager’s salary.
- Rename `EmployeeID` to `ManagerID` so it can serve as the lookup key.
- Join against this two-column projection instead of the full table; it is still a join, but the lookup side is much narrower.
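The intro above also mentions a broadcast hash join. Spark usually broadcasts the small side automatically when it is below `spark.sql.autoBroadcastJoinThreshold`, but you can request it explicitly — a minimal sketch reusing the `manager_salaries` lookup just built:
from pyspark.sql.functions import broadcast
# Hint Spark to broadcast the narrow manager lookup, turning the join into a
# map-side broadcast hash join with no shuffle of the employees side
broadcast_result = employees_df.join(
    broadcast(manager_salaries),
    on="ManagerID",
    how="inner"
).filter(col("Salary") > col("ManagerSalary"))
broadcast_result.show()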
3. Using Cached Manager Lookup
Create a separate lookup table of managers and cache it for efficient repeated lookups.
PySpark Solution:
# Create a manager lookup DataFrame (each employee's own salary, keyed by their EmployeeID)
managers = employees_df.select(
    col("EmployeeID").alias("ManagerID"),
    col("Salary").alias("ManagerSalary")
)
# Cache the lookup table so repeated queries reuse it without recomputation
managers.cache()
# Join employees with the manager lookup table and compare salaries
result = employees_df.join(
    managers,
    on="ManagerID",
    how="inner"
).filter(col("Salary") > col("ManagerSalary"))
result.show()
Explanation:
- Select just the manager key and salary as a separate lookup table.
- Cache the lookup table so it is not recomputed each time the same lookup is reused across queries.
4. Using an RDD Transformation with a Driver-Side Lookup
If the manager lookup fits comfortably in driver memory, you can collect it as a dictionary and filter the RDD directly, avoiding joins altogether (though this is less intuitive than the DataFrame API).
PySpark RDD Example:
# Convert the DataFrame to an RDD of (EmployeeID, ManagerID, Salary) tuples
rdd = employees_df.rdd.map(lambda row: (row.EmployeeID, row.ManagerID, row.Salary))
# Collect every employee's salary keyed by EmployeeID as the manager lookup
# (this assumes the table fits in driver memory)
manager_salaries = rdd.map(lambda x: (x[0], x[2])).collectAsMap()
# Filter employees whose salary is greater than their manager's salary
result_rdd = rdd.filter(lambda x: x[1] in manager_salaries and x[2] > manager_salaries[x[1]])
# Convert the RDD back to a DataFrame
result = result_rdd.map(lambda x: (x[0], x[1], x[2], manager_salaries[x[1]])).toDF(
    ["EmployeeID", "ManagerID", "EmployeeSalary", "ManagerSalary"])
result.show()
Explanation:
- `collectAsMap()` pulls the (EmployeeID, Salary) pairs to the driver as a plain dictionary, and the filter lambda looks manager salaries up directly, so no shuffle or join is needed.
- Because the dictionary is shipped to executors with the task closures, this only scales while the lookup stays small; for a genuinely large table, prefer one of the join-based approaches or broadcast the dictionary explicitly (see the sketch below).
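A minimal sketch of that broadcast variant, reusing `rdd` and the `manager_salaries` dictionary built above:
# Broadcast the lookup dictionary once per executor instead of once per task closure
bc_salaries = spark.sparkContext.broadcast(manager_salaries)
result_bc = rdd.filter(
    lambda x: x[1] in bc_salaries.value and x[2] > bc_salaries.value[x[1]]
).map(lambda x: (x[0], x[1], x[2], bc_salaries.value[x[1]])).toDF(
    ["EmployeeID", "ManagerID", "EmployeeSalary", "ManagerSalary"])
result_bc.show()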
Which Method is Better?
Method | Advantages | Use Case |
---|---|---|
Window Functions | Avoids the self-join (union plus one window shuffle) | When datasets are moderately large. |
Aggregation Lookup | Simple logic, fewer shuffles | When manager counts are smaller. |
Cached Lookup Table | Efficient for multiple reuse | When repeated queries are needed. |
RDD + Driver-Side Lookup | Avoids joins and shuffles entirely | When the manager lookup fits in driver memory. |
Each approach has trade-offs: choose based on data size and how often the manager lookup is reused, and benchmark against the plain self-join, which Spark’s optimizer often handles well (typically as a broadcast hash join when one side is small).