Write a Spark SQL query, a PySpark DataFrame based script, and a Python coding solution to identify riders who have taken at least one trip every day for 10 continuous days.


Table Name: Trip
Column Names: trip_id (Trip ID), driver_id (Driver ID), rider_id (Rider ID), trip_start_timestamp (Trip Start Timestamp) with example data

To solve the problem of identifying riders who have taken at least one trip every day for 10 continuous days, you can approach this in three ways:

  1. Spark SQL Query
  2. PySpark DataFrame API
  3. Python Coding Solution

Let’s use the following example data for the Trip table:

Example Data:

trip_id | driver_id | rider_id | trip_start_timestamp
1       | 101       | 201      | 2024-10-01 08:15:00
2       | 102       | 201      | 2024-10-02 10:00:00
3       | 101       | 202      | 2024-10-01 09:30:00
4       | 102       | 201      | 2024-10-03 12:45:00
5       | 103       | 202      | 2024-10-01 11:00:00
6       | 101       | 201      | 2024-10-04 09:00:00
7       | 102       | 201      | 2024-10-05 13:00:00
8       | 103       | 202      | 2024-10-05 14:15:00

1. Spark SQL Query

You can write a SQL query that:

  • Extracts the day for each trip.
  • Groups trips by rider_id and date (ignoring time).
  • Identifies riders who have trips for 10 consecutive days.

SQL Query:

WITH RiderTrips AS (
  SELECT
    rider_id,
    DATE(trip_start_timestamp) AS trip_date
  FROM
    Trip
  GROUP BY
    rider_id, DATE(trip_start_timestamp)
),
ConsecutiveTrips AS (
  SELECT
    rider_id,
    trip_date,
    ROW_NUMBER() OVER (PARTITION BY rider_id ORDER BY trip_date) AS row_num,
    DATEDIFF(trip_date, LAG(trip_date, 1) OVER (PARTITION BY rider_id ORDER BY trip_date)) AS date_diff
  FROM
    RiderTrips
)
SELECT
  rider_id
FROM
  ConsecutiveTrips
WHERE
  date_diff = 1
GROUP BY
  rider_id
HAVING
  COUNT(DISTINCT trip_date) >= 10

Explanation:

  • RiderTrips: Extracts the unique trip dates per rider.
  • ConsecutiveTrips: Uses LAG and DATEDIFF to compute the gap in days between each trip date and the rider's previous trip date.
  • The final query keeps only rows where that gap is exactly 1 day and counts distinct trip dates per rider, keeping riders with at least 10 such dates. (As noted further down, this naive version never resets the count when a gap appears; a corrected query follows later.) A minimal sketch for running the query is shown below.
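
If the Trip data lives in a DataFrame rather than a catalog table, you can register it as a temporary view and run the query through spark.sql(). A minimal sketch, assuming a DataFrame df with the Trip columns and a variable query holding the SQL text above (both names are illustrative):

# Register the Trip data as a temporary view and execute the query above
df.createOrReplaceTempView("Trip")
riders = spark.sql(query)
riders.show()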

2. PySpark DataFrame API

You can achieve the same logic using PySpark’s DataFrame API to handle date manipulation and window functions.

PySpark Code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("RiderTrips").getOrCreate()

# Sample data
data = [
    (1, 101, 201, "2024-10-01 08:15:00"),
    (2, 102, 201, "2024-10-02 10:00:00"),
    (3, 101, 202, "2024-10-01 09:30:00"),
    (4, 102, 201, "2024-10-03 12:45:00"),
    (5, 103, 202, "2024-10-01 11:00:00"),
    (6, 101, 201, "2024-10-04 09:00:00"),
    (7, 102, 201, "2024-10-05 13:00:00"),
    (8, 103, 202, "2024-10-05 14:15:00")
]

# Define schema and create DataFrame
columns = ["trip_id", "driver_id", "rider_id", "trip_start_timestamp"]
df = spark.createDataFrame(data, schema=columns)

# Convert trip_start_timestamp to DateType
df = df.withColumn("trip_start_timestamp", F.to_timestamp("trip_start_timestamp"))

# Extract trip date
df = df.withColumn("trip_date", F.to_date("trip_start_timestamp"))

# Remove duplicate trips by rider and trip date
df = df.dropDuplicates(["rider_id", "trip_date"])

# Window specification to calculate row numbers and date differences
window_spec = Window.partitionBy("rider_id").orderBy("trip_date")

# Add row number and date difference columns
df = df.withColumn("row_num", F.row_number().over(window_spec))
df = df.withColumn("prev_date", F.lag("trip_date", 1).over(window_spec))
df = df.withColumn("date_diff", F.datediff("trip_date", F.col("prev_date")))

# Filter for consecutive trips (date_diff = 1)
df = df.withColumn("consecutive_group", F.sum((F.col("date_diff") == 1).cast("int")).over(window_spec))

# Filter to identify riders with at least 10 consecutive days of trips
result = df.groupBy("rider_id").agg(F.countDistinct("trip_date").alias("num_days")) \
    .filter(F.col("num_days") >= 10)

result.show()

# Stop the session
spark.stop()

Explanation:

  • The DataFrame API uses withColumn to create new columns such as trip_date, row_num, and date_diff.
  • A window specification (Window.partitionBy().orderBy()) is used to compute row numbers and the day gaps between consecutive trips.
  • The final aggregation, however, only counts distinct trip dates per rider and does not verify that those dates are consecutive; the corrected version below fixes this by starting a new streak whenever a gap appears.

3. Python Coding Solution

If you prefer a pure Python solution (assuming you have the data loaded in a list of dictionaries or some iterable format), here’s a basic approach using pandas and collections.

Python Code:

import pandas as pd
from datetime import timedelta

# Sample data in dictionary format
data = {
    "trip_id": [1, 2, 3, 4, 5, 6, 7, 8],
    "driver_id": [101, 102, 101, 102, 103, 101, 102, 103],
    "rider_id": [201, 201, 202, 201, 202, 201, 201, 202],
    "trip_start_timestamp": [
        "2024-10-01 08:15:00", "2024-10-02 10:00:00", "2024-10-01 09:30:00",
        "2024-10-03 12:45:00", "2024-10-01 11:00:00", "2024-10-04 09:00:00",
        "2024-10-05 13:00:00", "2024-10-05 14:15:00"
    ]
}

# Create pandas DataFrame
df = pd.DataFrame(data)

# Convert trip_start_timestamp to datetime
df['trip_start_timestamp'] = pd.to_datetime(df['trip_start_timestamp'])

# Extract date from the timestamp
df['trip_date'] = df['trip_start_timestamp'].dt.date

# Remove duplicates based on rider_id and trip_date
df = df.drop_duplicates(subset=['rider_id', 'trip_date'])

# Sort by rider_id and trip_date
df = df.sort_values(by=['rider_id', 'trip_date'])

# Group by rider_id and identify consecutive days
def check_consecutive_days(trips):
    consecutive_days = 1
    for i in range(1, len(trips)):
        if trips[i] - trips[i-1] == timedelta(days=1):
            consecutive_days += 1
            if consecutive_days == 10:
                return True
        else:
            consecutive_days = 1
    return False

# Apply check for each rider
riders_with_10_days = df.groupby('rider_id')['trip_date'].apply(lambda x: check_consecutive_days(list(x)))

# Filter riders who meet the criteria
riders_with_10_days = riders_with_10_days[riders_with_10_days]
print(riders_with_10_days.index.tolist())

Explanation:

  • Convert timestamps to datetime objects and extract the date.
  • Remove duplicates based on rider_id and trip_date.
  • Group by rider_id, and for each rider, check if they have 10 consecutive days of trips using the check_consecutive_days() function.

However, the solutions above do not handle gaps between days correctly: they only count days where date_diff == 1 and never reset the count when there is a gap (e.g., date_diff == 2).

The corrected solutions are:

Corrected PySpark Code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Re-create the Spark session (the previous example stopped it with spark.stop())
spark = SparkSession.builder.appName("RiderTripsCorrected").getOrCreate()

# Sample data
data = [
    (1, 101, 201, "2024-10-01 08:15:00"),
    (2, 102, 201, "2024-10-02 10:00:00"),
    (3, 101, 202, "2024-10-01 09:30:00"),
    (4, 102, 201, "2024-10-03 12:45:00"),
    (5, 103, 202, "2024-10-01 11:00:00"),
    (6, 101, 201, "2024-10-04 09:00:00"),
    (7, 102, 201, "2024-10-05 13:00:00"),
    (8, 103, 202, "2024-10-05 14:15:00"),
    (9, 101, 201, "2024-10-06 08:00:00"),
    (10, 101, 201, "2024-10-07 09:00:00"),
    (11, 101, 201, "2024-10-08 09:30:00"),
    (12, 101, 201, "2024-10-09 10:00:00"),
    (13, 101, 201, "2024-10-10 11:00:00")
]

# Define schema and create DataFrame
columns = ["trip_id", "driver_id", "rider_id", "trip_start_timestamp"]
df = spark.createDataFrame(data, schema=columns)

# Convert trip_start_timestamp to DateType
df = df.withColumn("trip_start_timestamp", F.to_timestamp("trip_start_timestamp"))

# Extract trip date
df = df.withColumn("trip_date", F.to_date("trip_start_timestamp"))

# Remove duplicate trips by rider and trip date
df = df.dropDuplicates(["rider_id", "trip_date"])

# Window specification to calculate row numbers and date differences
window_spec = Window.partitionBy("rider_id").orderBy("trip_date")

# Add row number
df = df.withColumn("row_num", F.row_number().over(window_spec))

# Add the difference between row number and the actual trip_date (lagged by 1 row)
df = df.withColumn("date_diff", F.datediff("trip_date", F.lag("trip_date", 1).over(window_spec)))

# Assign a group when the difference between dates is greater than 1 (gap in dates)
df = df.withColumn("is_new_group", F.when(F.col("date_diff") > 1, 1).otherwise(0))

# Create a cumulative sum to identify the consecutive groups
df = df.withColumn("group_id", F.sum("is_new_group").over(window_spec))

# Now we group by rider and group_id and filter for those who have 10 consecutive days
result = df.groupBy("rider_id", "group_id").agg(
    F.countDistinct("trip_date").alias("num_days"),
    F.min("trip_date").alias("start_date"),
    F.max("trip_date").alias("end_date")
).filter(F.col("num_days") >= 10)

result.show(truncate=False)

Explanation of the Corrected Logic:

  1. Row Number Calculation: We calculate the row_num to track the order of trips for each rider_id.
  2. Date Difference: We calculate the difference between the current trip’s date and the previous trip’s date using F.datediff(). This lets us track gaps between trip dates.
  3. New Group Assignment: If date_diff > 1, it means there is a gap between trip dates, and we need to mark this as a new group (is_new_group). Otherwise, it continues the previous streak.
  4. Cumulative Grouping: Using a cumulative sum of is_new_group, we can identify streaks of consecutive trips for each rider_id using group_id.
  5. Filtering Consecutive Days: Finally, we group by rider_id and group_id and count distinct trip dates in each group. Only groups with at least 10 consecutive days are kept.

Expected Output:

This will return the rider_id and the date range (start_date, end_date) of the 10-day streaks.


This approach ensures that any gap between consecutive trips resets the counting, and only continuous streaks of 10 days will be considered valid.
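
As a side note, the same streak groups can be derived with the classic gaps-and-islands trick: for consecutive dates, trip_date minus the row number is constant, so that difference can serve directly as the group key. A sketch, assuming the df, F, and window_spec defined in the corrected code above:

# Alternative grouping: consecutive dates share the same (trip_date - row_number) anchor
streaks = (
    df.withColumn("rn", F.row_number().over(window_spec))
      .withColumn("streak_anchor", F.expr("date_sub(trip_date, rn)"))
      .groupBy("rider_id", "streak_anchor")
      .agg(
          F.countDistinct("trip_date").alias("num_days"),
          F.min("trip_date").alias("start_date"),
          F.max("trip_date").alias("end_date")
      )
      .filter(F.col("num_days") >= 10)
)
streaks.show(truncate=False)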

Spark SQL Solution

You can directly write this query in your SQL context if you’re using Spark SQL. This solution will follow a similar logic as the PySpark DataFrame approach.

WITH RiderTrips AS (
    SELECT rider_id,
           trip_date,
           ROW_NUMBER() OVER (PARTITION BY rider_id ORDER BY trip_date) as row_num,
           LAG(trip_date) OVER (PARTITION BY rider_id ORDER BY trip_date) as prev_trip_date
    FROM (
        SELECT rider_id,
               CAST(trip_start_timestamp AS DATE) as trip_date
        FROM Trip
        GROUP BY rider_id, CAST(trip_start_timestamp AS DATE)
    ) AS daily_trips
),

ConsecutiveTrips AS (
    SELECT rider_id,
           trip_date,
           row_num,
           prev_trip_date,
           DATEDIFF(trip_date, prev_trip_date) as date_diff,
           SUM(CASE WHEN DATEDIFF(trip_date, prev_trip_date) > 1 THEN 1 ELSE 0 END) OVER (PARTITION BY rider_id ORDER BY trip_date) as group_id
    FROM RiderTrips
)

SELECT rider_id, MIN(trip_date) as start_date, MAX(trip_date) as end_date, COUNT(DISTINCT trip_date) as num_days
FROM ConsecutiveTrips
GROUP BY rider_id, group_id
HAVING COUNT(DISTINCT trip_date) >= 10;

Explanation:

  1. RiderTrips:
    • Extracts rider_id and the trip_date (cast from trip_start_timestamp) for each rider and assigns a row number and previous trip date.
    • This part also removes duplicate trips for the same rider on the same date.
  2. ConsecutiveTrips:
    • Calculates the date_diff between the current and previous trip date.
    • Identifies the group_id where trips are consecutive, resetting the group when a gap (date_diff > 1) occurs.
  3. Final Query:
    • Aggregates by rider_id and group_id and counts the number of distinct trip dates within each group.
    • Filters out groups with less than 10 consecutive days.

Python Script Solution (Native Python without PySpark)

To solve this problem using pure Python, you can simulate the same logic without relying on Spark. Here’s how you can achieve that:

from datetime import datetime, timedelta
import pandas as pd

# Example Data
data = {
    "trip_id": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
    "driver_id": [101, 102, 101, 102, 103, 101, 102, 103, 101, 101, 101, 101, 101],
    "rider_id": [201, 201, 202, 201, 202, 201, 201, 202, 201, 201, 201, 201, 201],
    "trip_start_timestamp": [
        "2024-10-01 08:15:00", "2024-10-02 10:00:00", "2024-10-01 09:30:00",
        "2024-10-03 12:45:00", "2024-10-01 11:00:00", "2024-10-04 09:00:00",
        "2024-10-05 13:00:00", "2024-10-05 14:15:00", "2024-10-06 08:00:00",
        "2024-10-07 09:00:00", "2024-10-08 09:30:00", "2024-10-09 10:00:00",
        "2024-10-10 11:00:00"
    ]
}

# Convert to DataFrame
df = pd.DataFrame(data)

# Convert timestamps to datetime
df['trip_start_timestamp'] = pd.to_datetime(df['trip_start_timestamp'])

# Extract trip date
df['trip_date'] = df['trip_start_timestamp'].dt.date

# Remove duplicate trips per rider on the same day
df = df.drop_duplicates(subset=['rider_id', 'trip_date'])

# Sort by rider_id and trip_date
df = df.sort_values(by=['rider_id', 'trip_date']).reset_index(drop=True)

# Calculate the difference in days between consecutive trips
df['date_diff'] = df.groupby('rider_id')['trip_date'].diff().dt.days

# Initialize variables to store results
consecutive_groups = []
group_id = 0

# Group trips based on consecutive day streaks
for idx, row in df.iterrows():
    if pd.isna(row['date_diff']) or row['date_diff'] > 1:
        group_id += 1  # Start a new group when date_diff > 1 or for the first row
    consecutive_groups.append(group_id)

# Add the group_id to the DataFrame
df['group_id'] = consecutive_groups

# Count consecutive days in each group for each rider
result = df.groupby(['rider_id', 'group_id']).agg(
    num_days=('trip_date', 'count'),
    start_date=('trip_date', 'min'),
    end_date=('trip_date', 'max')
).reset_index()

# Filter for groups with at least 10 consecutive days
final_result = result[result['num_days'] >= 10]

print(final_result)

Explanation:

  1. Data Preparation:
    • We convert the trip_start_timestamp to a Python datetime object and extract the trip_date.
    • We also remove duplicate trips for the same rider on the same day.
  2. Date Difference Calculation:
    • We calculate the difference in days (date_diff) between consecutive trips for each rider.
  3. Grouping by Consecutive Streaks:
    • We create a new group (group_id) whenever there’s a gap of more than 1 day between consecutive trips.
  4. Count Consecutive Days:
    • For each rider, we count the number of consecutive days within each group and filter groups that have at least 10 consecutive days of trips.

Example Output (Python):

 rider_id  group_id  num_days  start_date    end_date
0       201         1        10  2024-10-01  2024-10-10

This will return the riders who have taken trips for at least 10 consecutive days, along with the start and end dates of that streak.


1A. Write a Spark SQL query, a PySpark DataFrame based script, and a Python coding solution to identify riders who have taken at least one trip every day for the last 10 days.

Table Name: Trip
Column Names: trip_id (Trip ID), driver_id (Driver ID), rider_id (Rider ID), trip_start_timestamp (Trip Start Timestamp) with example data

Let’s break down this problem into three parts:

  1. Spark SQL query.
  2. PySpark DataFrame-based solution.
  3. Python coding solution.

Example Data

Here is a simplified example of what the Trip table might look like:

trip_id | driver_id | rider_id | trip_start_timestamp
1       | 101       | 501      | 2023-10-01 08:30:00
2       | 102       | 502      | 2023-10-02 09:00:00
3       | 101       | 501      | 2023-10-03 07:45:00
4       | 103       | 503      | 2023-10-01 12:15:00
5       | 101       | 501      | 2023-10-04 18:00:00

The goal is to find riders who have taken at least one trip every day for the last 10 days.

1. Spark SQL Query Solution

We can achieve this using the following approach:

  • Extract the date part from the trip_start_timestamp.
  • Count distinct dates for each rider.
  • Filter riders who have trip records for at least the last 10 consecutive days.

Spark SQL Query:

WITH TripDates AS (
    SELECT 
        rider_id, 
        DATE_FORMAT(trip_start_timestamp, 'yyyy-MM-dd') AS trip_date
    FROM 
        Trip
    WHERE 
        trip_start_timestamp >= DATE_SUB(CURRENT_DATE(), 9)
    GROUP BY 
        rider_id, 
        DATE_FORMAT(trip_start_timestamp, 'yyyy-MM-dd')
),
RiderTripCount AS (
    SELECT 
        rider_id, 
        COUNT(DISTINCT trip_date) AS total_days
    FROM 
        TripDates
    GROUP BY 
        rider_id
)
SELECT 
    rider_id
FROM 
    RiderTripCount
WHERE 
    total_days = 10

2. PySpark DataFrame-Based Script

Here’s how you can implement the same logic using PySpark DataFrame API.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, current_date, datediff
from pyspark.sql import functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("RiderTripAnalysis").getOrCreate()

# Sample data
data = [
    (1, 101, 501, "2023-10-01 08:30:00"),
    (2, 102, 502, "2023-10-02 09:00:00"),
    (3, 101, 501, "2023-10-03 07:45:00"),
    (4, 103, 503, "2023-10-01 12:15:00"),
    (5, 101, 501, "2023-10-04 18:00:00"),
    # Add more sample data here
]

# Create DataFrame
columns = ["trip_id", "driver_id", "rider_id", "trip_start_timestamp"]
df = spark.createDataFrame(data, schema=columns)

# Convert trip_start_timestamp to Date type
df = df.withColumn("trip_start_timestamp", F.to_timestamp(col("trip_start_timestamp")))

# Extract trip date and filter for last 10 days
filtered_df = df.filter(
    col("trip_start_timestamp") >= F.date_sub(F.current_date(), 9)
).withColumn(
    "trip_date", date_format(col("trip_start_timestamp"), "yyyy-MM-dd")
).select("rider_id", "trip_date").distinct()

# Count the number of distinct trip dates per rider
rider_trip_count = filtered_df.groupBy("rider_id").agg(
    F.countDistinct("trip_date").alias("total_days")
)

# Filter riders who have trips every day for the last 10 days
riders_with_trip_every_day = rider_trip_count.filter(col("total_days") == 10)

# Show results
riders_with_trip_every_day.show()

3. Python Solution (for Non-Spark Environments)

If you’re working in a non-Spark environment, we can use Python with pandas to solve this problem. Here’s an approach using pandas:

import pandas as pd
from datetime import datetime, timedelta

# Sample Data
data = {
    'trip_id': [1, 2, 3, 4, 5],
    'driver_id': [101, 102, 101, 103, 101],
    'rider_id': [501, 502, 501, 503, 501],
    'trip_start_timestamp': ['2023-10-01 08:30:00', '2023-10-02 09:00:00', '2023-10-03 07:45:00', 
                             '2023-10-01 12:15:00', '2023-10-04 18:00:00']
}

# Create DataFrame
df = pd.DataFrame(data)
df['trip_start_timestamp'] = pd.to_datetime(df['trip_start_timestamp'])

# Get today's date and filter for last 10 days
today = datetime.now()
last_10_days = today - timedelta(days=9)

filtered_df = df[df['trip_start_timestamp'] >= last_10_days].copy()  # .copy() avoids SettingWithCopyWarning below

# Extract the trip date
filtered_df['trip_date'] = filtered_df['trip_start_timestamp'].dt.date

# Group by rider_id and count distinct trip dates
rider_trip_count = filtered_df.groupby('rider_id')['trip_date'].nunique().reset_index()

# Filter riders who have trips every day for the last 10 days
riders_with_trip_every_day = rider_trip_count[rider_trip_count['trip_date'] == 10]

print(riders_with_trip_every_day)

Summary of Solutions:

  • Spark SQL: Use DATE_FORMAT and GROUP BY with COUNT(DISTINCT trip_date) to check riders who have taken trips every day for the last 10 days.
  • PySpark DataFrame API: Use similar logic with PySpark’s to_timestamp, date_sub, and countDistinct functions to implement the same functionality.
  • Python (pandas): If not using Spark, you can still solve this problem using pandas and Python’s datetime module.

These solutions can help identify riders who are consistently active in the last 10 days.
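
One practical caveat: the sample timestamps are from 2023, so filtering on CURRENT_DATE() today returns no rows. For testing against historical data you might anchor the 10-day window on the latest trip date in the table instead; a hedged sketch using the PySpark DataFrame df from above:

# Anchor the window on the most recent trip date instead of current_date (for demo/testing only)
max_date = df.select(F.max(F.to_date("trip_start_timestamp")).alias("d")).first()["d"]

riders = (
    df.withColumn("trip_date", F.to_date("trip_start_timestamp"))
      .filter(F.col("trip_date") >= F.date_sub(F.lit(max_date), 9))  # 10-day window ending at max_date
      .select("rider_id", "trip_date").distinct()
      .groupBy("rider_id")
      .agg(F.countDistinct("trip_date").alias("total_days"))
      .filter(F.col("total_days") == 10)
)
riders.show()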

F.to_timestamp(col("trip_start_timestamp")): what is it for?

F.to_timestamp(col("trip_start_timestamp")) in PySpark is used to convert a string column into a TimestampType. This is useful when you have a date or timestamp in string format (e.g., "2023-10-01 08:30:00") and need to work with it as a proper timestamp for performing date/time-related operations such as filtering, aggregation, or comparisons.

Here’s a breakdown of the components:

  • F.to_timestamp(): This is a function from pyspark.sql.functions that converts a string to a timestamp.
  • col("trip_start_timestamp"): This extracts the column named trip_start_timestamp from the DataFrame.

Example:

Assume you have a column trip_start_timestamp in string format like this:

"2023-10-01 08:30:00"

By using F.to_timestamp, you can convert this string into a timestamp that allows Spark to recognize and manipulate it as a date-time object.

Why Use F.to_timestamp?

  1. String to Timestamp Conversion: If your date or time data is stored as strings, converting it to timestamp allows you to use Spark’s built-in date and time functions like date_sub, date_add, year, month, etc.
  2. Filtering: You can filter records based on time periods (e.g., the last 10 days) only if the date is in the correct timestamp format.
  3. Aggregation: If you need to group by or aggregate data based on date or time, the column needs to be in timestamp format.
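
One behavior worth knowing: if a string cannot be parsed (or does not match the supplied format), to_timestamp returns null rather than raising an error (with ANSI mode disabled, the default in Spark 3.x), so it is worth checking for nulls after the conversion. A small sketch, assuming a df whose trip_start_timestamp column has already been converted:

# Rows whose strings could not be parsed end up as null after to_timestamp
unparsed = df.filter(F.col("trip_start_timestamp").isNull()).count()
print(f"Rows that failed timestamp parsing: {unparsed}")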

Example of How It Works:

from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Example").getOrCreate()

# Sample data
data = [
    (1, "2023-10-01 08:30:00"),
    (2, "2023-10-02 09:00:00"),
]

# Create DataFrame
columns = ["trip_id", "trip_start_timestamp"]
df = spark.createDataFrame(data, columns)

# Convert 'trip_start_timestamp' from string to timestamp
df = df.withColumn("trip_start_timestamp", F.to_timestamp(F.col("trip_start_timestamp")))

# Show the result
df.show(truncate=False)

Output:

+-------+--------------------+
|trip_id|trip_start_timestamp|
+-------+--------------------+
|1      |2023-10-01 08:30:00 |
|2      |2023-10-02 09:00:00 |
+-------+--------------------+

Now, trip_start_timestamp is recognized as a timestamp type, and you can use it for date and time operations.
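
You can confirm the conversion by inspecting the schema; with the sample DataFrame above it would look roughly like this:

df.printSchema()
# root
#  |-- trip_id: long (nullable = true)
#  |-- trip_start_timestamp: timestamp (nullable = true)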

What functions in PySpark are similar to to_timestamp?

In PySpark, aside from F.to_timestamp(), there are several other functions that deal with date and time conversion or manipulation. Here are some similar functions:

1. to_date()

Purpose: Converts a string column into a DateType (which only has the date part, without time).

Usage: F.to_date(F.col("string_column"), "format")

Example:

from pyspark.sql import functions as F

# If you have a column like "2023-10-01 08:30:00" and only want the date part ("2023-10-01"), use to_date()
df = df.withColumn("date_column", F.to_date(F.col("string_column"), "yyyy-MM-dd"))

2. unix_timestamp()

Purpose: Converts a date or timestamp to Unix epoch time (i.e., the number of seconds since January 1, 1970).

Usage: F.unix_timestamp(F.col("string_column"), "format")

Example:

df = df.withColumn("unix_time", F.unix_timestamp(F.col("trip_start_timestamp"), "yyyy-MM-dd HH:mm:ss"))

This will give you the number of seconds since 1970-01-01 for each timestamp in trip_start_timestamp.

3. from_unixtime()

Purpose: Converts Unix time (seconds since 1970-01-01) back to a readable timestamp.

Usage: F.from_unixtime(F.col("unix_column"))

Example:

df = df.withColumn("timestamp_from_unix", F.from_unixtime(F.col("unix_column")))

4. date_format()

Purpose: Formats a TimestampType or DateType column into a specific string format.

Usage: F.date_format(F.col("date_column"), "format")

Example:

df = df.withColumn("formatted_date", F.date_format(F.col("date_column"), "yyyy-MM-dd")) #This is useful if you want to change the date or timestamp format in the output.

5. to_utc_timestamp()

Purpose: Converts a timestamp column into UTC from any given time zone.

Usage: F.to_utc_timestamp(F.col("timestamp_column"), "timezone")

Example:

df = df.withColumn("utc_time", F.to_utc_timestamp(F.col("timestamp_column"), "America/New_York"))

6. from_utc_timestamp()

Purpose: Converts UTC timestamp to a specific time zone.

Usage: F.from_utc_timestamp(F.col("utc_column"), "timezone")

Example:

df = df.withColumn("local_time", F.from_utc_timestamp(F.col("utc_column"), "America/New_York"))

7. cast()

Purpose: Another way to convert a column into a different data type, including date and timestamp types.

Usage: F.col("string_column").cast("timestamp")

Example:

df = df.withColumn("timestamp_column", F.col("string_column").cast("timestamp"))

Example Comparison for Similar Functions:

from pyspark.sql import functions as F

# Assume df has a string timestamp column
df = df.withColumn("to_timestamp", F.to_timestamp(F.col("string_column"), "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("to_date", F.to_date(F.col("string_column"), "yyyy-MM-dd"))
df = df.withColumn("unix_timestamp", F.unix_timestamp(F.col("string_column"), "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("from_unix", F.from_unixtime(F.col("unix_timestamp")))
df = df.withColumn("formatted_date", F.date_format(F.col("to_timestamp"), "yyyy-MM-dd"))

Each of these functions serves a slightly different purpose, but all of them help in converting or manipulating date and time values in PySpark.


Spark SQL Query

SELECT rider_id
FROM (
  SELECT rider_id, DATE(trip_start_timestamp) AS trip_date
  FROM Trip
  WHERE trip_start_timestamp >= CURRENT_DATE - INTERVAL 9 DAYS  -- last 10 days, including today
  GROUP BY rider_id, DATE(trip_start_timestamp)
) AS daily_trips
GROUP BY rider_id
HAVING COUNT(DISTINCT trip_date) = 10;

PySpark DataFrame Script

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create SparkSession
spark = SparkSession.builder.appName("Daily Riders").getOrCreate()

# Load Trip data
trip_df = spark.read.table("Trip")

# Filter for the last 10 days (today plus the previous 9 days)
last_10_days_df = trip_df.filter(trip_df.trip_start_timestamp >= F.current_date() - F.expr("INTERVAL 9 DAYS"))

# Group by rider_id and trip_date
daily_riders_df = last_10_days_df \
  .groupBy("rider_id", F.date_trunc("day", "trip_start_timestamp").alias("trip_date")) \
  .count()

# Identify riders with daily trips
consistent_riders_df = daily_riders_df \
  .groupBy("rider_id") \
  .count() \
  .filter(F.col("count") == 10)

# Display results
consistent_riders_df.show()

Python Script using Pandas

Python

import pandas as pd

# Load Trip data
trip_df = pd.read_csv("trip_data.csv")

# Convert trip_start_timestamp to datetime
trip_df['trip_start_timestamp'] = pd.to_datetime(trip_df['trip_start_timestamp'])

# Filter last 10 days
last_10_days_df = trip_df[trip_df['trip_start_timestamp'].dt.date >= (pd.Timestamp('today').date() - pd.Timedelta(days=9))]

# Group by rider_id and trip_date
daily_riders_df = last_10_days_df.groupby(['rider_id', pd.Grouper(key='trip_start_timestamp', freq='D')]).size().reset_index()

# Identify riders with daily trips
consistent_riders_df = daily_riders_df.groupby('rider_id').size().reset_index(name='days_with_trips')
consistent_riders_df = consistent_riders_df[consistent_riders_df['days_with_trips'] == 10]

# Display results
print(consistent_riders_df)

Explanation

  1. Filter trips from the last 10 days.
  2. Group trips by rider_id and trip_date (truncated to day level).
  3. Count distinct trip_date for each rider_id.
  4. Identify rider_id with count equal to 10 (daily trips).

Advice

  1. Optimize performance using Spark’s built-in functions.
  2. Utilize Pandas for smaller datasets.
  3. Verify data integrity before analysis.


2. Write a Spark SQL query, a PySpark DataFrame based script, and a Python coding solution to identify riders who have taken at least one trip every day for 10 continuous days within the last six months (a modification of Q1).

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("TripsData").getOrCreate()

# Sample data (expanded for 15 days)
data = [
    (1, 101, 501, "2023-10-01 08:30:00"),
    (2, 102, 502, "2023-10-02 09:00:00"),
    (3, 101, 501, "2023-10-03 07:45:00"),
    (4, 103, 503, "2023-10-01 12:15:00"),
    (5, 101, 501, "2023-10-04 18:00:00"),
    (6, 104, 502, "2023-10-04 13:00:00"),
    (7, 101, 501, "2023-10-05 11:30:00"),
    (8, 103, 503, "2023-10-02 09:45:00"),
    (9, 101, 501, "2023-10-06 14:15:00"),
    (10, 102, 502, "2023-10-05 10:00:00"),
    (11, 101, 501, "2023-10-07 08:00:00"),
    (12, 104, 503, "2023-10-07 15:30:00"),
    (13, 101, 501, "2023-10-08 16:45:00"),
    (14, 102, 502, "2023-10-08 11:15:00"),
    (15, 101, 501, "2023-10-09 08:30:00"),
    (16, 101, 501, "2023-10-10 09:15:00"),
    (17, 104, 502, "2023-10-10 10:30:00"),
    (18, 101, 501, "2023-10-11 17:45:00"),
    (19, 103, 503, "2023-10-12 18:30:00"),
    (20, 101, 501, "2023-10-12 19:15:00"),
    (21, 101, 501, "2023-10-13 07:50:00"),
    (22, 102, 502, "2023-10-13 08:20:00"),
    (23, 101, 501, "2023-10-14 12:00:00"),
    (24, 104, 503, "2023-10-14 16:00:00"),
    (25, 101, 501, "2023-10-15 08:00:00"),
    (26, 102, 502, "2023-10-15 09:00:00")
]

# Define columns
columns = ["trip_id", "driver_id", "rider_id", "trip_start_timestamp"]

# Create DataFrame
df = spark.createDataFrame(data, schema=columns)

# Convert 'trip_start_timestamp' to timestamp type
df = df.withColumn("trip_start_timestamp", F.to_timestamp(F.col("trip_start_timestamp")))

# Show DataFrame
df.show(truncate=False)

To solve this problem, we need to identify riders who have taken at least one trip every day continuously for 10 days within the last six months.

Approach:

  1. Convert the trip_start_timestamp to a date format.
  2. Filter the data for the last six months.
  3. Identify the continuous trips taken by riders for at least 10 consecutive days.
  4. Use window functions to identify the consecutive trip days per rider.
  5. Calculate the difference between the current trip day and the first day in a sequence for each rider and identify the ones with 10 consecutive days.

1. Spark SQL Query

First, register the DataFrame as a temporary SQL table and run the query.

# Register DataFrame as a temporary SQL view
df.createOrReplaceTempView("trip")

# Spark SQL query to identify riders with at least one trip every day for 10 consecutive days in the last six months
query = """
WITH trip_days AS (
    SELECT 
        rider_id,
        DATE(trip_start_timestamp) as trip_date
    FROM 
        trip
    WHERE 
        trip_start_timestamp >= ADD_MONTHS(CURRENT_DATE(), -6)
    GROUP BY 
        rider_id, DATE(trip_start_timestamp)
),
trip_with_lag AS (
    SELECT 
        rider_id, 
        trip_date,
        DATEDIFF(trip_date, LAG(trip_date, 1) OVER (PARTITION BY rider_id ORDER BY trip_date)) as day_diff
    FROM 
        trip_days
),
trip_streaks AS (
    SELECT
        rider_id,
        trip_date,
        SUM(CASE WHEN day_diff = 1 OR day_diff IS NULL THEN 0 ELSE 1 END) OVER (PARTITION BY rider_id ORDER BY trip_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS streak_group
    FROM
        trip_with_lag
),
consecutive_trips AS (
    SELECT 
        rider_id,
        COUNT(*) AS consecutive_days
    FROM 
        trip_streaks
    GROUP BY 
        rider_id, streak_group
    HAVING 
        consecutive_days >= 10
)
SELECT DISTINCT rider_id
FROM consecutive_trips
"""

# Run the query
result_sql_df = spark.sql(query)

# Show the results
result_sql_df.show()

2. PySpark DataFrame Based Script

Below is the solution using the PySpark DataFrame API.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Filter trips for the last 6 months
df_filtered = df.filter(F.col("trip_start_timestamp") >= F.add_months(F.current_date(), -6))

# Extract the trip date from the timestamp
df_filtered = df_filtered.withColumn("trip_date", F.to_date(F.col("trip_start_timestamp")))

# Remove duplicate trips per day per rider
df_filtered = df_filtered.dropDuplicates(["rider_id", "trip_date"])

# Window to calculate the difference between consecutive days for each rider
window_spec = Window.partitionBy("rider_id").orderBy("trip_date")

# Calculate the difference in days between consecutive trips for each rider
df_with_lag = df_filtered.withColumn("day_diff", F.datediff(F.col("trip_date"), F.lag("trip_date").over(window_spec)))

# Assign streak groups where the day_diff is more than 1 or null (start of a new streak)
df_with_streaks = df_with_lag.withColumn(
    "streak_group", 
    F.sum(F.when((F.col("day_diff") > 1) | F.col("day_diff").isNull(), 1).otherwise(0)).over(window_spec)
)

# Count consecutive days for each streak
df_consecutive = df_with_streaks.groupBy("rider_id", "streak_group").agg(F.count("trip_date").alias("consecutive_days"))

# Filter for riders with at least 10 consecutive days
result_df = df_consecutive.filter(F.col("consecutive_days") >= 10).select("rider_id").distinct()

# Show the result
result_df.show()

3. Python Solution Using Pandas

In Python, you can use pandas to solve this problem, assuming the data is loaded into a DataFrame.

import pandas as pd
from datetime import datetime, timedelta

# Sample data
data = {
    'trip_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
    'rider_id': [501, 502, 501, 503, 501, 502, 501, 503, 501, 502, 501, 503],
    'trip_start_timestamp': [
        "2023-10-01 08:30:00", "2023-10-02 09:00:00", "2023-10-03 07:45:00",
        "2023-10-01 12:15:00", "2023-10-04 18:00:00", "2023-10-04 13:00:00",
        "2023-10-05 11:30:00", "2023-10-02 09:45:00", "2023-10-06 14:15:00",
        "2023-10-05 10:00:00", "2023-10-07 08:00:00", "2023-10-07 15:30:00"
    ]
}

# Create DataFrame
df = pd.DataFrame(data)
df['trip_start_timestamp'] = pd.to_datetime(df['trip_start_timestamp'])

# Filter trips for the last 6 months (approximated here as 6 * 30 days; note that the 2023
# sample timestamps will all be filtered out if you run this with today's date)
six_months_ago = datetime.now() - timedelta(days=6*30)
df_filtered = df[df['trip_start_timestamp'] >= six_months_ago].copy()

# Extract date from timestamp (keep datetime64 dtype via normalize() so .diff().dt.days works below)
df_filtered['trip_date'] = df_filtered['trip_start_timestamp'].dt.normalize()

# Remove duplicate trips per day per rider
df_filtered = df_filtered.drop_duplicates(subset=['rider_id', 'trip_date'])

# Calculate the difference between consecutive days for each rider
df_filtered['day_diff'] = df_filtered.groupby('rider_id')['trip_date'].diff().dt.days

# Group by streaks
df_filtered['streak_group'] = (df_filtered['day_diff'] > 1) | df_filtered['day_diff'].isna()
df_filtered['streak_group'] = df_filtered.groupby('rider_id')['streak_group'].cumsum()

# Count consecutive days for each streak
streak_counts = df_filtered.groupby(['rider_id', 'streak_group']).size().reset_index(name='consecutive_days')

# Filter for riders with at least 10 consecutive days
result = streak_counts[streak_counts['consecutive_days'] >= 10]['rider_id'].unique()

# Output the result
print(result)

Explanation:

  • Spark SQL and PySpark DataFrame scripts use window functions to identify and group consecutive days.
  • We filter for riders who have trips in the last six months and group consecutive streaks of trips.
  • Riders who have at least 10 consecutive days of trips are identified; a reusable helper that parameterizes this logic is sketched below.
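
If you need this pattern repeatedly, the DataFrame logic can be wrapped in a small helper that parameterizes the streak length and look-back window. The function below is a hypothetical sketch (the name riders_with_streak and its parameters are illustrative, not part of the original solution):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def riders_with_streak(trips_df, min_days=10, months_back=6):
    # Riders with at least `min_days` consecutive trip days within the last `months_back` months
    w = Window.partitionBy("rider_id").orderBy("trip_date")
    daily = (
        trips_df
        .filter(F.col("trip_start_timestamp") >= F.add_months(F.current_date(), -months_back))
        .withColumn("trip_date", F.to_date("trip_start_timestamp"))
        .dropDuplicates(["rider_id", "trip_date"])
    )
    streaks = (
        daily
        .withColumn("day_diff", F.datediff("trip_date", F.lag("trip_date").over(w)))
        .withColumn("new_streak", F.when((F.col("day_diff") > 1) | F.col("day_diff").isNull(), 1).otherwise(0))
        .withColumn("streak_id", F.sum("new_streak").over(w))
        .groupBy("rider_id", "streak_id")
        .agg(F.count("trip_date").alias("consecutive_days"))
    )
    return streaks.filter(F.col("consecutive_days") >= min_days).select("rider_id").distinct()

# Usage (with the df defined earlier in this section):
# riders_with_streak(df, min_days=10, months_back=6).show()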

3. Compare the time taken to place the next order between clients whose first order was placed with an offer and those whose first order was placed without an offer.

Table Name: Orders
Column Names: order_id (Order ID), user_id (User ID), is_offer (Is Offer), Date_Timestamp (Date Timestamp) using pyspark sql, pyspark dataframe, and python

To solve this problem, we’ll follow the below approach:

  1. Sample Data Generation: First, generate the Orders table data with columns such as order_id, user_id, is_offer, and date_timestamp.
  2. Calculate the time between the first order and the next order for each user who placed an order.
  3. Group and compare users who placed their first order with an offer vs. without an offer.

The solution will be provided using:

  • PySpark SQL Query
  • PySpark DataFrame API
  • Python solution with Pandas

1. Sample Data Generation

Let’s generate sample data to represent the Orders table:

data = [
    (1, 101, 1, "2023-01-01 10:00:00"),  # User 101, first order with offer
    (2, 101, 0, "2023-01-05 09:30:00"),  # User 101, next order without offer
    (3, 102, 0, "2023-01-02 11:15:00"),  # User 102, first order without offer
    (4, 102, 1, "2023-01-10 08:45:00"),  # User 102, next order with offer
    (5, 103, 1, "2023-01-03 15:20:00"),  # User 103, first order with offer
    (6, 103, 0, "2023-01-08 16:00:00"),  # User 103, next order without offer
    (7, 104, 0, "2023-01-04 14:10:00"),  # User 104, first order without offer
    (8, 104, 0, "2023-01-15 09:25:00")   # User 104, next order without offer
]

columns = ["order_id", "user_id", "is_offer", "date_timestamp"]

# Create PySpark DataFrame
df = spark.createDataFrame(data, schema=columns)

2. PySpark SQL Query

# Register DataFrame as a temporary SQL view
df.createOrReplaceTempView("orders")

# SQL query to calculate time difference between first order and the next for users with and without offers
query = """
WITH first_orders AS (
    SELECT 
        user_id, 
        is_offer, 
        MIN(date_timestamp) AS first_order_time
    FROM 
        orders
    GROUP BY 
        user_id, is_offer
),
next_orders AS (
    SELECT 
        o.user_id, 
        o.date_timestamp AS next_order_time,
        f.is_offer,
        f.first_order_time
    FROM 
        orders o
    INNER JOIN 
        first_orders f 
        ON o.user_id = f.user_id AND o.date_timestamp > f.first_order_time
)
SELECT 
    f.user_id,
    f.is_offer,
    MIN(next_order_time) AS next_order_time,
    DATEDIFF(MIN(next_order_time), f.first_order_time) AS days_to_next_order
FROM 
    first_orders f
JOIN 
    next_orders n
ON 
    f.user_id = n.user_id AND f.is_offer = n.is_offer
GROUP BY 
    f.user_id, f.is_offer, f.first_order_time
ORDER BY 
    f.user_id;
"""

# Run the SQL query
result_sql = spark.sql(query)
result_sql.show()

3. PySpark DataFrame API

Here’s how you can achieve the same result using PySpark DataFrame API.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Convert date_timestamp to a timestamp type
df = df.withColumn("date_timestamp", F.to_timestamp("date_timestamp"))

# Get the first order time per user
window_spec = Window.partitionBy("user_id").orderBy("date_timestamp")
df_first_order = df.withColumn("first_order_time", F.first("date_timestamp").over(window_spec))

# Filter for first orders only (first_order_time exists on df_first_order, not on the original df)
first_orders = df_first_order.filter(F.col("date_timestamp") == F.col("first_order_time"))

# Get next orders for each user after their first order
next_orders = df.alias("o").join(first_orders.alias("f"), (F.col("o.user_id") == F.col("f.user_id")) & (F.col("o.date_timestamp") > F.col("f.date_timestamp")))

# Calculate the time difference between first and next orders
result_df = next_orders.groupBy("o.user_id", "f.is_offer", "f.first_order_time").agg(
    F.min("o.date_timestamp").alias("next_order_time"),
    F.datediff(F.min("o.date_timestamp"), F.col("f.first_order_time")).alias("days_to_next_order")
)

# Show the results
result_df.show()
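
To actually compare the two groups (the original ask), you can aggregate the per-user results by is_offer. A short follow-up sketch using result_df from above:

# Average days to the next order, split by whether the first order used an offer
result_df.groupBy("is_offer").agg(
    F.avg("days_to_next_order").alias("avg_days_to_next_order")
).show()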

4. Python Solution (Pandas)

Here’s the equivalent solution using pandas in Python:

import pandas as pd
from datetime import datetime

# Sample data
data = {
    'order_id': [1, 2, 3, 4, 5, 6, 7, 8],
    'user_id': [101, 101, 102, 102, 103, 103, 104, 104],
    'is_offer': [1, 0, 0, 1, 1, 0, 0, 0],
    'date_timestamp': [
        "2023-01-01 10:00:00", "2023-01-05 09:30:00", "2023-01-02 11:15:00",
        "2023-01-10 08:45:00", "2023-01-03 15:20:00", "2023-01-08 16:00:00",
        "2023-01-04 14:10:00", "2023-01-15 09:25:00"
    ]
}

# Create DataFrame
df = pd.DataFrame(data)
df['date_timestamp'] = pd.to_datetime(df['date_timestamp'])

# Get first order time for each user
df['first_order_time'] = df.groupby('user_id')['date_timestamp'].transform('min')

# Filter for first orders
first_orders = df[df['date_timestamp'] == df['first_order_time']]

# Get next orders for each user after their first order
next_orders = df[df['date_timestamp'] > df['first_order_time']]

# Merge first and next orders
merged_orders = first_orders[['user_id', 'is_offer', 'first_order_time']].merge(
    next_orders[['user_id', 'date_timestamp']],
    on='user_id'
)

# Calculate the time difference
merged_orders['days_to_next_order'] = (merged_orders['date_timestamp'] - merged_orders['first_order_time']).dt.days

# Keep only the earliest subsequent order per user (the time to the *next* order, not to every later order)
merged_orders = merged_orders.sort_values('days_to_next_order').groupby(['user_id', 'is_offer'], as_index=False).first()

# Show the result
print(merged_orders)

Explanation:

  • PySpark SQL Query: We used window functions to get the first order for each user and joined with other orders to get the next order after the first order.
  • PySpark DataFrame API: We used the PySpark functions min, datediff, and first with window specifications to replicate the same logic as in SQL.
  • Python Pandas: A straightforward approach using pandas for a Python-based solution, leveraging groupby and date differences to achieve the desired result. A final comparison step between the offer and no-offer cohorts is sketched below.
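
Finally, to compare the two cohorts in pandas, you can average days_to_next_order by is_offer. A short sketch using merged_orders from the pandas solution above:

# Average days to the next order for first-order-with-offer vs. first-order-without-offer users
avg_days = merged_orders.groupby('is_offer')['days_to_next_order'].mean().reset_index(name='avg_days_to_next_order')
print(avg_days)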

4. Write a Spark SQL query, a PySpark DataFrame based script, and a Python coding solution to find the Longest Palindromic Substring.

PySpark DataFrame Approach:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create SparkSession
spark = SparkSession.builder.appName("Longest Palindromic Substring").getOrCreate()

# Input string
s = "babad"

# Generate all substrings
substrings = []
for i in range(len(s)):
    for j in range(i+1, len(s)+1):
        substrings.append((s[i:j], len(s[i:j])))

# Create DataFrame
substr_df = spark.createDataFrame(substrings, ["substr", "len"])

# Filter palindromic substrings
palindromes_df = substr_df.filter(substr_df.substr == F.reverse(substr_df.substr))

# Find longest palindromic substring
longest_palindrome = palindromes_df.orderBy(F.col("len").desc()).first()

print(longest_palindrome.substr)
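
The question also asks for a Spark SQL version; since none is shown above, here is a hedged sketch that reuses the substrings DataFrame from the previous block (substr_df and spark are assumed to still be in scope):

# Register the substrings and pick the longest palindromic one in SQL
substr_df.createOrReplaceTempView("substrings")
spark.sql("""
    SELECT substr
    FROM substrings
    WHERE substr = reverse(substr)
    ORDER BY len DESC
    LIMIT 1
""").show(truncate=False)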

PySpark UDF Approach:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Initialize Spark session
spark = SparkSession.builder.appName("LongestPalindrome").getOrCreate()

# Sample data
data = [("babad",), ("cbbd",), ("a",), ("ac",), ("racecar",)]
df = spark.createDataFrame(data, ["input_string"])

# Define the UDF to find the longest palindromic substring
def find_longest_palindrome(s):
    def expand_from_center(left, right):
        while left >= 0 and right < len(s) and s[left] == s[right]:
            left -= 1
            right += 1
        return s[left+1:right]

    longest_palindrome = ""
    for i in range(len(s)):
        odd_palindrome = expand_from_center(i, i)
        even_palindrome = expand_from_center(i, i+1)
        longest_palindrome = max(longest_palindrome, odd_palindrome, even_palindrome, key=len)
    return longest_palindrome

# Register the UDF
find_longest_palindrome_udf = udf(find_longest_palindrome, StringType())

# Apply the UDF on the DataFrame
df_with_palindrome = df.withColumn("longest_palindrome", find_longest_palindrome_udf("input_string"))

# Show the result
df_with_palindrome.show(truncate=False)
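
With the sample strings above, the output would look roughly like this (ties such as "bab" vs. "aba" resolve to the first candidate found):

+------------+------------------+
|input_string|longest_palindrome|
+------------+------------------+
|babad       |bab               |
|cbbd        |bb                |
|a           |a                 |
|ac          |a                 |
|racecar     |racecar           |
+------------+------------------+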

Python Approach:

def longest_palindromic_substring(s):
    # Helper: expand around the given center and return the widest palindrome found
    def expand_from_center(left, right):
        while left >= 0 and right < len(s) and s[left] == s[right]:
            left -= 1
            right += 1
        return s[left+1:right]

    longest_palindrome = ""

    for i in range(len(s)):
        # Odd-length palindrome centered at i
        odd_palindrome = expand_from_center(i, i)

        # Even-length palindrome centered between i and i+1
        even_palindrome = expand_from_center(i, i+1)

        # Keep the longest candidate seen so far
        longest_palindrome = max(longest_palindrome, odd_palindrome, even_palindrome, key=len)

    return longest_palindrome

Test the function:

input_string = "babad"
print("Longest Palindromic Substring:", longest_palindromic_substring(input_string))
