Coding Questions in Spark SQL, Pyspark, and Python

How do you use a CTE to detect and list duplicate entries in a table, and how do you dedupe those entries?

Using a CTE in Spark SQL to Detect and List Duplicate Entries

Spark SQL Query

To detect duplicate entries, you can use a Common Table Expression (CTE) to group records by the columns that define uniqueness and count occurrences. Any group with a count greater than 1 indicates duplicate records.

WITH duplicate_cte AS (
    SELECT 
        column1, column2, column3, -- Add all relevant columns here
        COUNT(*) AS occurrence_count
    FROM your_table
    GROUP BY column1, column2, column3
    HAVING COUNT(*) > 1
)
SELECT * 
FROM duplicate_cte;

Explanation

  1. CTE duplicate_cte:
    • Groups records by the columns you consider for uniqueness (e.g., column1, column2, column3).
    • Counts occurrences of each group.
    • Filters for groups where the count is greater than 1 (HAVING COUNT(*) > 1).
  2. Final SELECT:
    • Returns one row per duplicate key combination, along with its occurrence count (see the sketch below for listing the full duplicate rows).
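
The CTE itself returns one row per duplicate key combination, not every duplicate row. To pull back the full duplicate rows, one option is to join the CTE back to the source table. Below is a minimal sketch run through spark.sql, assuming an active SparkSession named spark; the table and column names (your_table, column1, column2, column3) are the same placeholders used above.

full_duplicates_df = spark.sql("""
    WITH duplicate_cte AS (
        SELECT column1, column2, column3,
               COUNT(*) AS occurrence_count
        FROM your_table
        GROUP BY column1, column2, column3
        HAVING COUNT(*) > 1
    )
    -- Join back to the source table to list every duplicate row
    SELECT t.*, d.occurrence_count
    FROM your_table t
    JOIN duplicate_cte d
      ON t.column1 = d.column1
     AND t.column2 = d.column2
     AND t.column3 = d.column3
""")

full_duplicates_df.show()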

Spark SQL Query to Dedupe Duplicate Entries

If you want to remove duplicates and retain only one instance of each unique record:

WITH ranked_cte AS (
    SELECT 
        *,
        ROW_NUMBER() OVER (PARTITION BY column1, column2, column3 ORDER BY some_column) AS row_num
    FROM your_table
)
SELECT * 
FROM ranked_cte
WHERE row_num = 1;

Explanation

  • ROW_NUMBER:
    • Assigns a sequential row number to each record within its partition, defined by the columns that identify a duplicate (e.g., column1, column2, column3).
    • Orders records within each partition by ORDER BY some_column, which determines which record is numbered first.
  • Final SELECT:
    • Retains only the first record from each partition (WHERE row_num = 1), effectively deduplicating the table; a sketch showing how the ordering controls which record survives follows below.
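
The ORDER BY inside the window decides which record in each group is kept. If, for example, the table had a timestamp column, ordering by it descending would retain the most recent record per group. A minimal sketch, assuming an active SparkSession named spark and a hypothetical updated_at column used only for illustration:

latest_per_group_df = spark.sql("""
    WITH ranked_cte AS (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY column1, column2, column3
                   ORDER BY updated_at DESC  -- keep the most recent record
               ) AS row_num
        FROM your_table
    )
    SELECT *
    FROM ranked_cte
    WHERE row_num = 1
""")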

PySpark DataFrame Implementation

Detect Duplicates

To detect duplicates in a PySpark DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

# Initialize Spark Session
spark = SparkSession.builder.appName("DetectDuplicates").getOrCreate()

# Sample Data
data = [
    (1, "A", 100),
    (2, "B", 200),
    (3, "A", 100),
    (4, "C", 300),
    (5, "B", 200)
]
columns = ["id", "category", "value"]

df = spark.createDataFrame(data, columns)

# Detect Duplicates
duplicates_df = (
    df.groupBy("category", "value")  # Group by relevant columns
    .agg(count("*").alias("occurrence_count"))
    .filter(col("occurrence_count") > 1)  # Filter for duplicates
)

duplicates_df.show()
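
Since duplicates_df holds one row per duplicate group, you can also use it to quantify how many redundant rows the table contains: each group contributes occurrence_count - 1 extra rows. A small sketch on the same sample df (for the data above this yields 2):

from pyspark.sql.functions import sum as spark_sum

# Total redundant rows = sum over duplicate groups of (occurrence_count - 1)
redundant_rows_df = duplicates_df.select(
    spark_sum(col("occurrence_count") - 1).alias("redundant_rows")
)
redundant_rows_df.show()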

Dedupe Entries

To deduplicate and retain only one instance of each unique record:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Assign a row number to each record within a group of duplicates
window_spec = Window.partitionBy("category", "value").orderBy("id")
deduped_df = (
    df.withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)  # Retain only the first record
    .drop("row_num")  # Drop the helper column
)

deduped_df.show()
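
When it does not matter which record in each group survives, PySpark's built-in dropDuplicates is a shorter alternative to the window approach. A minimal sketch on the same df:

# Keep one row per (category, value) combination; which row is kept is not guaranteed
deduped_simple_df = df.dropDuplicates(["category", "value"])
deduped_simple_df.show()

Unlike the ROW_NUMBER approach, dropDuplicates does not let you control which record is retained.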

Sample Output

For Detect Duplicates:

category   value   occurrence_count
A          100     2
B          200     2

For Deduplication:

id   category   value
1    A          100
2    B          200
4    C          300

Summary

  • Spark SQL with a CTE simplifies the process of detecting and deduplicating records.
  • In PySpark, groupBy and row_number allow efficient detection and removal of duplicates.