Coding Questions in Spark SQL, Pyspark, and Python

Challenging Question on Website Visits / Pageviews

You are given a table website_visits with the following columns:

Column Name      Data Type   Description
visit_id         int         Unique visit ID
user_id          int         Unique user ID
page_id          int         Unique page ID
visit_date       date        Date of visit
visit_time       timestamp   Timestamp of visit
page_view_time   int         Time spent on page (in seconds)

Your task is to write a SQL query that calculates the following metrics:

  1. Total number of visits
  2. Total number of unique users
  3. Total number of page views
  4. Average time spent on each page
  5. Top 3 pages with the most page views

Additional constraints:

  • A user can visit multiple pages in a single visit.
  • A page can be visited multiple times by the same user.
  • The page_view_time column only records the time spent on each page, not the total time spent on the website.

Solution Using Spark SQL

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# create a SparkSession
spark = SparkSession.builder.appName("Website Visits").getOrCreate()

# assume we have a DataFrame called 'website_visits'
website_visits = spark.createDataFrame([
    (1, 1, 1, "2022-01-01", "2022-01-01 12:00:00", 30),
    (1, 1, 2, "2022-01-01", "2022-01-01 12:05:00", 20),
    (2, 2, 1, "2022-01-02", "2022-01-02 13:00:00", 40),
    (3, 3, 2, "2022-01-03", "2022-01-03 14:00:00", 10),
    (3, 3, 3, "2022-01-03", "2022-01-03 14:10:00", 50),
    (4, 1, 1, "2022-01-04", "2022-01-04 15:00:00", 35)
], ["visit_id", "user_id", "page_id", "visit_date", "visit_time", "page_view_time"])

# register the DataFrame as a temporary view
website_visits.createOrReplaceTempView("website_visits")

# calculate the metrics
metrics = spark.sql("""
    SELECT
        COUNT(DISTINCT visit_id) AS total_visits,
        COUNT(DISTINCT user_id) AS total_unique_users,
        COUNT(page_id) AS total_page_views,
        AVG(page_view_time) AS avg_time_spent_per_page
    FROM
        website_visits
""")

# calculate the top 3 pages with the most page views
top_pages = spark.sql("""
    SELECT
        page_id,
        COUNT(page_id) AS page_views
    FROM
        website_visits
    GROUP BY
        page_id
    ORDER BY
        page_views DESC
    LIMIT 3
""")

# print the results
metrics.show()
top_pages.show()

This code calculates the required metrics and prints the results. The metrics DataFrame contains the total number of visits, unique users, page views, and average time spent per page. The top_pages DataFrame contains the top 3 pages with the most page views.
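
As a quick sanity check, the same metrics can be worked out by hand in plain Python over the six sample rows (these are hand calculations, not Spark output):

```python
from collections import Counter

# The same six sample rows: (visit_id, user_id, page_id, date, time, seconds)
rows = [
    (1, 1, 1, "2022-01-01", "2022-01-01 12:00:00", 30),
    (1, 1, 2, "2022-01-01", "2022-01-01 12:05:00", 20),
    (2, 2, 1, "2022-01-02", "2022-01-02 13:00:00", 40),
    (3, 3, 2, "2022-01-03", "2022-01-03 14:00:00", 10),
    (3, 3, 3, "2022-01-03", "2022-01-03 14:10:00", 50),
    (4, 1, 1, "2022-01-04", "2022-01-04 15:00:00", 35),
]

total_visits = len({r[0] for r in rows})        # 4 distinct visit_ids
total_unique_users = len({r[1] for r in rows})  # 3 distinct user_ids
total_page_views = len(rows)                    # 6 rows, one page view each
avg_time = sum(r[5] for r in rows) / len(rows)  # 185 / 6 ≈ 30.83 seconds
top_pages = Counter(r[2] for r in rows).most_common(3)  # [(1, 3), (2, 2), (3, 1)]
```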

Here’s the solution using PySpark DataFrame API:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# create a SparkSession
spark = SparkSession.builder.appName("Website Visits").getOrCreate()

# assume we have a DataFrame called 'website_visits'
website_visits = spark.createDataFrame([
    (1, 1, 1, "2022-01-01", "2022-01-01 12:00:00", 30),
    (1, 1, 2, "2022-01-01", "2022-01-01 12:05:00", 20),
    (2, 2, 1, "2022-01-02", "2022-01-02 13:00:00", 40),
    (3, 3, 2, "2022-01-03", "2022-01-03 14:00:00", 10),
    (3, 3, 3, "2022-01-03", "2022-01-03 14:10:00", 50),
    (4, 1, 1, "2022-01-04", "2022-01-04 15:00:00", 35)
], ["visit_id", "user_id", "page_id", "visit_date", "visit_time", "page_view_time"])

# calculate the metrics
metrics = website_visits.agg(
    F.countDistinct("visit_id").alias("total_visits"),
    F.countDistinct("user_id").alias("total_unique_users"),
    F.count("page_id").alias("total_page_views"),
    F.avg("page_view_time").alias("avg_time_spent_per_page")
)

# calculate the top 3 pages with the most page views
top_pages = website_visits.groupBy("page_id").count().orderBy("count", ascending=False).limit(3)

# print the results
metrics.show()
top_pages.show()
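
One caveat: LIMIT 3 breaks ties arbitrarily, so pages tied at the cutoff may be silently dropped. A rank-based cutoff keeps all tied pages. Here is the idea in plain Python with hypothetical counts (the sample data happens to have no ties):

```python
# Hypothetical page-view counts with a three-way tie below the top spot.
counts = {"p1": 5, "p2": 3, "p3": 3, "p4": 3, "p5": 1}

# Dense rank: rank the distinct count values in descending order.
distinct_counts = sorted(set(counts.values()), reverse=True)
rank_of = {c: i + 1 for i, c in enumerate(distinct_counts)}

# A LIMIT-3 style cutoff drops one of the tied pages arbitrarily.
by_limit = sorted(counts, key=counts.get, reverse=True)[:3]  # ['p1', 'p2', 'p3']

# A rank-based cutoff (rank <= 2) keeps every tied page.
by_rank = sorted(p for p in counts if rank_of[counts[p]] <= 2)  # ['p1', 'p2', 'p3', 'p4']
```

In the DataFrame API this would correspond to filtering on F.dense_rank().over(Window.orderBy(F.col("count").desc())) instead of calling limit(3).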

How It Is Done in Google Analytics

Google Analytics (GA) calculates metrics like genuine page views, impressions, and ad revenue using several techniques:

1. Genuine Page Views for Ad Impressions:

  • Filter Out Bot Traffic: GA uses filters and machine learning to identify and exclude bot traffic.
  • User Sessions: Page views are tied to active user sessions, ensuring repeated views in short intervals (e.g., <30s) by the same user are not double-counted.
  • Bounce Rate: If a user leaves the page immediately, the page view might not qualify as an impression.

2. Impressions for Ad Revenue:

  • Qualified Impressions: An ad impression is counted only if the ad is actually rendered on the user’s screen. This is tracked using viewport metrics and engagement thresholds (e.g., 50% of the ad visible for at least 1 second).
  • Viewability Metrics: Ads that are scrolled past too quickly or hidden due to ad blockers are not counted.
  • Attribution Models: Revenue is calculated based on attribution models like last-click, first-click, or linear, to determine which pages and ads contributed to conversions.

3. Tracking Using Event Tags:

  • Pageview Events: GA tracks every page load as a “pageview” event.
  • Scroll Depth: To ensure a genuine view, scroll depth tracking verifies user interaction with the page.
  • Engagement Time: Uses the page_view_time equivalent to determine active engagement versus idle time.

SQL vs. Google Analytics

In SQL, you analyze raw data to compute metrics without assumptions about user behavior (e.g., bot filtering, viewport visibility). In Google Analytics, predefined metrics and filters ensure the data reflects actual user engagement and advertiser requirements.

How to Implement This Solution in PySpark SQL / DataFrame API

To incorporate the concepts of user sessions and bounce rate into the logic using PySpark:

Assumptions:

  1. User Sessions: A new session starts when:
    • A user’s consecutive visit_time exceeds 30 minutes from the previous visit.
    • Otherwise, all actions within 30 minutes are part of the same session.
  2. Bounce Rate: A visit qualifies as a bounce if:
    • Only one page is viewed in the session.
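
Before wiring this into Spark, the 30-minute rule itself can be sketched in a few lines of plain Python over one user's sorted visit times (in epoch seconds):

```python
def assign_sessions(times, gap_seconds=1800):
    """Assign a session index to each sorted timestamp: a gap larger
    than gap_seconds starts a new session."""
    sessions, sid = [], 0
    for i, t in enumerate(times):
        if i > 0 and t - times[i - 1] > gap_seconds:
            sid += 1
        sessions.append(sid)
    return sessions

# Gaps of 1500s, 1500s (same session), then 7000s (new session):
assign_sessions([0, 1500, 3000, 10000])  # [0, 0, 0, 1]
```

This mirrors the running sum of the new_session flag used in the Spark code below.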

Modified PySpark Code (DataFrame API)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, when, count, avg, sum as _sum, lit
from pyspark.sql.window import Window

# Initialize Spark Session
spark = SparkSession.builder.master("local").appName("WebsiteAnalytics").getOrCreate()

# Sample DataFrame
data = [
    (1, 101, 201, '2025-01-01', '2025-01-01 10:00:00', 300),
    (2, 101, 202, '2025-01-01', '2025-01-01 10:25:00', 200),
    (3, 101, 203, '2025-01-01', '2025-01-01 12:00:00', 100),
    (4, 102, 201, '2025-01-01', '2025-01-01 11:00:00', 400),
    (5, 103, 204, '2025-01-01', '2025-01-01 11:05:00', 150),
    (6, 103, 205, '2025-01-01', '2025-01-01 11:10:00', 250),
]
columns = ['visit_id', 'user_id', 'page_id', 'visit_date', 'visit_time', 'page_view_time']

df = spark.createDataFrame(data, columns)

# Convert visit_time to timestamp
df = df.withColumn("visit_time", col("visit_time").cast("timestamp"))

# Step 1: Assign session IDs based on 30-minute inactivity
window_user = Window.partitionBy("user_id").orderBy("visit_time")

df = df.withColumn(
    "prev_visit_time", lag("visit_time").over(window_user)
).withColumn(
    "new_session", when(
        (col("visit_time").cast("long") - col("prev_visit_time").cast("long")) > 1800, 1
    ).otherwise(0)
).withColumn(
    "session_id", _sum("new_session").over(window_user)
)

# Step 2: Calculate metrics for user sessions
session_metrics = df.groupBy("user_id", "session_id").agg(
    count("page_id").alias("pages_per_session"),
    _sum("page_view_time").alias("total_time_spent")
)

# Step 3: Add Bounce Rate (Single Page Sessions)
session_metrics = session_metrics.withColumn(
    "is_bounce", when(col("pages_per_session") == 1, 1).otherwise(0)
)

# Step 4: Calculate Overall Metrics
# (Aliases defined inside agg() cannot be referenced by sibling expressions
# in the same agg(), so the bounce rate is derived in a follow-up withColumn.)
final_metrics = session_metrics.agg(
    count("session_id").alias("total_sessions"),
    _sum("is_bounce").alias("total_bounces"),
    (avg("total_time_spent") / lit(60)).alias("avg_session_time_in_minutes")
).withColumn(
    "bounce_rate_percentage",
    col("total_bounces") / col("total_sessions") * 100
)

# Step 5: Add Page Views and Top Pages
top_pages = df.groupBy("page_id").agg(
    count("visit_id").alias("page_view_count")
).orderBy(col("page_view_count").desc())

# Show results
final_metrics.show(truncate=False)
top_pages.show(truncate=False)

Explanation

  1. User Sessions:
    • The session_id is assigned based on a 30-minute inactivity threshold. If the difference between the current and previous visit_time exceeds 30 minutes, a new session starts.
  2. Bounce Rate:
    • Sessions with only one page view (pages_per_session = 1) are flagged as bounces.
  3. Overall Metrics:
    • Total sessions.
    • Bounce rate percentage as (total_bounces / total_sessions) * 100.
    • Average session duration in minutes (avg(total_time_spent) / 60).
  4. Top Pages:
    • Aggregated by page_id to find the top pages based on page_view_count.

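Working the sample data by hand confirms the arithmetic: user 101's gaps are 25 minutes (same session) and then 95 minutes (new session), giving four sessions in total:

```python
# Sessions in the sample: user 101 -> {201, 202} then {203};
# user 102 -> {201}; user 103 -> {204, 205}.
session_seconds = [300 + 200, 100, 400, 150 + 250]  # time spent per session
total_sessions = len(session_seconds)               # 4
total_bounces = 2                                   # the two single-page sessions
avg_minutes = sum(session_seconds) / total_sessions / 60  # 350s -> ~5.83 min
bounce_rate = total_bounces / total_sessions * 100        # 50.0
```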
Output Example

Final Metrics:

+--------------+-------------+---------------------------+----------------------+
|total_sessions|total_bounces|avg_session_time_in_minutes|bounce_rate_percentage|
+--------------+-------------+---------------------------+----------------------+
|4             |2            |5.833333333333333          |50.0                  |
+--------------+-------------+---------------------------+----------------------+

Top Pages:

+-------+---------------+
|page_id|page_view_count|
+-------+---------------+
|201    |2              |
|202    |1              |
|203    |1              |
|204    |1              |
|205    |1              |
+-------+---------------+

To incorporate the user sessions and bounce rate logic into the SQL query and PySpark (both SQL and DataFrame API), we need to adjust the logic to:

  1. Handle User Sessions:
    • Identify and group visits into sessions based on time gaps (e.g., if the gap between consecutive page views by the same user is greater than 30 seconds, it starts a new session).
    • Use the visit_time column to calculate these sessions.
  2. Handle Bounce Rate:
    • A “bounce” occurs if a user views only one page in a session or exits immediately.
    • Exclude such bounces from impressions.

Modified Logic in PySpark (DataFrame API)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, countDistinct, count, when, avg
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("WebsiteAnalytics").getOrCreate()

# Sample DataFrames
data = [
    (1, 1, 101, '2025-01-01', '2025-01-01 10:00:00', 30),
    (2, 1, 102, '2025-01-01', '2025-01-01 10:00:25', 20),
    (3, 1, 103, '2025-01-01', '2025-01-01 10:01:00', 40),
    (4, 2, 101, '2025-01-01', '2025-01-01 11:00:00', 15),
    (5, 3, 101, '2025-01-01', '2025-01-01 12:00:00', 10)
]
columns = ["visit_id", "user_id", "page_id", "visit_date", "visit_time", "page_view_time"]

df = spark.createDataFrame(data, columns)

# Cast visit_time from string to timestamp so second-level gaps can be computed
df = df.withColumn("visit_time", col("visit_time").cast("timestamp"))

# Step 1: Define session logic
window_spec = Window.partitionBy("user_id").orderBy("visit_time")
df_with_sessions = df.withColumn(
    "prev_time",
    lag("visit_time").over(window_spec)
).withColumn(
    "session_id",
    when(
        col("prev_time").isNull() | 
        (col("visit_time").cast("long") - col("prev_time").cast("long") > 30),
        col("visit_id")
    ).otherwise(None)
)

# Fill session_id with forward fill logic
from pyspark.sql.functions import last
df_with_sessions = df_with_sessions.withColumn(
    "session_id",
    last("session_id", ignorenulls=True).over(window_spec)
)

# Step 2: Add bounce rate logic
df_with_bounce = df_with_sessions.groupBy("session_id").agg(
    count("page_id").alias("pages_in_session")
).withColumn(
    "is_bounce",
    when(col("pages_in_session") == 1, 1).otherwise(0)
)

# Step 3: Join bounce info back to main DataFrame
df_final = df_with_sessions.join(
    df_with_bounce, "session_id", "inner"
)

# Step 4: Calculate metrics
metrics = df_final.filter(col("is_bounce") == 0).groupBy().agg(
    countDistinct("visit_id").alias("total_visits"),
    countDistinct("user_id").alias("total_unique_users"),
    count("page_id").alias("total_page_views"),
    avg("page_view_time").alias("avg_time_per_page")
)

# Step 5: Top 3 pages by page views
top_pages = df_final.filter(col("is_bounce") == 0).groupBy("page_id").count().orderBy(col("count").desc()).limit(3)

metrics.show()
top_pages.show()
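
A quick hand check of this sample: user 1's gaps are 25 seconds (same session) and 35 seconds (new session), so the sessions are {visit 1, 2}, {3}, {4}, and {5}; three of the four are single-page bounces, leaving only visits 1 and 2 in the metrics:

```python
# Rows that survive the bounce filter: (visit_id, user_id, page_id, seconds)
non_bounce = [(1, 1, 101, 30), (2, 1, 102, 20)]

total_visits = len({r[0] for r in non_bounce})        # 2
total_unique_users = len({r[1] for r in non_bounce})  # 1
avg_time = sum(r[3] for r in non_bounce) / len(non_bounce)  # 25.0
```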

Explanation

  1. User Sessions:
    • Use lag() to calculate the time difference between consecutive visits for the same user.
    • If the time gap exceeds 30 seconds, start a new session by assigning a new session_id.
  2. Bounce Rate:
    • Count the number of pages viewed in each session.
    • Mark sessions with only one page view as a bounce (is_bounce = 1).
  3. Metrics and Exclusion:
    • Filter out bounces (is_bounce = 0) to calculate genuine visits and impressions.
    • Aggregate metrics such as total visits, unique users, and average time per page.
  4. Top Pages:
    • Count page views excluding bounces and rank them to find the top 3 pages.
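
The forward-fill step (carrying the last non-null session marker forward within each user's window) is worth seeing in isolation; a minimal plain-Python version:

```python
# session_id markers: visit_id at each session start, None within a session.
marks = [1, None, 3, None, None]

filled, current = [], None
for m in marks:
    if m is not None:
        current = m  # remember the most recent session start
    filled.append(current)
# filled == [1, 1, 3, 3, 3]
```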

Equivalent SQL Query

If you prefer SQL:

WITH session_data AS (
  SELECT
    *,
    LAG(visit_time) OVER (PARTITION BY user_id ORDER BY visit_time) AS prev_time,
    CASE
      WHEN LAG(visit_time) OVER (PARTITION BY user_id ORDER BY visit_time) IS NULL
           OR unix_timestamp(visit_time) - unix_timestamp(LAG(visit_time) OVER (PARTITION BY user_id ORDER BY visit_time)) > 30 THEN visit_id
      ELSE NULL
    END AS session_id
  FROM website_visits
),
filled_sessions AS (
  SELECT
    *,
    LAST_VALUE(session_id, true) OVER (PARTITION BY user_id ORDER BY visit_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS session_id_filled
  FROM session_data
),
bounce_data AS (
  SELECT
    session_id_filled AS session_id,
    COUNT(page_id) AS pages_in_session,
    CASE WHEN COUNT(page_id) = 1 THEN 1 ELSE 0 END AS is_bounce
  FROM filled_sessions
  GROUP BY session_id_filled
),
final_data AS (
  SELECT
    f.*,
    b.is_bounce
  FROM filled_sessions f
  JOIN bounce_data b ON f.session_id_filled = b.session_id
),
metrics AS (
  SELECT
    COUNT(DISTINCT visit_id) AS total_visits,
    COUNT(DISTINCT user_id) AS total_unique_users,
    COUNT(page_id) AS total_page_views,
    AVG(page_view_time) AS avg_time_per_page
  FROM final_data
  WHERE is_bounce = 0
),
top_pages AS (
  SELECT
    page_id,
    COUNT(*) AS page_view_count
  FROM final_data
  WHERE is_bounce = 0
  GROUP BY page_id
  ORDER BY page_view_count DESC
  LIMIT 3
)
SELECT * FROM metrics;

-- Note: a WITH clause is scoped to a single statement, so to query top_pages
-- the CTEs must be repeated (or registered as temporary views) in a second query:
-- SELECT * FROM top_pages;