Coding Questions in Spark SQL, PySpark, and Python

How to calculate the percentage of successful payments for each driver. A payment is considered successful if its status is ‘Completed’.

Table Name: Rides
Column Names: ride_id (Ride ID), driver_id (Driver ID), fare_amount (Fare Amount), driver_rating (Driver Rating), start_time (Start Time)

Table Name: Payments
Column Names: payment_id (Payment ID), ride_id (Ride ID), payment_status (Payment Status)

Step-by-Step Solution to Calculate the Percentage of Successful Payments

We will:

  1. Create sample data for the Rides and Payments tables.
  2. Join the tables on ride_id to associate rides with payment statuses.
  3. Calculate the percentage of successful payments (Completed status) for each driver.

Spark SQL Solution

Sample Data Creation

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("PercentageSuccess").getOrCreate()

# Sample data for Rides table
rides_data = [
    (1, 'D1', 100, 4.5, '2024-01-01 10:00:00'),
    (2, 'D1', 150, 4.7, '2024-01-02 12:00:00'),
    (3, 'D2', 200, 4.8, '2024-01-03 14:00:00'),
    (4, 'D2', 180, 4.6, '2024-01-04 16:00:00'),
    (5, 'D3', 120, 4.3, '2024-01-05 18:00:00'),
]
rides_columns = ["ride_id", "driver_id", "fare_amount", "driver_rating", "start_time"]

rides_df = spark.createDataFrame(rides_data, rides_columns)
rides_df.createOrReplaceTempView("Rides")

# Sample data for Payments table
payments_data = [
    (101, 1, 'Completed'),
    (102, 2, 'Failed'),
    (103, 3, 'Completed'),
    (104, 4, 'Completed'),
    (105, 5, 'Failed'),
]
payments_columns = ["payment_id", "ride_id", "payment_status"]

payments_df = spark.createDataFrame(payments_data, payments_columns)
payments_df.createOrReplaceTempView("Payments")

Query in Spark SQL

WITH DriverPayments AS (
    SELECT 
        R.driver_id,
        COUNT(*) AS total_rides,
        SUM(CASE WHEN P.payment_status = 'Completed' THEN 1 ELSE 0 END) AS successful_payments
    FROM Rides R
    JOIN Payments P ON R.ride_id = P.ride_id
    GROUP BY R.driver_id
),
DriverSuccessRate AS (
    SELECT 
        driver_id,
        total_rides,
        successful_payments,
        ROUND((successful_payments * 100.0) / total_rides, 2) AS success_rate
    FROM DriverPayments
)
SELECT * FROM DriverSuccessRate;

PySpark Solution

from pyspark.sql.functions import count, sum, when, col, round

# Join the two DataFrames
joined_df = rides_df.join(payments_df, "ride_id")

# Calculate total rides and successful payments
result_df = joined_df.groupBy("driver_id") \
    .agg(
        count("*").alias("total_rides"),
        sum(when(col("payment_status") == "Completed", 1).otherwise(0)).alias("successful_payments")
    ) \
    .withColumn("success_rate", round((col("successful_payments") * 100.0) / col("total_rides"), 2))

result_df.show()

Expected Output

For the given sample data:

driver_id  total_rides  successful_payments  success_rate
D1         2            1                    50.00
D2         2            2                    100.00
D3         1            0                    0.00

Key Points:

  • Spark SQL uses SUM(CASE WHEN ...) for conditional aggregation.
  • PySpark uses when().otherwise() to create conditional expressions.
  • Both approaches run through the same Spark query optimizer, so they return the same result and typically perform comparably on large datasets.
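
As a quick cross-check of the expected output, the same conditional aggregation can be sketched in plain Python without a Spark session. This is only an illustration on the sample rows above; the variable names (rides, payments, totals, successes) are made up for this sketch and are not part of the Spark solutions.

```python
from collections import defaultdict

# Same sample rows as the Rides and Payments tables above
rides = [(1, "D1"), (2, "D1"), (3, "D2"), (4, "D2"), (5, "D3")]  # (ride_id, driver_id)
payments = {1: "Completed", 2: "Failed", 3: "Completed", 4: "Completed", 5: "Failed"}  # ride_id -> status

totals = defaultdict(int)
successes = defaultdict(int)
for ride_id, driver_id in rides:
    status = payments.get(ride_id)
    if status is None:
        continue  # mirrors the inner join: rides without a payment row are dropped
    totals[driver_id] += 1
    successes[driver_id] += status == "Completed"  # True counts as 1, False as 0

# Same formula as the SQL: successful_payments * 100.0 / total_rides, rounded to 2 places
success_rate = {d: round(successes[d] * 100.0 / totals[d], 2) for d in totals}
print(success_rate)
```

The per-driver rates agree with the table above: 50.0 for D1, 100.0 for D2, and 0.0 for D3.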

How to calculate the percentage of menu items sold for each restaurant.

Table Name: Items
Column Names: item_id (Item ID), rest_id (Restaurant ID)

Table Name: Orders
Column Names: order_id (Order ID), item_id (Item ID), quantity (Quantity), is_offer (Is Offer), client_id (Client ID), Date_Timestamp (Date Timestamp)

Calculate the Percentage of Menu Items Sold for Each Restaurant

We need to:

  1. Determine the total unique menu items available (Items table) for each restaurant.
  2. Find the number of unique menu items sold (Orders table) for each restaurant.
  3. Calculate the percentage of menu items sold for each restaurant.

Spark SQL Solution

Sample Data Creation

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("MenuItemsPercentage").getOrCreate()

# Sample data for Items table
items_data = [
    (1, 'R1'),
    (2, 'R1'),
    (3, 'R1'),
    (4, 'R2'),
    (5, 'R2'),
    (6, 'R3'),
]
items_columns = ["item_id", "rest_id"]

items_df = spark.createDataFrame(items_data, items_columns)
items_df.createOrReplaceTempView("Items")

# Sample data for Orders table
orders_data = [
    (101, 1, 2, False, 'C1', '2024-01-01 10:00:00'),
    (102, 2, 1, True, 'C2', '2024-01-02 12:00:00'),
    (103, 3, 3, False, 'C1', '2024-01-03 14:00:00'),
    (104, 4, 1, True, 'C3', '2024-01-04 16:00:00'),
    (105, 6, 2, False, 'C4', '2024-01-05 18:00:00'),
]
orders_columns = ["order_id", "item_id", "quantity", "is_offer", "client_id", "date_timestamp"]

orders_df = spark.createDataFrame(orders_data, orders_columns)
orders_df.createOrReplaceTempView("Orders")

Query in Spark SQL

WITH TotalMenuItems AS (
    SELECT
        rest_id,
        COUNT(DISTINCT item_id) AS total_menu_items
    FROM Items
    GROUP BY rest_id
),
SoldMenuItems AS (
    SELECT
        I.rest_id,
        COUNT(DISTINCT O.item_id) AS sold_menu_items
    FROM Orders O
    JOIN Items I ON O.item_id = I.item_id
    GROUP BY I.rest_id
),
MenuPercentage AS (
    SELECT
        T.rest_id,
        T.total_menu_items,
        S.sold_menu_items,
        ROUND((S.sold_menu_items * 100.0) / T.total_menu_items, 2) AS percentage_sold
    FROM TotalMenuItems T
    LEFT JOIN SoldMenuItems S ON T.rest_id = S.rest_id
)
SELECT * FROM MenuPercentage;

PySpark Solution

from pyspark.sql.functions import countDistinct, col, round

# Calculate total menu items for each restaurant
total_menu_items_df = items_df.groupBy("rest_id") \
    .agg(countDistinct("item_id").alias("total_menu_items"))

# Calculate sold menu items for each restaurant
sold_menu_items_df = orders_df.join(items_df, "item_id") \
    .groupBy("rest_id") \
    .agg(countDistinct("item_id").alias("sold_menu_items"))

# Join the two DataFrames and calculate the percentage
result_df = total_menu_items_df.join(sold_menu_items_df, "rest_id", "left_outer") \
    .withColumn("percentage_sold", round((col("sold_menu_items") * 100.0) / col("total_menu_items"), 2))

result_df.show()

Expected Output

For the given sample data:

rest_id  total_menu_items  sold_menu_items  percentage_sold
R1       3                 3                100.00
R2       2                 1                50.00
R3       1                 1                100.00

Key Points:

  • COUNT(DISTINCT column) is used to count unique items.
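  • The LEFT JOIN ensures that restaurants with no sold items are included in the results; for those rows sold_menu_items and percentage_sold are NULL unless they are wrapped in COALESCE(..., 0).
  • PySpark and Spark SQL yield the same result, and both scale to large datasets.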
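
The sample data has no restaurant without sales, so the LEFT JOIN case is never exercised. The plain-Python sketch below adds a hypothetical restaurant R4 (item 7, not in the original data) to show what the calculation should give there. Unlike the SQL and PySpark versions as written, which would return NULL for such a row, this sketch reports 0.0.

```python
from collections import defaultdict

# (item_id, rest_id); R4 with item 7 is a hypothetical addition for illustration
items = [(1, "R1"), (2, "R1"), (3, "R1"), (4, "R2"), (5, "R2"), (6, "R3"), (7, "R4")]
# Distinct item_ids that appear in the Orders sample data
ordered_item_ids = {1, 2, 3, 4, 6}

# Distinct menu items per restaurant
menu = defaultdict(set)
for item_id, rest_id in items:
    menu[rest_id].add(item_id)

# Share of each menu that was ordered at least once
percentage_sold = {
    rest_id: round(len(item_ids & ordered_item_ids) * 100.0 / len(item_ids), 2)
    for rest_id, item_ids in menu.items()
}
print(percentage_sold)
```

R1, R2, and R3 match the expected output above, and R4 comes out as 0.0 rather than missing.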

How to compare the time taken for clients who placed their first order with an offer versus those without an offer to make their next order.

Table Name: Orders
Column Names: order_id (Order ID), user_id (User ID), is_offer (Is Offer), Date_Timestamp (Date Timestamp)

Compare the time taken for clients who placed their first order with an offer versus those without an offer to make their next order. This involves calculating the time difference between the first and second orders for each user, categorized by whether the first order was placed with an offer.


Steps to Solve

  1. Identify each user’s first order.
  2. Identify each user’s second order.
  3. Calculate the time difference between the first and second orders.
  4. Categorize the users based on whether their first order was placed with an offer.
  5. Compare the average time taken for the next order between the two groups.

Sample Data Creation

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("FirstOrderAnalysis").getOrCreate()

# Sample data
data = [
    (1, 'U1', True, '2024-01-01 10:00:00'),
    (2, 'U1', False, '2024-01-05 14:00:00'),
    (3, 'U2', True, '2024-01-02 12:00:00'),
    (4, 'U2', False, '2024-01-10 16:00:00'),
    (5, 'U3', False, '2024-01-03 08:00:00'),
    (6, 'U3', True, '2024-01-12 20:00:00'),
]
columns = ["order_id", "user_id", "is_offer", "Date_Timestamp"]

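orders_df = spark.createDataFrame(data, columns)
orders_df = orders_df.withColumn("Date_Timestamp", col("Date_Timestamp").cast("timestamp"))

# Register a temp view so the Spark SQL query can reference Orders
orders_df.createOrReplaceTempView("Orders")
orders_df.show()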

PySpark Solution

1. Identify First and Second Orders for Each User

Use a window function to rank orders based on the timestamp for each user.

from pyspark.sql.functions import row_number, unix_timestamp

# Define a window partitioned by user_id and ordered by Date_Timestamp
window_spec = Window.partitionBy("user_id").orderBy("Date_Timestamp")

# Number each user's orders chronologically: 1 = first order, 2 = second order
ranked_orders = orders_df.withColumn("order_rank", row_number().over(window_spec))

# Separate first and second orders
first_orders = ranked_orders.filter(col("order_rank") == 1) \
    .select("user_id", "is_offer", "Date_Timestamp")

second_orders = ranked_orders.filter(col("order_rank") == 2) \
    .select("user_id", col("Date_Timestamp").alias("second_order_timestamp"))

2. Calculate the Time Difference Between First and Second Orders

Join the first and second orders and calculate the time difference.

# Join first and second orders
joined_orders = first_orders.join(second_orders, "user_id", "inner")

# Calculate time difference in days
result_df = joined_orders.withColumn(
    "days_to_next_order",
    (unix_timestamp("second_order_timestamp") - unix_timestamp("Date_Timestamp")) / 86400
)

result_df.show()

3. Group by is_offer and Calculate Average Time

Group the results by whether the first order used an offer and calculate the average time difference.

from pyspark.sql.functions import avg

# Group by is_offer and calculate average time
average_time_df = result_df.groupBy("is_offer") \
    .agg(avg("days_to_next_order").alias("avg_days_to_next_order"))

average_time_df.show()

Expected Output

For the given sample data:

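Final Result

is_offer  avg_days_to_next_order
true      6.17
false     9.5

U1 and U2 placed their first order with an offer and reordered after about 4.17 and 8.17 days, which averages to about 6.17 days. U3 placed the first order without an offer and reordered after 9.5 days.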

Key Concepts Used

  1. Window Functions: partitionBy and orderBy to rank the orders for each user.
  2. Date Calculations: Using unix_timestamp to calculate the difference between timestamps.
  3. Grouping and Aggregation: groupBy to calculate the average days to the next order based on the is_offer column.

Spark SQL Solution

Query

WITH RankedOrders AS (
    SELECT 
        user_id,
        is_offer,
        Date_Timestamp,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY Date_Timestamp) AS order_rank
    FROM Orders
),
FirstOrders AS (
    SELECT 
        user_id, 
        is_offer, 
        Date_Timestamp AS first_order_timestamp
    FROM RankedOrders
    WHERE order_rank = 1
),
SecondOrders AS (
    SELECT 
        user_id, 
        Date_Timestamp AS second_order_timestamp
    FROM RankedOrders
    WHERE order_rank = 2
),
JoinedOrders AS (
    SELECT 
        F.user_id, 
        F.is_offer, 
        F.first_order_timestamp, 
        S.second_order_timestamp,
        DATEDIFF(S.second_order_timestamp, F.first_order_timestamp) AS days_to_next_order
    FROM FirstOrders F
    JOIN SecondOrders S ON F.user_id = S.user_id
)
SELECT 
    is_offer,
    AVG(days_to_next_order) AS avg_days_to_next_order
FROM JoinedOrders
GROUP BY is_offer;

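This query computes the same comparison directly in SQL. Note that DATEDIFF counts whole calendar days, so for the sample data it returns averages of 6.0 (offer) and 9.0 (no offer), whereas the unix_timestamp arithmetic in the PySpark solution keeps fractional days.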
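
The two ways of measuring the gap can be compared in plain Python. The sketch below is an illustration only: it hard-codes each user's first and second order from the sample data and uses naive datetimes, which assumes no time zone or daylight-saving shift inside the window. The names first_two_orders, fractional, and calendar are made up for this example.

```python
from datetime import datetime

fmt = "%Y-%m-%d %H:%M:%S"
# (user_id, first order used an offer, first order time, second order time)
first_two_orders = [
    ("U1", True, "2024-01-01 10:00:00", "2024-01-05 14:00:00"),
    ("U2", True, "2024-01-02 12:00:00", "2024-01-10 16:00:00"),
    ("U3", False, "2024-01-03 08:00:00", "2024-01-12 20:00:00"),
]

def average(values):
    return sum(values) / len(values)

fractional = {True: [], False: []}  # seconds / 86400, like the unix_timestamp arithmetic
calendar = {True: [], False: []}    # whole calendar days, like DATEDIFF
for _, is_offer, first, second in first_two_orders:
    t1, t2 = datetime.strptime(first, fmt), datetime.strptime(second, fmt)
    fractional[is_offer].append((t2 - t1).total_seconds() / 86400)
    calendar[is_offer].append((t2.date() - t1.date()).days)

avg_fractional = {k: round(average(v), 2) for k, v in fractional.items()}
avg_calendar = {k: average(v) for k, v in calendar.items()}
print(avg_fractional)
print(avg_calendar)
```

The fractional averages come out at about 6.17 and 9.5 days, while the calendar-day averages are 6.0 and 9.0, so which definition of "days" is intended should be settled before comparing the groups.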

How to find the length of the longest sequence of consecutive numbers in the table.

Table Name: Consecutive
Column Names: number (Number)
Sample Table –
Number
1
2
3
4
10
11
20
21
22
23
24
30

To find the length of the longest sequence of consecutive numbers in the table Consecutive, we can use a difference-based grouping strategy in both Spark SQL and PySpark. The idea is to assign a group identifier to each sequence of consecutive numbers by leveraging the difference between the number and its row index.


Spark SQL Solution

Query:

WITH RankedNumbers AS (
    SELECT
        number,
        ROW_NUMBER() OVER (ORDER BY number) AS row_index
    FROM Consecutive
),
GroupedNumbers AS (
    SELECT
        number,
        row_index,
        number - row_index AS group_id
    FROM RankedNumbers
),
GroupLengths AS (
    SELECT
        group_id,
        COUNT(*) AS sequence_length
    FROM GroupedNumbers
    GROUP BY group_id
)
SELECT
    MAX(sequence_length) AS longest_consecutive_sequence
FROM GroupLengths;

Explanation:

  1. RankedNumbers: Assign a ROW_NUMBER to each number ordered by its value.
  2. GroupedNumbers: Calculate group_id as number - row_index. This creates a unique identifier for each sequence of consecutive numbers.
  3. GroupLengths: Count the number of elements in each group.
  4. Final Query: Find the maximum sequence length.
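
To see why number - row_index stays constant inside a run, the grouping can be traced in plain Python on the sample table. This is a sketch rather than part of the Spark solution. It also assumes the values are distinct: a duplicate would shift the row index and split a run, so the sketch deduplicates first (in SQL this would correspond to selecting DISTINCT numbers before ranking).

```python
from collections import Counter

numbers = [1, 2, 3, 4, 10, 11, 20, 21, 22, 23, 24, 30]

# sorted(set(...)) drops duplicates, which would otherwise shift the row index
ordered = sorted(set(numbers))

# Row index starts at 1, like ROW_NUMBER(); the difference is constant within a run
group_ids = [n - i for i, n in enumerate(ordered, start=1)]

# Count how many numbers share each group_id and take the largest group
group_sizes = Counter(group_ids)
longest = max(group_sizes.values())
print(group_ids)
print(longest)
```

The group_id values are 0 for 1 to 4, 5 for 10 and 11, 13 for 20 to 24, and 18 for 30, so the largest group has 5 members.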

PySpark Solution

Sample Data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, expr
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("LongestConsecutiveSequence").getOrCreate()

# Sample data
data = [(1,), (2,), (3,), (4,), (10,), (11,), (20,), (21,), (22,), (23,), (24,), (30,)]
columns = ["number"]

consecutive_df = spark.createDataFrame(data, columns)

PySpark Implementation:

# Assign row numbers based on the order of numbers
window_spec = Window.orderBy("number")
ranked_df = consecutive_df.withColumn("row_index", row_number().over(window_spec))

# Create group_id based on the difference between number and row_index
grouped_df = ranked_df.withColumn("group_id", col("number") - col("row_index"))

# Count the size of each group
group_lengths = grouped_df.groupBy("group_id").count()

# Find the maximum sequence length
longest_sequence = group_lengths.agg({"count": "max"}).withColumnRenamed("max(count)", "longest_sequence")

longest_sequence.show()

Explanation:

  1. Ranked DataFrame: Compute a row_index for each number using row_number ordered by number.
  2. Group Identification: Calculate group_id as number - row_index.
  3. Group Lengths: Group by group_id and count the number of rows in each group.
  4. Find Maximum: Aggregate to find the maximum count.

Expected Output

For the sample data:

longest_sequence
5

  • Sequences: [1, 2, 3, 4], [10, 11], [20, 21, 22, 23, 24], [30]
  • Longest Sequence: 5 (20, 21, 22, 23, 24)

How can you find employees whose salary is greater than their manager’s salary?
– Table name: Employees
– Columns: EmployeeID, Salary, ManagerID

To find employees whose salary is greater than their manager’s salary, you can use a self-join in Spark SQL or PySpark. Here’s how:


Spark SQL Solution

Query:

SELECT 
    e1.EmployeeID AS EmployeeID,
    e1.Salary AS EmployeeSalary,
    e2.Salary AS ManagerSalary,
    e1.ManagerID
FROM Employees e1
INNER JOIN Employees e2
    ON e1.ManagerID = e2.EmployeeID
WHERE e1.Salary > e2.Salary;

Explanation:

  1. Perform a self-join on the Employees table:
    • Join the table e1 (employees) with e2 (managers) on e1.ManagerID = e2.EmployeeID.
  2. Filter the results where the employee’s salary (e1.Salary) is greater than their manager’s salary (e2.Salary).
  3. Select relevant columns like employee ID, employee salary, manager salary, and manager ID.

PySpark Solution

Sample Data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("EmployeesSalaryComparison").getOrCreate()

# Sample data
data = [
    (1, 80000, 3),  # Employee 1 with Manager 3
    (2, 60000, 3),  # Employee 2 with Manager 3
    (3, 100000, None),  # Employee 3 (Manager) with no manager
    (4, 120000, 3),  # Employee 4 with Manager 3
    (5, 70000, 4)   # Employee 5 with Manager 4
]
columns = ["EmployeeID", "Salary", "ManagerID"]

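employees_df = spark.createDataFrame(data, columns)

# Register a temp view so the Spark SQL query can reference Employees
employees_df.createOrReplaceTempView("Employees")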

PySpark Implementation:

# Perform self-join on ManagerID and EmployeeID
joined_df = employees_df.alias("e1").join(
    employees_df.alias("e2"),
    col("e1.ManagerID") == col("e2.EmployeeID"),
    "inner"
).select(
    col("e1.EmployeeID").alias("EmployeeID"),
    col("e1.Salary").alias("EmployeeSalary"),
    col("e2.Salary").alias("ManagerSalary"),
    col("e1.ManagerID").alias("ManagerID")
)

# Filter employees whose salary is greater than their manager's salary
result_df = joined_df.filter(col("EmployeeSalary") > col("ManagerSalary"))

# Show results
result_df.show()

Explanation:

  1. Self-Join:
    • Alias the employees_df as e1 and e2.
    • Join on e1.ManagerID = e2.EmployeeID.
  2. Filter Condition:
    • Filter rows where the employee’s salary (e1.Salary) is greater than the manager’s salary (e2.Salary).
  3. Select Relevant Columns:
    • Include employee ID, employee salary, manager salary, and manager ID for output.

Expected Output

For the sample data:

EmployeeID  EmployeeSalary  ManagerSalary  ManagerID
4           120000          100000         3

  • Employee 4 has a salary of 120,000, which is greater than their manager’s salary of 100,000.
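
The logic of the self-join can also be checked in plain Python, where the "manager side" of the join is simply a dictionary from EmployeeID to Salary. This is an illustrative sketch on the sample rows; salary_by_id and higher_paid are names chosen for this example.

```python
# (EmployeeID, Salary, ManagerID), same rows as the sample data
employees = [(1, 80000, 3), (2, 60000, 3), (3, 100000, None), (4, 120000, 3), (5, 70000, 4)]

# Plays the role of e2 in the self-join: look up any employee's salary by ID
salary_by_id = {emp_id: salary for emp_id, salary, _ in employees}

# Keep employees that have a known manager and earn more than that manager
higher_paid = [
    (emp_id, salary, salary_by_id[manager_id], manager_id)
    for emp_id, salary, manager_id in employees
    if manager_id in salary_by_id and salary > salary_by_id[manager_id]
]
print(higher_paid)
```

Employee 3 has no manager and is skipped, just as the inner join drops rows whose ManagerID is NULL; only employee 4 remains.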

If you want to find employees whose salary is greater than their manager’s salary without using a self-join, you can try alternative approaches such as a broadcast hash join, window functions, or aggregate filtering. Here’s how:


1. Using Window Functions

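Instead of joining the table, you can use a window function to propagate the manager’s salary down to employees. Partitioning by ManagerID alone does not work, because max("Salary") over that window returns the highest salary among a manager’s reports, not the manager’s own salary. One way around this is to emit each row twice, once keyed by its own EmployeeID (the manager role) and once keyed by its ManagerID (the report role), and then window over that shared key.

PySpark Solution:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit, when, max

# Manager role: each row keyed by its own EmployeeID
as_manager = employees_df.select(
    col("EmployeeID").alias("key"), lit(True).alias("is_manager_row"),
    "EmployeeID", "Salary", "ManagerID"
)

# Report role: each row keyed by its ManagerID (rows without a manager are skipped)
as_report = employees_df.filter(col("ManagerID").isNotNull()).select(
    col("ManagerID").alias("key"), lit(False).alias("is_manager_row"),
    "EmployeeID", "Salary", "ManagerID"
)

# A manager's row and the rows of that manager's reports share one partition
window_spec = Window.partitionBy("key")

# Take the salary from the manager-role row and attach it to every row in the partition
employees_with_manager_salary = as_manager.unionByName(as_report).withColumn(
    "ManagerSalary", max(when(col("is_manager_row"), col("Salary"))).over(window_spec)
)

# Keep report rows whose salary is greater than their manager's salary
result = employees_with_manager_salary.filter(
    (~col("is_manager_row")) & (col("Salary") > col("ManagerSalary"))
).select("EmployeeID", "Salary", "ManagerID", "ManagerSalary")

result.show()

Explanation:

  1. Each employee appears once as a potential manager (key = EmployeeID) and once as a report (key = ManagerID).
  2. The Window.partitionBy("key") places a manager’s row in the same partition as the rows of that manager’s reports.
  3. The max(when(is_manager_row, Salary)).over(window_spec) picks out the manager’s salary within each partition.
  4. Filter report rows where Salary > ManagerSalary.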

2. Using Aggregation and Lookup

Aggregate the table into a narrow (ManagerID, ManagerSalary) lookup, then join it back to the employees. This is still a join, but the lookup side carries only two columns.

PySpark Solution:

from pyspark.sql.functions import col, max

# Aggregate to get manager salaries
manager_salaries = employees_df.groupBy("EmployeeID").agg(
    max("Salary").alias("ManagerSalary")
).withColumnRenamed("EmployeeID", "ManagerID")

# Join employees with aggregated manager salaries
result = employees_df.join(
    manager_salaries,
    on="ManagerID",
    how="left"
).filter(col("Salary") > col("ManagerSalary"))

result.show()

Explanation:

  1. Aggregate the Salary column to calculate each manager’s salary.
  2. Rename EmployeeID to ManagerID for an easy lookup.
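  3. Join the employees to this narrow lookup table. The left join followed by the salary filter behaves like an inner join, because rows without a manager have a NULL ManagerSalary and are filtered out.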

3. Using Cached Manager Lookup

Create a separate lookup table of managers and cache it for efficient repeated lookups.

PySpark Solution:

# Create a manager lookup DataFrame with unambiguous column names
managers = employees_df.select(
    col("EmployeeID").alias("ManagerID"),
    col("Salary").alias("ManagerSalary")
)

# Cache the lookup table so repeated queries do not recompute it
managers.cache()

# Join employees with the manager lookup table
result = employees_df.join(
    managers,
    on="ManagerID",
    how="inner"
).filter(col("Salary") > col("ManagerSalary"))

result.show()

Explanation:

  1. Select manager data as a separate lookup table.
  2. Cache the lookup table so that repeated queries reuse it instead of recomputing it. Caching does not by itself remove the shuffle of a join.

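4. Using RDD Transformation (small lookup table)

If the set of manager salaries fits in driver memory, you can collect it into a dictionary and filter with plain RDD transformations (though this is less intuitive). collectAsMap() pulls the whole lookup to the driver, so this approach does not scale to massive data.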

PySpark RDD Example:

# Convert the DataFrame to an RDD of (EmployeeID, ManagerID, Salary) tuples
rdd = employees_df.rdd.map(lambda row: (row.EmployeeID, row.ManagerID, row.Salary))

# Collect a dictionary of EmployeeID -> Salary to the driver (it must fit in memory)
manager_salaries = rdd.map(lambda x: (x[0], x[2])).collectAsMap()

# Filter employees with salary greater than their manager's salary
result_rdd = rdd.filter(lambda x: x[1] in manager_salaries and x[2] > manager_salaries[x[1]])

# Convert RDD back to DataFrame
result = result_rdd.map(lambda x: (x[0], x[1], x[2], manager_salaries[x[1]])).toDF(["EmployeeID", "ManagerID", "EmployeeSalary", "ManagerSalary"])
result.show()

Explanation:

  1. Use collectAsMap() to create a dictionary of salaries keyed by EmployeeID on the driver.
  2. Filter employees directly using the dictionary lookup.
  3. No join or shuffle is needed, but the dictionary must fit in memory on the driver and on each executor.

Which Method is Better?

Method               Advantages                             Use Case
Window Functions     Avoids an explicit join                When datasets are moderately large.
Aggregation Lookup   Simple logic, narrow lookup side       When manager counts are smaller.
Cached Lookup Table  Efficient for repeated reuse           When repeated queries are needed.
RDD Transformations  No join or shuffle, in-memory lookup   When the salary map fits in driver memory.

Each approach has trade-offs. The window function method avoids an explicit join, but for most workloads the plain self-join remains the simplest option, and Spark can run it as a broadcast hash join when one side is small.
