Learn PySpark DataFrame Coding and Spark SQL with Examples

What is the difference between row_number(), rank(), and dense_rank() in SQL?

The functions row_number(), rank(), and dense_rank() are all window functions in SQL that assign a number to each row within a window (or partition) according to a specified order. They differ in how they handle ties (rows with identical ordering values).


1. row_number()

  • Assigns a unique sequential number to each row within a window or partition.
  • Does not handle ties: each row gets a unique number regardless of duplicate values in the ordering column.

Example:

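+----------+------+-----------------------------------+
|EmployeeID|Salary|row_number() (ORDER BY Salary DESC)|
+----------+------+-----------------------------------+
|         1|  9000|                                  1|
|         2|  9000|                                  2|
|         3|  8000|                                  3|
|         4|  7000|                                  4|
+----------+------+-----------------------------------+

  • Tied rows (employees 1 and 2, both with a Salary of 9000) still receive distinct numbers. Which of them gets 1 and which gets 2 is arbitrary unless the ORDER BY includes a tie-breaking column.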

2. rank()

  • Assigns a rank to each row based on the specified order.
  • Rows with the same ordering value (ties) get the same rank, and the next rank skips by the number of tied rows.

Example:

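+----------+------+-----------------------------+
|EmployeeID|Salary|rank() (ORDER BY Salary DESC)|
+----------+------+-----------------------------+
|         1|  9000|                            1|
|         2|  9000|                            1|
|         3|  8000|                            3|
|         4|  7000|                            4|
+----------+------+-----------------------------+
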
  • Rows 1 and 2 tie for first place, so they both get rank 1, but the next rank starts at 3.

3. dense_rank()

  • Similar to rank(), but does not skip ranks after ties. Tied rows get the same rank, and the next rank is incremented by 1.

Example:

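+----------+------+-----------------------------------+
|EmployeeID|Salary|dense_rank() (ORDER BY Salary DESC)|
+----------+------+-----------------------------------+
|         1|  9000|                                  1|
|         2|  9000|                                  1|
|         3|  8000|                                  2|
|         4|  7000|                                  3|
+----------+------+-----------------------------------+
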
  • Rows 1 and 2 tie for first place, so they both get rank 1, but the next rank starts at 2, with no gaps.

Key Differences:

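+-------------+--------------------+-------------------------------+------------------------------------+
|Feature      |row_number()        |rank()                         |dense_rank()                        |
+-------------+--------------------+-------------------------------+------------------------------------+
|Handling Ties|Does not handle ties|Same rank for ties, skips ranks|Same rank for ties, no skipped ranks|
|Uniqueness   |Always unique       |May repeat for ties            |May repeat for ties                 |
|Rank Gaps    |No gaps in numbering|Gaps after ties                |No gaps                             |
+-------------+--------------------+-------------------------------+------------------------------------+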
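
To make the tie-handling rules concrete, here is a minimal sketch in plain Python (no Spark required) that reproduces the three numbering schemes for the salaries used in the examples above. The function name rank_salaries is made up for illustration; it is not part of any Spark API.

```python
from itertools import groupby

def rank_salaries(salaries):
    """Return (salary, row_number, rank, dense_rank) tuples, highest salary first."""
    ordered = sorted(salaries, reverse=True)
    results = []
    position = 0   # rows emitted so far -> row_number
    dense = 0      # distinct salary values seen so far -> dense_rank
    for salary, tied_rows in groupby(ordered):
        dense += 1
        shared_rank = position + 1   # rank() gives every tied row the first position
        for _ in tied_rows:
            position += 1
            results.append((salary, position, shared_rank, dense))
    return results

rows = rank_salaries([9000, 9000, 8000, 7000])
for row in rows:
    print(row)
```

The gap in rank() appears because the shared rank is taken from the row position, which keeps advancing through the tied rows, while dense_rank() only advances once per distinct value.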

Use Cases:

  • row_number():
    • When you need a unique identifier for each row in a partition.
    • Commonly used to find the first N rows in each partition.
  • rank():
    • When ranking with acknowledgment of ties is important, but gaps are acceptable.
  • dense_rank():
    • When ranking with acknowledgment of ties is important, but gaps are not acceptable.

PySpark Example:

Sample Data:

data = [(1, 9000), (2, 9000), (3, 8000), (4, 7000)]
columns = ["EmployeeID", "Salary"]
df = spark.createDataFrame(data, columns)

Code:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank

# No partitionBy: the whole DataFrame is ranked as a single window
window_spec = Window.orderBy(col("Salary").desc())

df.select(
    col("EmployeeID"),
    col("Salary"),
    row_number().over(window_spec).alias("row_number"),
    rank().over(window_spec).alias("rank"),
    dense_rank().over(window_spec).alias("dense_rank")
).show()

Output:

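+----------+------+----------+----+----------+
|EmployeeID|Salary|row_number|rank|dense_rank|
+----------+------+----------+----+----------+
|         1|  9000|         1|   1|         1|
|         2|  9000|         2|   1|         1|
|         3|  8000|         3|   3|         2|
|         4|  7000|         4|   4|         3|
+----------+------+----------+----+----------+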

In both solutions, we use a window function to rank the orders for each customer and product. We then filter the results to find customers who have placed at least 3 consecutive orders.

Note that the Spark SQL solution uses the DENSE_RANK function, while the PySpark solution uses the dense_rank function from the pyspark.sql.functions module.

Also, in the PySpark solution, we use the withColumn method to add the OrderRank column to the DataFrame.

A more interesting variant of this problem arises when the orders must be placed on consecutive days.

SQL

CREATE TABLE Orders (
  CustomerID INT,
  ProductID INT,
  OrderDate DATE
);

INSERT INTO Orders (CustomerID, ProductID, OrderDate)
VALUES
  (1, 101, '2022-01-01'),
  (1, 101, '2022-01-02'),
  (1, 101, '2022-01-03'),
  (1, 102, '2022-01-10'),
  (2, 101, '2022-01-01'),
  (2, 101, '2022-01-05'),
  (3, 101, '2022-01-01'),
  (3, 101, '2022-01-02'),
  (3, 101, '2022-01-03'),
  (3, 101, '2022-01-04');

Query

SQL

WITH OrderedOrders AS (
  SELECT CustomerID, ProductID, OrderDate,
         DENSE_RANK() OVER (PARTITION BY CustomerID, ProductID ORDER BY OrderDate) AS DenseRank,
         LAG(OrderDate, 1) OVER (PARTITION BY CustomerID, ProductID ORDER BY OrderDate) AS PrevOrderDate,
         LAG(OrderDate, 2) OVER (PARTITION BY CustomerID, ProductID ORDER BY OrderDate) AS PrevPrevOrderDate
  FROM Orders
)
SELECT *
FROM OrderedOrders
ORDER BY CustomerID, ProductID, OrderDate;

Result

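+----------+---------+----------+---------+-------------+-----------------+
|CustomerID|ProductID| OrderDate|DenseRank|PrevOrderDate|PrevPrevOrderDate|
+----------+---------+----------+---------+-------------+-----------------+
|         1|      101|2022-01-01|        1|         NULL|             NULL|
|         1|      101|2022-01-02|        2|   2022-01-01|             NULL|
|         1|      101|2022-01-03|        3|   2022-01-02|       2022-01-01|
|         1|      102|2022-01-10|        1|         NULL|             NULL|
|         2|      101|2022-01-01|        1|         NULL|             NULL|
|         2|      101|2022-01-05|        2|   2022-01-01|             NULL|
|         3|      101|2022-01-01|        1|         NULL|             NULL|
|         3|      101|2022-01-02|        2|   2022-01-01|             NULL|
|         3|      101|2022-01-03|        3|   2022-01-02|       2022-01-01|
|         3|      101|2022-01-04|        4|   2022-01-03|       2022-01-02|
+----------+---------+----------+---------+-------------+-----------------+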

Note that:

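  • The DENSE_RANK function assigns a rank to each row within a partition of a result set. Rows with equal ordering values share a rank, and no ranks are skipped. The ranks look unique here only because each partition has no duplicate OrderDate values.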
  • The LAG function returns the value of a previous row within a partition of a result set.
  • The PARTITION BY clause divides the result set into partitions to which the function is applied.
  • The ORDER BY clause specifies the order of the rows within each partition.

Python

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

# Create a SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()

# Create a DataFrame
data = [
    (1, 101, "2022-01-01"),
    (1, 101, "2022-01-02"),
    (1, 101, "2022-01-03"),
    (1, 102, "2022-01-10"),
    (2, 101, "2022-01-01"),
    (2, 101, "2022-01-05"),
    (3, 101, "2022-01-01"),
    (3, 101, "2022-01-02"),
    (3, 101, "2022-01-03"),
    (3, 101, "2022-01-04")
]

columns = ["CustomerID", "ProductID", "OrderDate"]

df = spark.createDataFrame(data).toDF(*columns)

# Define the window
window = Window.partitionBy("CustomerID", "ProductID").orderBy("OrderDate")

# Calculate the dense rank, lag, and date differences
df = df.withColumn("DenseRank", F.dense_rank().over(window))
df = df.withColumn("PrevOrderDate", F.lag("OrderDate", 1).over(window))
df = df.withColumn("PrevPrevOrderDate", F.lag("OrderDate", 2).over(window))
df = df.withColumn("OrderDate_PrevOrderDate", F.datediff("OrderDate", F.col("PrevOrderDate")))
df = df.withColumn("OrderDate_PrevPrevOrderDate", F.datediff("OrderDate", F.col("PrevPrevOrderDate")))

# Show the results
df.show()


Here are the solutions in Spark SQL and PySpark:

Spark SQL Solution

SQL

WITH OrderedOrders AS (
  SELECT CustomerID, ProductID, OrderDate,
         LAG(OrderDate, 1) OVER (PARTITION BY CustomerID, ProductID ORDER BY OrderDate) AS PrevOrderDate,
         LAG(OrderDate, 2) OVER (PARTITION BY CustomerID, ProductID ORDER BY OrderDate) AS PrevPrevOrderDate
  FROM Orders
)
SELECT DISTINCT CustomerID
FROM OrderedOrders
WHERE DATEDIFF(OrderDate, PrevOrderDate) = 1
  AND DATEDIFF(OrderDate, PrevPrevOrderDate) = 2;

PySpark Solution

Python

from pyspark.sql import Window
from pyspark.sql.functions import col, datediff, lag

window = Window.partitionBy("CustomerID", "ProductID").orderBy("OrderDate")

ordered_orders = df.withColumn("PrevOrderDate", lag("OrderDate", 1).over(window)) \
                   .withColumn("PrevPrevOrderDate", lag("OrderDate", 2).over(window))

# Keep rows whose two previous orders fall exactly one and two days earlier
consecutive_orders = ordered_orders \
    .filter(datediff(col("OrderDate"), col("PrevOrderDate")) == 1) \
    .filter(datediff(col("OrderDate"), col("PrevPrevOrderDate")) == 2)

consecutive_customers = consecutive_orders.select("CustomerID").distinct()

In both solutions, we use a window function to lag the OrderDate column by 1 and 2 rows, partitioned by CustomerID and ProductID. We then keep the rows whose two preceding orders fall on the two preceding days, which identifies customers who ordered the same product on at least 3 consecutive days.

Note that the Spark SQL solution uses the LAG function, while the PySpark solution uses the lag function from the pyspark.sql.functions module.

Also, in the PySpark solution, we use datediff to measure the gap in whole days rather than subtracting the columns, because OrderDate is stored as a string in the sample DataFrame and datediff converts it to a date before comparing.
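
The lag-based check can be traced by hand with a small plain-Python sketch (no Spark needed). It assumes the same sample orders as above; the helper name customers_with_three_consecutive_days is hypothetical and only mirrors the LAG(…, 1) and LAG(…, 2) comparison.

```python
from collections import defaultdict
from datetime import date, timedelta

orders = [
    (1, 101, date(2022, 1, 1)), (1, 101, date(2022, 1, 2)), (1, 101, date(2022, 1, 3)),
    (1, 102, date(2022, 1, 10)),
    (2, 101, date(2022, 1, 1)), (2, 101, date(2022, 1, 5)),
    (3, 101, date(2022, 1, 1)), (3, 101, date(2022, 1, 2)),
    (3, 101, date(2022, 1, 3)), (3, 101, date(2022, 1, 4)),
]

def customers_with_three_consecutive_days(rows):
    # PARTITION BY CustomerID, ProductID
    partitions = defaultdict(list)
    for customer_id, product_id, order_date in rows:
        partitions[(customer_id, product_id)].append(order_date)

    matches = set()
    for (customer_id, _product_id), dates in partitions.items():
        dates.sort()  # ORDER BY OrderDate
        for i in range(2, len(dates)):
            prev_gap = dates[i] - dates[i - 1]       # OrderDate - LAG(OrderDate, 1)
            prev_prev_gap = dates[i] - dates[i - 2]  # OrderDate - LAG(OrderDate, 2)
            if prev_gap == timedelta(days=1) and prev_prev_gap == timedelta(days=2):
                matches.add(customer_id)
                break
    return matches

consecutive_customers = customers_with_three_consecutive_days(orders)
print(sorted(consecutive_customers))
```

Customer 2 is excluded because the two orders are four days apart, and customer 1's single order for product 102 has no previous rows to compare against.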

How to find employees whose salary is greater than the average salary of employees in their respective location?
Table Name: Employee
Column Names: EmpID (Employee ID), Emp_name (Employee Name), Manager_id (Manager ID), Salary (Employee Salary), Location (Employee Location)
Solve it using Spark SQL or PySpark.

Complete PySpark Solution with Sample Data

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import col, avg

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("EmployeeSalaryExample").getOrCreate()

# Sample data
data = [
    (1, "Alice", 10, 8000, "New York"),
    (2, "Bob", 11, 12000, "New York"),
    (3, "Charlie", 10, 7000, "Chicago"),
    (4, "David", 12, 11000, "New York"),
    (5, "Eve", 13, 6000, "Chicago")
]

# Define schema
columns = ["EmpID", "Emp_name", "Manager_id", "Salary", "Location"]

# Create DataFrame
employee_df = spark.createDataFrame(data, columns)

# Define a window partitioned by location
location_window = Window.partitionBy("Location")

# Step 1: Calculate the average salary for each location
employee_with_avg_salary = employee_df.withColumn(
    "avg_salary", avg("Salary").over(location_window)
)

# Step 2: Filter employees whose salary is greater than the average salary
result = employee_with_avg_salary.filter(col("Salary") > col("avg_salary"))

# Show the result
result.select("EmpID", "Emp_name", "Manager_id", "Salary", "Location", "avg_salary").show()

Explanation of the Process:

  1. Spark Session: The Spark session is initialized with spark = SparkSession.builder.master("local").appName("EmployeeSalaryExample").getOrCreate().
  2. Sample Data: A list of tuples is created where each tuple represents an employee. Each tuple contains:
    • EmpID: Employee ID
    • Emp_name: Employee Name
    • Manager_id: Manager ID
    • Salary: Employee Salary
    • Location: Employee Location
  3. Schema: The column names are defined, and a DataFrame employee_df is created using the sample data and schema.
  4. Window Function: The window is defined by partitioning the data by Location so that the average salary can be computed for each location.
  5. Filter: We keep only the employees whose salary is greater than the calculated average salary for their respective location.
  6. Show Result: Finally, the result is displayed showing the EmpID, Emp_name, Manager_id, Salary, Location, and avg_salary.
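
As a cross-check of the steps above, the same comparison can be worked through in plain Python on the sample rows (no Spark needed). The variable names are illustrative only.

```python
from collections import defaultdict

employees = [
    (1, "Alice", 10, 8000, "New York"),
    (2, "Bob", 11, 12000, "New York"),
    (3, "Charlie", 10, 7000, "Chicago"),
    (4, "David", 12, 11000, "New York"),
    (5, "Eve", 13, 6000, "Chicago"),
]

# Equivalent of avg("Salary").over(Window.partitionBy("Location"))
salaries_by_location = defaultdict(list)
for _emp_id, _name, _manager_id, salary, location in employees:
    salaries_by_location[location].append(salary)
avg_by_location = {
    location: sum(values) / len(values)
    for location, values in salaries_by_location.items()
}

# Equivalent of filter(col("Salary") > col("avg_salary"))
above_average = [
    (emp_id, name, salary, location)
    for emp_id, name, _manager_id, salary, location in employees
    if salary > avg_by_location[location]
]

print(avg_by_location)
print(above_average)
```

The New York average is 31000 / 3, or roughly 10333.33, and the Chicago average is 6500, so Bob, Charlie, and David all qualify.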

Example Output:

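+-----+--------+----------+------+--------+------------------+
|EmpID|Emp_name|Manager_id|Salary|Location|        avg_salary|
+-----+--------+----------+------+--------+------------------+
|    3| Charlie|        10|  7000| Chicago|            6500.0|
|    2|     Bob|        11| 12000|New York|10333.333333333334|
|    4|   David|        12| 11000|New York|10333.333333333334|
+-----+--------+----------+------+--------+------------------+

The New York average is (8000 + 12000 + 11000) / 3, and the Chicago average is (7000 + 6000) / 2. Row order may vary between runs.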

Spark SQL Version

If you’d prefer to use Spark SQL, the following steps will also work. You can first register the DataFrame as a temporary view and then run the SQL query.

# Register the DataFrame as a temporary SQL view
employee_df.createOrReplaceTempView("Employee")

# Spark SQL to find employees with salary greater than average salary in their location
query = """
WITH LocationAvgSalary AS (
    SELECT 
        EmpID,
        Emp_name,
        Manager_id,
        Salary,
        Location,
        AVG(Salary) OVER (PARTITION BY Location) AS avg_salary
    FROM Employee
)
SELECT 
    EmpID,
    Emp_name,
    Manager_id,
    Salary,
    Location,
    avg_salary
FROM LocationAvgSalary
WHERE Salary > avg_salary
"""

# Run the SQL query and show the results
result_sql = spark.sql(query)
result_sql.show()

Both methods (PySpark with Window functions and Spark SQL) will provide the same result.

Aggregate functions can also be applied as window functions in PySpark. They compute aggregate values over a defined window or partition and, unlike groupBy, keep every input row instead of collapsing each group into one.


Supported Aggregate Functions for Window Operations

Some commonly used aggregate functions include:

  1. avg: Computes the average value.
  2. sum: Computes the sum of values.
  3. count: Counts the number of values.
  4. min: Finds the minimum value.
  5. max: Finds the maximum value.
  6. stddev: Computes the standard deviation.
  7. first: Gets the first value in the window.
  8. last: Gets the last value in the window.
  9. collect_list: Collects all values in the window as a list.
  10. collect_set: Collects all distinct values in the window as a set.

General Syntax for Window Aggregate Functions

from pyspark.sql.window import Window
from pyspark.sql.functions import avg, sum, count, min, max

# Define a window
window_spec = Window.partitionBy("partition_column").orderBy("order_column")

# Apply aggregate function over the window
df.withColumn("new_column", aggregate_function("column").over(window_spec))

Examples

1. Average Salary by Department

Compute the average salary for each department and append it as a new column.

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("WindowFunctionsExample").getOrCreate()

# Sample data
data = [
    (1, "Alice", "HR", 6000),
    (2, "Bob", "HR", 7000),
    (3, "Charlie", "IT", 8000),
    (4, "David", "IT", 12000),
    (5, "Eve", "Finance", 9000)
]
columns = ["EmpID", "Emp_name", "Department", "Salary"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Define the window specification
window_spec = Window.partitionBy("Department")

# Add a column with the average salary per department
df = df.withColumn("avg_salary", avg("Salary").over(window_spec))

df.show()

Output:

+-----+--------+----------+------+----------+
|EmpID|Emp_name|Department|Salary|avg_salary|
+-----+--------+----------+------+----------+
|    1|   Alice|        HR|  6000|    6500.0|
|    2|     Bob|        HR|  7000|    6500.0|
|    3| Charlie|        IT|  8000|   10000.0|
|    4|   David|        IT| 12000|   10000.0|
|    5|     Eve|   Finance|  9000|    9000.0|
+-----+--------+----------+------+----------+

2. Rank Employees by Salary Within Departments

Calculate rank based on salary within each department.

from pyspark.sql.functions import col, rank

# Define a window specification
window_spec = Window.partitionBy("Department").orderBy(col("Salary").desc())

# Add a rank column
df = df.withColumn("rank", rank().over(window_spec))

df.show()

Output:

+-----+--------+----------+------+----------+----+
|EmpID|Emp_name|Department|Salary|avg_salary|rank|
+-----+--------+----------+------+----------+----+
|    2|     Bob|        HR|  7000|    6500.0|   1|
|    1|   Alice|        HR|  6000|    6500.0|   2|
|    4|   David|        IT| 12000|   10000.0|   1|
|    3| Charlie|        IT|  8000|   10000.0|   2|
|    5|     Eve|   Finance|  9000|    9000.0|   1|
+-----+--------+----------+------+----------+----+

3. Total Salary by Department

Calculate the total salary for each department.

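from pyspark.sql.functions import sum

# Use a window without orderBy so the sum covers the whole department;
# reusing the ordered window from the previous example would give a running total
dept_window = Window.partitionBy("Department")

# Add a column with the total salary per department
df = df.withColumn("total_salary", sum("Salary").over(dept_window))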

df.show()

Output:

+-----+--------+----------+------+----------+----+------------+
|EmpID|Emp_name|Department|Salary|avg_salary|rank|total_salary|
+-----+--------+----------+------+----------+----+------------+
|    2|     Bob|        HR|  7000|    6500.0|   1|       13000|
|    1|   Alice|        HR|  6000|    6500.0|   2|       13000|
|    4|   David|        IT| 12000|   10000.0|   1|       20000|
|    3| Charlie|        IT|  8000|   10000.0|   2|       20000|
|    5|     Eve|   Finance|  9000|    9000.0|   1|        9000|
+-----+--------+----------+------+----------+----+------------+

Points to Remember

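  1. Window Specification: You can define partitioning (partitionBy) and ordering (orderBy) for your window. Adding orderBy changes the default frame so that it runs from the start of the partition to the current row, which turns aggregates such as sum into running totals.
  2. Partition vs Aggregate:
    • Without a window, aggregates work on the entire DataFrame (or on each group after groupBy) and return one row per group.
    • With a window, aggregates are calculated for each partition and every input row is kept.
  3. Aggregate Functions: The built-in aggregates listed above can all be used with .over(window_spec), and they run inside Spark's engine rather than in Python.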
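
The difference between an unordered and an ordered window is easy to miss, so here is a minimal plain-Python sketch of the two behaviours for the HR salaries from the examples. It assumes the ordering values are distinct; with ties, Spark's default frame for an ordered window includes all tied rows together. The function names are illustrative.

```python
from itertools import accumulate

def whole_partition_sum(values):
    # Window.partitionBy(...) only: every row sees the sum of the full partition
    total = sum(values)
    return [total] * len(values)

def running_sum(values_in_window_order):
    # Window.partitionBy(...).orderBy(...): the default frame stops at the current row
    return list(accumulate(values_in_window_order))

hr_salaries_desc = [7000, 6000]  # Bob, Alice, ordered by Salary descending

print(whole_partition_sum(hr_salaries_desc))
print(running_sum(hr_salaries_desc))
```

This is why a per-department total should be computed over a window that has partitionBy but no orderBy.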

How to calculate the percentage of successful payments for each driver. A payment is considered successful if its status is ‘Completed’.

Table Name: Rides
Column Names: ride_id (Ride ID), driver_id (Driver ID), fare_amount (Fare Amount), driver_rating (Driver Rating), start_time (Start Time)

Table Name: Payments
Column Names: payment_id (Payment ID), ride_id (Ride ID), payment_status (Payment Status)

Step-by-Step Solution to Calculate the Percentage of Successful Payments

We will:

  1. Create sample data for the Rides and Payments tables.
  2. Join the tables on ride_id to associate rides with payment statuses.
  3. Calculate the percentage of successful payments (Completed status) for each driver.

Spark SQL Solution

Sample Data Creation

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("PercentageSuccess").getOrCreate()

# Sample data for Rides table
rides_data = [
    (1, 'D1', 100, 4.5, '2024-01-01 10:00:00'),
    (2, 'D1', 150, 4.7, '2024-01-02 12:00:00'),
    (3, 'D2', 200, 4.8, '2024-01-03 14:00:00'),
    (4, 'D2', 180, 4.6, '2024-01-04 16:00:00'),
    (5, 'D3', 120, 4.3, '2024-01-05 18:00:00'),
]
rides_columns = ["ride_id", "driver_id", "fare_amount", "driver_rating", "start_time"]

rides_df = spark.createDataFrame(rides_data, rides_columns)
rides_df.createOrReplaceTempView("Rides")

# Sample data for Payments table
payments_data = [
    (101, 1, 'Completed'),
    (102, 2, 'Failed'),
    (103, 3, 'Completed'),
    (104, 4, 'Completed'),
    (105, 5, 'Failed'),
]
payments_columns = ["payment_id", "ride_id", "payment_status"]

payments_df = spark.createDataFrame(payments_data, payments_columns)
payments_df.createOrReplaceTempView("Payments")

Query in Spark SQL

WITH DriverPayments AS (
    SELECT 
        R.driver_id,
        COUNT(*) AS total_rides,
        SUM(CASE WHEN P.payment_status = 'Completed' THEN 1 ELSE 0 END) AS successful_payments
    FROM Rides R
    JOIN Payments P ON R.ride_id = P.ride_id
    GROUP BY R.driver_id
),
DriverSuccessRate AS (
    SELECT 
        driver_id,
        total_rides,
        successful_payments,
        ROUND((successful_payments * 100.0) / total_rides, 2) AS success_rate
    FROM DriverPayments
)
SELECT * FROM DriverSuccessRate;

PySpark Solution

from pyspark.sql.functions import count, sum, when, col, round

# Join the two DataFrames
joined_df = rides_df.join(payments_df, "ride_id")

# Calculate total rides and successful payments
result_df = joined_df.groupBy("driver_id") \
    .agg(
        count("*").alias("total_rides"),
        sum(when(col("payment_status") == "Completed", 1).otherwise(0)).alias("successful_payments")
    ) \
    .withColumn("success_rate", round((col("successful_payments") * 100.0) / col("total_rides"), 2))

result_df.show()

Expected Output

For the given sample data:

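+---------+-----------+-------------------+------------+
|driver_id|total_rides|successful_payments|success_rate|
+---------+-----------+-------------------+------------+
|       D1|          2|                  1|       50.00|
|       D2|          2|                  2|      100.00|
|       D3|          1|                  0|        0.00|
+---------+-----------+-------------------+------------+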

Key Points:

  • Spark SQL uses SUM(CASE WHEN ...) for conditional aggregation.
  • PySpark uses when().otherwise() to create conditional expressions.
  • Both methods compute the same result efficiently for large datasets.
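
The conditional aggregation can be verified by hand with a short plain-Python sketch on the same sample data (no Spark needed). The dictionary names are illustrative.

```python
from collections import defaultdict

# ride_id -> driver_id, taken from the Rides sample data
driver_by_ride = {1: "D1", 2: "D1", 3: "D2", 4: "D2", 5: "D3"}

payments = [
    (101, 1, "Completed"),
    (102, 2, "Failed"),
    (103, 3, "Completed"),
    (104, 4, "Completed"),
    (105, 5, "Failed"),
]

total_rides = defaultdict(int)
successful_payments = defaultdict(int)
for _payment_id, ride_id, status in payments:
    driver_id = driver_by_ride.get(ride_id)
    if driver_id is None:
        continue  # an inner join drops payments without a matching ride
    total_rides[driver_id] += 1
    # Same idea as SUM(CASE WHEN payment_status = 'Completed' THEN 1 ELSE 0 END)
    successful_payments[driver_id] += 1 if status == "Completed" else 0

success_rate = {
    driver_id: round(successful_payments[driver_id] * 100.0 / total_rides[driver_id], 2)
    for driver_id in sorted(total_rides)
}
print(success_rate)
```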

How to calculate the percentage of menu items sold for each restaurant.

Table Name: Items
Column Names: item_id (Item ID), rest_id (Restaurant ID)

Table Name: Orders
Column Names: order_id (Order ID), item_id (Item ID), quantity (Quantity), is_offer (Is Offer), client_id (Client ID), Date_Timestamp (Date Timestamp)

Calculate the Percentage of Menu Items Sold for Each Restaurant

We need to:

  1. Determine the total unique menu items available (Items table) for each restaurant.
  2. Find the number of unique menu items sold (Orders table) for each restaurant.
  3. Calculate the percentage of menu items sold for each restaurant.

Spark SQL Solution

Sample Data Creation

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("MenuItemsPercentage").getOrCreate()

# Sample data for Items table
items_data = [
    (1, 'R1'),
    (2, 'R1'),
    (3, 'R1'),
    (4, 'R2'),
    (5, 'R2'),
    (6, 'R3'),
]
items_columns = ["item_id", "rest_id"]

items_df = spark.createDataFrame(items_data, items_columns)
items_df.createOrReplaceTempView("Items")

# Sample data for Orders table
orders_data = [
    (101, 1, 2, False, 'C1', '2024-01-01 10:00:00'),
    (102, 2, 1, True, 'C2', '2024-01-02 12:00:00'),
    (103, 3, 3, False, 'C1', '2024-01-03 14:00:00'),
    (104, 4, 1, True, 'C3', '2024-01-04 16:00:00'),
    (105, 6, 2, False, 'C4', '2024-01-05 18:00:00'),
]
orders_columns = ["order_id", "item_id", "quantity", "is_offer", "client_id", "date_timestamp"]

orders_df = spark.createDataFrame(orders_data, orders_columns)
orders_df.createOrReplaceTempView("Orders")

Query in Spark SQL

WITH TotalMenuItems AS (
    SELECT
        rest_id,
        COUNT(DISTINCT item_id) AS total_menu_items
    FROM Items
    GROUP BY rest_id
),
SoldMenuItems AS (
    SELECT
        I.rest_id,
        COUNT(DISTINCT O.item_id) AS sold_menu_items
    FROM Orders O
    JOIN Items I ON O.item_id = I.item_id
    GROUP BY I.rest_id
),
MenuPercentage AS (
    SELECT
        T.rest_id,
        T.total_menu_items,
        S.sold_menu_items,
        ROUND((S.sold_menu_items * 100.0) / T.total_menu_items, 2) AS percentage_sold
    FROM TotalMenuItems T
    LEFT JOIN SoldMenuItems S ON T.rest_id = S.rest_id
)
SELECT * FROM MenuPercentage;

PySpark Solution

from pyspark.sql.functions import countDistinct, col, round

# Calculate total menu items for each restaurant
total_menu_items_df = items_df.groupBy("rest_id") \
    .agg(countDistinct("item_id").alias("total_menu_items"))

# Calculate sold menu items for each restaurant
sold_menu_items_df = orders_df.join(items_df, "item_id") \
    .groupBy("rest_id") \
    .agg(countDistinct("item_id").alias("sold_menu_items"))

# Join the two DataFrames and calculate the percentage
result_df = total_menu_items_df.join(sold_menu_items_df, "rest_id", "left_outer") \
    .withColumn("percentage_sold", round((col("sold_menu_items") * 100.0) / col("total_menu_items"), 2))

result_df.show()

Expected Output

For the given sample data:

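+-------+----------------+---------------+---------------+
|rest_id|total_menu_items|sold_menu_items|percentage_sold|
+-------+----------------+---------------+---------------+
|     R1|               3|              3|         100.00|
|     R2|               2|              1|          50.00|
|     R3|               1|              1|         100.00|
+-------+----------------+---------------+---------------+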

Key Points:

  • COUNT(DISTINCT column) is used to count unique items.
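  • The LEFT JOIN ensures that restaurants with no sold items are included in the results. For such restaurants sold_menu_items and percentage_sold are NULL unless you wrap them in COALESCE(..., 0).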
  • PySpark and Spark SQL yield the same result, suitable for large datasets.
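
The expected percentages can be reproduced with set operations in plain Python (no Spark needed). This sketch assumes the sample data above and treats a restaurant with no sales as 0 percent, which corresponds to the COALESCE variant of the query rather than the NULL result of a bare LEFT JOIN.

```python
from collections import defaultdict

items = [(1, "R1"), (2, "R1"), (3, "R1"), (4, "R2"), (5, "R2"), (6, "R3")]
ordered_item_ids = [1, 2, 3, 4, 6]  # item_id column of the Orders sample data

# COUNT(DISTINCT item_id) per restaurant: a set removes duplicates
menu_by_restaurant = defaultdict(set)
for item_id, rest_id in items:
    menu_by_restaurant[rest_id].add(item_id)

sold_ids = set(ordered_item_ids)

percentage_sold = {
    rest_id: round(len(menu & sold_ids) * 100.0 / len(menu), 2)
    for rest_id, menu in sorted(menu_by_restaurant.items())
}
print(percentage_sold)
```

Item 5 never appears in an order, which is why R2 ends up at 50 percent.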

How to compare the time taken for clients who placed their first order with an offer versus those without an offer to make their next order.

Table Name: Orders
Column Names: order_id (Order ID), user_id (User ID), is_offer (Is Offer), Date_Timestamp (Date Timestamp)

Compare the time taken for clients who placed their first order with an offer versus those without an offer to make their next order. This involves calculating the time difference between the first and second orders for each user, categorized by whether the first order was placed with an offer.


Steps to Solve

  1. Identify each user’s first order.
  2. Identify each user’s second order.
  3. Calculate the time difference between the first and second orders.
  4. Categorize the users based on whether their first order was placed with an offer.
  5. Compare the average time taken for the next order between the two groups.

Sample Data Creation

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, lead, datediff
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("FirstOrderAnalysis").getOrCreate()

# Sample data
data = [
    (1, 'U1', True, '2024-01-01 10:00:00'),
    (2, 'U1', False, '2024-01-05 14:00:00'),
    (3, 'U2', True, '2024-01-02 12:00:00'),
    (4, 'U2', False, '2024-01-10 16:00:00'),
    (5, 'U3', False, '2024-01-03 08:00:00'),
    (6, 'U3', True, '2024-01-12 20:00:00'),
]
columns = ["order_id", "user_id", "is_offer", "Date_Timestamp"]

orders_df = spark.createDataFrame(data, columns)
orders_df = orders_df.withColumn("Date_Timestamp", col("Date_Timestamp").cast("timestamp"))
orders_df.show()

PySpark Solution

1. Identify First and Second Orders for Each User

Use a window function to rank orders based on the timestamp for each user.

from pyspark.sql.functions import row_number, unix_timestamp

# Define a window partitioned by user_id and ordered by Date_Timestamp
window_spec = Window.partitionBy("user_id").orderBy("Date_Timestamp")

# Number each user's orders chronologically to identify the first and second orders
ranked_orders = orders_df.withColumn("order_rank", row_number().over(window_spec))

# Separate first and second orders
first_orders = ranked_orders.filter(col("order_rank") == 1) \
    .select("user_id", "is_offer", "Date_Timestamp")

second_orders = ranked_orders.filter(col("order_rank") == 2) \
    .select("user_id", col("Date_Timestamp").alias("second_order_timestamp"))

2. Calculate the Time Difference Between First and Second Orders

Join the first and second orders and calculate the time difference.

# Join first and second orders
joined_orders = first_orders.join(second_orders, "user_id", "inner")

# Calculate time difference in days
result_df = joined_orders.withColumn(
    "days_to_next_order",
    (unix_timestamp("second_order_timestamp") - unix_timestamp("Date_Timestamp")) / 86400
)

result_df.show()

3. Group by is_offer and Calculate Average Time

Group the results by whether the first order used an offer and calculate the average time difference.

from pyspark.sql.functions import avg

# Group by is_offer and calculate average time
average_time_df = result_df.groupBy("is_offer") \
    .agg(avg("days_to_next_order").alias("avg_days_to_next_order"))

average_time_df.show()

Expected Output

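For the given sample data (averages rounded to two decimals; row order may vary):

Final Result

+--------+----------------------+
|is_offer|avg_days_to_next_order|
+--------+----------------------+
|    True|                  6.17|
|   False|                  9.50|
+--------+----------------------+

U1 and U2 placed their first order with an offer and took about 4.17 and 8.17 days to order again; U3 placed the first order without an offer and took 9.5 days.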

Key Concepts Used

  1. Window Functions: partitionBy and orderBy to rank the orders for each user.
  2. Date Calculations: Using unix_timestamp to calculate the difference between timestamps.
  3. Grouping and Aggregation: groupBy to calculate the average days to the next order based on the is_offer column.

Spark SQL Solution

Query

WITH RankedOrders AS (
    SELECT 
        user_id,
        is_offer,
        Date_Timestamp,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY Date_Timestamp) AS order_rank
    FROM Orders
),
FirstOrders AS (
    SELECT 
        user_id, 
        is_offer, 
        Date_Timestamp AS first_order_timestamp
    FROM RankedOrders
    WHERE order_rank = 1
),
SecondOrders AS (
    SELECT 
        user_id, 
        Date_Timestamp AS second_order_timestamp
    FROM RankedOrders
    WHERE order_rank = 2
),
JoinedOrders AS (
    SELECT 
        F.user_id, 
        F.is_offer, 
        F.first_order_timestamp, 
        S.second_order_timestamp,
        DATEDIFF(S.second_order_timestamp, F.first_order_timestamp) AS days_to_next_order
    FROM FirstOrders F
    JOIN SecondOrders S ON F.user_id = S.user_id
)
SELECT 
    is_offer,
    AVG(days_to_next_order) AS avg_days_to_next_order
FROM JoinedOrders
GROUP BY is_offer;

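This query follows the same steps as the PySpark solution, directly in SQL. One difference: DATEDIFF counts whole calendar days, whereas the PySpark version divides the difference in seconds by 86400 and keeps fractional days. For the sample data the SQL averages are therefore 6.0 and 9.0 instead of about 6.17 and 9.5.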
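
To see where the two sets of numbers come from, here is a plain-Python sketch (no Spark needed) that computes both the fractional-day gap and the calendar-day gap for the sample orders. The variable names are illustrative.

```python
from collections import defaultdict
from datetime import datetime

orders = [
    (1, "U1", True, "2024-01-01 10:00:00"),
    (2, "U1", False, "2024-01-05 14:00:00"),
    (3, "U2", True, "2024-01-02 12:00:00"),
    (4, "U2", False, "2024-01-10 16:00:00"),
    (5, "U3", False, "2024-01-03 08:00:00"),
    (6, "U3", True, "2024-01-12 20:00:00"),
]

orders_by_user = defaultdict(list)
for _order_id, user_id, is_offer, ts in orders:
    orders_by_user[user_id].append((datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"), is_offer))

elapsed_days = defaultdict(list)   # seconds / 86400, like the unix_timestamp version
calendar_days = defaultdict(list)  # whole calendar days, like DATEDIFF
for user_orders in orders_by_user.values():
    user_orders.sort()  # chronological order, like ORDER BY Date_Timestamp
    if len(user_orders) < 2:
        continue  # users without a second order are dropped, like the inner join
    (first_ts, first_is_offer), (second_ts, _) = user_orders[0], user_orders[1]
    elapsed_days[first_is_offer].append((second_ts - first_ts).total_seconds() / 86400)
    calendar_days[first_is_offer].append((second_ts.date() - first_ts.date()).days)

avg_elapsed = {k: sum(v) / len(v) for k, v in elapsed_days.items()}
avg_calendar = {k: sum(v) / len(v) for k, v in calendar_days.items()}
print(avg_elapsed)
print(avg_calendar)
```

Which measure is appropriate depends on whether partial days matter for the comparison.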

How to find the length of the longest sequence of consecutive numbers in the table.

Table Name: Consecutive
Column Names: number (Number)
Sample Table –
Number
1
2
3
4
10
11
20
21
22
23
24
30

To find the length of the longest sequence of consecutive numbers in the table Consecutive, we can use a difference-based grouping strategy in both Spark SQL and PySpark. The idea is to assign a group identifier to each sequence of consecutive numbers by leveraging the difference between the number and its row index.


Spark SQL Solution

Query:

WITH RankedNumbers AS (
    SELECT
        number,
        ROW_NUMBER() OVER (ORDER BY number) AS row_index
    FROM Consecutive
),
GroupedNumbers AS (
    SELECT
        number,
        row_index,
        number - row_index AS group_id
    FROM RankedNumbers
),
GroupLengths AS (
    SELECT
        group_id,
        COUNT(*) AS sequence_length
    FROM GroupedNumbers
    GROUP BY group_id
)
SELECT
    MAX(sequence_length) AS longest_consecutive_sequence
FROM GroupLengths;

Explanation:

  1. RankedNumbers: Assign a ROW_NUMBER to each number ordered by its value.
  2. GroupedNumbers: Calculate group_id as number - row_index. This creates a unique identifier for each sequence of consecutive numbers.
  3. GroupLengths: Count the number of elements in each group.
  4. Final Query: Find the maximum sequence length.

PySpark Solution

Sample Data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, expr
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("LongestConsecutiveSequence").getOrCreate()

# Sample data
data = [(1,), (2,), (3,), (4,), (10,), (11,), (20,), (21,), (22,), (23,), (24,), (30,)]
columns = ["number"]

consecutive_df = spark.createDataFrame(data, columns)

PySpark Implementation:

# Assign row numbers based on the order of numbers
window_spec = Window.orderBy("number")
ranked_df = consecutive_df.withColumn("row_index", row_number().over(window_spec))

# Create group_id based on the difference between number and row_index
grouped_df = ranked_df.withColumn("group_id", col("number") - col("row_index"))

# Count the size of each group
group_lengths = grouped_df.groupBy("group_id").count()

# Find the maximum sequence length
longest_sequence = group_lengths.agg({"count": "max"}).withColumnRenamed("max(count)", "longest_sequence")

longest_sequence.show()

Explanation:

  1. Ranked DataFrame: Compute a row_index for each number using row_number ordered by number.
  2. Group Identification: Calculate group_id as number - row_index.
  3. Group Lengths: Group by group_id and count the number of rows in each group.
  4. Find Maximum: Aggregate to find the maximum count.

Expected Output

For the sample data:

+----------------+
|longest_sequence|
+----------------+
|               5|
+----------------+

  • Sequences: [1, 2, 3, 4], [10, 11], [20, 21, 22, 23, 24], [30]
  • Longest Sequence: 5 (20, 21, 22, 23, 24)
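
The grouping trick is easiest to see with the group_id values written out. The following plain-Python sketch (no Spark needed) applies the same number minus row index idea to the sample values. It removes duplicates first, which is an added assumption: the queries above expect the numbers to be distinct.

```python
from collections import Counter

numbers = [1, 2, 3, 4, 10, 11, 20, 21, 22, 23, 24, 30]

# Inside a consecutive run, number and row index both grow by 1,
# so their difference stays constant and identifies the run.
group_sizes = Counter(
    number - row_index
    for row_index, number in enumerate(sorted(set(numbers)), start=1)
)

longest_sequence = max(group_sizes.values())
print(dict(group_sizes))
print(longest_sequence)
```

The run 20 to 24 occupies row indexes 7 to 11, so all five values share the group_id 13.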

How can you find employees whose salary is greater than their manager’s salary?
– Table name: Employees
– Columns: EmployeeID, Salary, ManagerID

To find employees whose salary is greater than their manager’s salary, you can use a self-join in Spark SQL or PySpark. Here’s how:


Spark SQL Solution

Query:

SELECT 
    e1.EmployeeID AS EmployeeID,
    e1.Salary AS EmployeeSalary,
    e2.Salary AS ManagerSalary,
    e1.ManagerID
FROM Employees e1
INNER JOIN Employees e2
    ON e1.ManagerID = e2.EmployeeID
WHERE e1.Salary > e2.Salary;

Explanation:

  1. Perform a self-join on the Employees table:
    • Join the table e1 (employees) with e2 (managers) on e1.ManagerID = e2.EmployeeID.
  2. Filter the results where the employee’s salary (e1.Salary) is greater than their manager’s salary (e2.Salary).
  3. Select relevant columns like employee ID, employee salary, manager salary, and manager ID.

PySpark Solution

Sample Data:

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("EmployeesSalaryComparison").getOrCreate()

# Sample data
data = [
    (1, 80000, 3),  # Employee 1 with Manager 3
    (2, 60000, 3),  # Employee 2 with Manager 3
    (3, 100000, None),  # Employee 3 (Manager) with no manager
    (4, 120000, 3),  # Employee 4 with Manager 3
    (5, 70000, 4)   # Employee 5 with Manager 4
]
columns = ["EmployeeID", "Salary", "ManagerID"]

employees_df = spark.createDataFrame(data, columns)

PySpark Implementation:

from pyspark.sql.functions import col

# Perform self-join on ManagerID and EmployeeID
joined_df = employees_df.alias("e1").join(
    employees_df.alias("e2"),
    col("e1.ManagerID") == col("e2.EmployeeID"),
    "inner"
).select(
    col("e1.EmployeeID").alias("EmployeeID"),
    col("e1.Salary").alias("EmployeeSalary"),
    col("e2.Salary").alias("ManagerSalary"),
    col("e1.ManagerID").alias("ManagerID")
)

# Filter employees whose salary is greater than their manager's salary
result_df = joined_df.filter(col("EmployeeSalary") > col("ManagerSalary"))

# Show results
result_df.show()

Explanation:

  1. Self-Join:
    • Alias the employees_df as e1 and e2.
    • Join on e1.ManagerID = e2.EmployeeID.
  2. Filter Condition:
    • Filter rows where the employee’s salary (e1.Salary) is greater than the manager’s salary (e2.Salary).
  3. Select Relevant Columns:
    • Include employee ID, employee salary, manager salary, and manager ID for output.

Expected Output

For the sample data:

EmployeeID | EmployeeSalary | ManagerSalary | ManagerID
4          | 120000         | 100000        | 3
  • Employee 4 has a salary of 120,000, which is greater than their manager’s salary of 100,000.

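The self-join above is the most direct solution. Alternatives that restructure the query are sketched below: a window function, an aggregated lookup, a cached lookup table, and an RDD dictionary lookup. Approaches 2 and 3 still join the employee data against a projection of itself, so they change how the manager side is prepared rather than removing the join.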


1. Using Window Functions

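A window partitioned by ManagerID contains only an employee’s peers (everyone who reports to the same manager), not the manager’s own row, so max("Salary") over that window returns the highest-paid peer rather than the manager’s salary. To bring the manager’s salary into the same partition without a join, add a second copy of every row keyed by its own EmployeeID, union the two copies, and partition by that shared key.

PySpark Solution:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit, when, max  # note: shadows Python's built-in max

# Copy 1: each employee, keyed by the manager they report to
as_report = employees_df.filter(col("ManagerID").isNotNull()).select(
    col("ManagerID").alias("GroupID"), "EmployeeID", "Salary",
    lit(False).alias("IsManagerRow")
)

# Copy 2: each employee as a potential manager, keyed by their own ID
as_manager = employees_df.select(
    col("EmployeeID").alias("GroupID"), "EmployeeID", "Salary",
    lit(True).alias("IsManagerRow")
)

# Each partition now holds one manager row plus that manager's direct reports
window_spec = Window.partitionBy("GroupID")

employees_with_manager_salary = as_report.unionByName(as_manager).withColumn(
    "ManagerSalary",
    max(when(col("IsManagerRow"), col("Salary"))).over(window_spec)
)

# Keep report rows whose salary exceeds the manager's salary
result = employees_with_manager_salary.filter(
    ~col("IsManagerRow") & (col("Salary") > col("ManagerSalary"))
).select("EmployeeID", "Salary", col("GroupID").alias("ManagerID"), "ManagerSalary")

result.show()

Explanation:

  1. as_report keys each employee by ManagerID; as_manager keys each employee by their own EmployeeID.
  2. After unionByName, Window.partitionBy("GroupID") places each manager’s row in the same partition as their direct reports.
  3. max(when(IsManagerRow, Salary)) picks out the manager’s salary within each partition, because report rows contribute null and max ignores nulls.
  4. Filter the report rows where Salary > ManagerSalary. The union roughly doubles the rows that are shuffled, so this is not necessarily cheaper than the self-join.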
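It is worth checking by hand what a window partitioned by ManagerID alone actually contains. The plain-Python sketch below emulates max("Salary") over such a window on the sample rows; the dictionary-based emulation and the names `partition_max` and `salary_by_id` are illustrative assumptions, not Spark APIs.

```python
from collections import defaultdict

# (EmployeeID, Salary, ManagerID) rows from the sample data
employees = [(1, 80000, 3), (2, 60000, 3), (3, 100000, None), (4, 120000, 3), (5, 70000, 4)]

# Emulates max("Salary").over(Window.partitionBy("ManagerID")):
# one running maximum per ManagerID value (None forms its own partition)
partition_max = defaultdict(int)
for _, salary, manager_id in employees:
    partition_max[manager_id] = max(partition_max[manager_id], salary)

# The manager's real salary, looked up by ID for comparison
salary_by_id = {emp_id: salary for emp_id, salary, _ in employees}

for emp_id, salary, manager_id in employees:
    peer_max = partition_max[manager_id]
    manager_salary = salary_by_id.get(manager_id)  # None when there is no manager
    print(emp_id, salary, peer_max, manager_salary)

# No salary can exceed the maximum of its own partition
exceeds_peer_max = [e for e, s, m in employees if s > partition_max[m]]
print(exceeds_peer_max)  # []
```

For the employees reporting to manager 3, the partition maximum is 120,000 (employee 4's own salary), while the manager's salary is 100,000. A filter of Salary > max over that partition can therefore never match, which is why the manager's row has to be brought into the partition explicitly.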

2. Using Aggregation and Lookup

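Aggregate the table into a lookup with one salary per EmployeeID, then join employees to that lookup on ManagerID. This is still a join against data derived from the same table, but the manager side is first reduced to two columns and one row per ID.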

PySpark Solution:

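from pyspark.sql.functions import col, max  # note: shadows Python's built-in max

# Aggregate to get one salary per potential manager
manager_salaries = employees_df.groupBy("EmployeeID").agg(
    max("Salary").alias("ManagerSalary")
).withColumnRenamed("EmployeeID", "ManagerID")

# Join employees with aggregated manager salaries.
# The filter drops rows with a null ManagerSalary, so the left join
# returns the same rows an inner join would.
result = employees_df.join(
    manager_salaries,
    on="ManagerID",
    how="left"
).filter(col("Salary") > col("ManagerSalary"))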

result.show()

Explanation:

  1. Group by EmployeeID and take max(Salary) to get one ManagerSalary row per potential manager.
  2. Rename EmployeeID to ManagerID for an easy lookup.
  3. Join employees to the lookup on ManagerID; rows without a matching manager are removed by the filter.

3. Using Cached Manager Lookup

Create a separate lookup table of managers and cache it for efficient repeated lookups.

PySpark Solution:

from pyspark.sql.functions import col

# Create a manager lookup DataFrame; rename both columns so that
# "Salary" is not ambiguous after the join
managers = employees_df.select(
    col("EmployeeID").alias("ManagerID"),
    col("Salary").alias("ManagerSalary")
)

# Cache the lookup so repeated queries do not recompute it
managers.cache()

# Join employees with the manager lookup table
result = employees_df.join(
    managers,
    on="ManagerID",
    how="inner"
).filter(col("Salary") > col("ManagerSalary"))

result.show()

Explanation:

  1. Select manager data as a separate lookup table.
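  2. Cache the lookup table so repeated queries reuse it instead of recomputing it. Caching does not remove the join’s shuffle; for a small lookup, a broadcast join is what avoids shuffling the employee side.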

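4. Using RDD Transformation (small lookup tables only)

If the (EmployeeID, Salary) pairs fit in driver memory, you can collect them into a dictionary and filter with plain RDD transformations. This bypasses Spark’s DataFrame optimizer, so it is generally slower than the DataFrame approaches and does not scale to large datasets.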

PySpark RDD Example:

# Convert the DataFrame to an RDD
rdd = employees_df.rdd.map(lambda row: (row.EmployeeID, row.ManagerID, row.Salary))

# Create a dictionary of manager salaries
manager_salaries = rdd.map(lambda x: (x[0], x[2])).collectAsMap()

# Filter employees with salary greater than their manager's salary
result_rdd = rdd.filter(lambda x: x[1] in manager_salaries and x[2] > manager_salaries[x[1]])

# Convert RDD back to DataFrame
result = result_rdd.map(lambda x: (x[0], x[1], x[2], manager_salaries[x[1]])).toDF(["EmployeeID", "ManagerID", "EmployeeSalary", "ManagerSalary"])
result.show()

Explanation:

  1. Use collectAsMap() to create a dictionary of manager salaries.
  2. Filter employees directly using the dictionary lookup.
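  3. No shuffle or join is needed, but collectAsMap() pulls every (EmployeeID, Salary) pair to the driver, so this only works when that map fits in driver memory.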
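The dictionary lookup inside the filter can be exercised without a cluster. This is a plain-Python sketch on the sample rows, with a list standing in for the RDD and a dict standing in for the result of collectAsMap(); the helper name `earns_more_than_manager` is hypothetical.

```python
# (EmployeeID, ManagerID, Salary) tuples, the same shape the RDD map produces
rows = [(1, 3, 80000), (2, 3, 60000), (3, None, 100000), (4, 3, 120000), (5, 4, 70000)]

# Stand-in for collectAsMap(): EmployeeID -> Salary, held entirely in local memory
manager_salaries = {emp_id: salary for emp_id, _, salary in rows}

def earns_more_than_manager(row):
    _, manager_id, salary = row
    # A None ManagerID is never a key in the map, so top-level employees are skipped
    return manager_id in manager_salaries and salary > manager_salaries[manager_id]

# Same output shape as the final toDF call: (EmployeeID, ManagerID, EmployeeSalary, ManagerSalary)
result = [
    (emp_id, manager_id, salary, manager_salaries[manager_id])
    for emp_id, manager_id, salary in filter(earns_more_than_manager, rows)
]
print(result)  # [(4, 3, 120000, 100000)]
```

The membership check has to come before the dictionary access; reversing the two conditions would raise a KeyError for the row whose ManagerID is None.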

Which Method is Better?

Method | Advantages | Use Case
Window Functions | No join operator | When a join must be avoided; the window itself still shuffles data.
Aggregation Lookup | Simple logic; one row per manager | When EmployeeID may be duplicated in the source.
Cached Lookup Table | Lookup is computed once and reused | When repeated queries are needed.
RDD Transformations | No shuffle; plain dictionary lookup | Only when the salary map fits in driver memory.

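Each approach has trade-offs. For most workloads the plain self-join remains the clearest option, and Spark can execute it as a broadcast join when the manager side is small; the alternatives are worth considering mainly for the specific cases listed in the table.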
