How can you use a CTE to detect and list duplicate entries in a table, and to dedupe those entries?
Using a CTE in Spark SQL to Detect and List Duplicate Entries
Spark SQL Query
To detect duplicate entries, you can use a Common Table Expression (CTE) to group records by the columns that define uniqueness and count occurrences. Any group with a count greater than 1 represents a duplicated entry.
WITH duplicate_cte AS (
    SELECT
        column1, column2, column3, -- Add all relevant columns here
        COUNT(*) AS occurrence_count
    FROM your_table
    GROUP BY column1, column2, column3
    HAVING COUNT(*) > 1
)
SELECT *
FROM duplicate_cte;
Explanation
- CTE duplicate_cte:
  - Groups records by the columns you consider for uniqueness (e.g., column1, column2, column3).
  - Counts the occurrences in each group.
  - Filters for groups where the count is greater than 1 (HAVING COUNT(*) > 1).
- Final SELECT:
  - Lists each duplicate key combination along with its occurrence count.
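If your data lives in a DataFrame rather than a registered table, one way to run this query is to create a temporary view and execute the CTE through spark.sql. The sketch below is illustrative only: the view name your_table, the file path, and the columns column1, column2, column3 are placeholders, not names from an existing dataset.
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("DetectDuplicatesSQL").getOrCreate()

# Placeholder load step -- replace with however your table is actually read
df = spark.read.parquet("/path/to/your_table")

# Expose the DataFrame under the name used in the CTE
df.createOrReplaceTempView("your_table")

duplicates = spark.sql("""
WITH duplicate_cte AS (
    SELECT
        column1, column2, column3,
        COUNT(*) AS occurrence_count
    FROM your_table
    GROUP BY column1, column2, column3
    HAVING COUNT(*) > 1
)
SELECT * FROM duplicate_cte
""")
duplicates.show()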
Spark SQL Query to Dedupe Duplicate Entries
If you want to remove duplicates and retain only one instance of each unique record:
WITH ranked_cte AS (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY column1, column2, column3 ORDER BY some_column) AS row_num
    FROM your_table
)
SELECT *
FROM ranked_cte
WHERE row_num = 1;
Explanation
- ROW_NUMBER:
  - Assigns a sequential row number to each record within a group of duplicates defined by the specified columns (e.g., column1, column2, column3).
  - Orders records within each group using ORDER BY some_column, which determines which record is kept.
- Final SELECT:
  - Retains only the first record from each group (WHERE row_num = 1), effectively deduplicating the table.
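As with detection, the dedup query can be run from PySpark against a temporary view and the helper column dropped afterwards. This is a minimal sketch, assuming the SparkSession and the your_table view from the previous example; some_column is a placeholder for whichever column should decide which duplicate survives (a timestamp, for instance).
# Assumes `spark` and the temporary view "your_table" from the earlier sketch
deduped = spark.sql("""
WITH ranked_cte AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY column1, column2, column3
            ORDER BY some_column
        ) AS row_num
    FROM your_table
)
SELECT * FROM ranked_cte
WHERE row_num = 1
""").drop("row_num")  # Remove the helper column from the result

deduped.show()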
PySpark DataFrame Implementation
Detect Duplicates
To detect duplicates in a PySpark DataFrame:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
# Initialize Spark Session
spark = SparkSession.builder.appName("DetectDuplicates").getOrCreate()
# Sample Data
data = [
    (1, "A", 100),
    (2, "B", 200),
    (3, "A", 100),
    (4, "C", 300),
    (5, "B", 200)
]
columns = ["id", "category", "value"]
df = spark.createDataFrame(data, columns)
# Detect Duplicates
duplicates_df = (
    df.groupBy("category", "value")  # Group by relevant columns
    .agg(count("*").alias("occurrence_count"))
    .filter(col("occurrence_count") > 1)  # Filter for duplicates
)
duplicates_df.show()
Dedupe Entries
To deduplicate and retain only one instance of each unique record:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Assign a row number to each record within a group of duplicates
window_spec = Window.partitionBy("category", "value").orderBy("id")
deduped_df = (
    df.withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)  # Retain only the first record
    .drop("row_num")  # Drop the helper column
)
deduped_df.show()
Sample Output
For Detect Duplicates:
| category | value | occurrence_count |
|---|---|---|
| A | 100 | 2 |
| B | 200 | 2 |
For Deduplication:
| id | category | value |
|---|---|---|
| 1 | A | 100 |
| 2 | B | 200 |
| 4 | C | 300 |
Summary
- Spark SQL with a CTE simplifies the process of detecting and deduplicating records.
- In PySpark, groupBy and row_number allow efficient detection and removal of duplicates.