PySpark and native Python data structures such as dictionaries, lists, tuples, and sets complement each other in many ways, especially in ETL frameworks, Data Quality (DQ) checks, and metadata-driven processing. Below are the main patterns for combining them, each with a short example.
🔥 1. Metadata-Driven ETL Framework (Metadata in Python Dictionary)
Use Case: Store metadata about tables, transformations, and execution logic.
- Python dictionary stores column types, transformation rules, execution order, etc.
- PySpark reads metadata and applies transformations dynamically.
✅ Example: Storing metadata in a dictionary
metadata = {
    "table_name": "employee",
    "columns": ["id", "name", "salary"],
    "types": {"id": "int", "name": "string", "salary": "double"},
    "transformations": {"salary": "salary * 1.1"}  # 10% salary hike
}
✅ Example: Applying metadata-based transformations in PySpark
from pyspark.sql import functions as F
df = df.withColumn("salary", F.expr(metadata["transformations"]["salary"]))
df.show()
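The "types" entry in the same dictionary can drive schema enforcement as well. A minimal sketch, assuming df already contains the columns listed in the metadata:
# Sketch: cast each column to the type declared in the metadata
for col_name, col_type in metadata["types"].items():
    df = df.withColumn(col_name, F.col(col_name).cast(col_type))
df.printSchema()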
🔍 2. Data Quality (DQ) Checks Using Python Dicts & Sets
Use Case: Define DQ rules and apply them dynamically in PySpark.
- Python dict: Stores expected column formats, null checks, range validations.
- PySpark functions: Apply these rules dynamically.
✅ Example: DQ rules in a dictionary
dq_rules = {
    "not_null": ["id", "name"],
    "allowed_values": {"status": {"active", "inactive"}},
    "range_check": {"salary": (10000, 100000)}
}
✅ Applying DQ rules in PySpark
from pyspark.sql import functions as F
for col in dq_rules["not_null"]:
    df = df.filter(F.col(col).isNotNull())

for col, allowed_values in dq_rules["allowed_values"].items():
    df = df.filter(F.col(col).isin(allowed_values))

for col, (min_val, max_val) in dq_rules["range_check"].items():
    df = df.filter((F.col(col) >= min_val) & (F.col(col) <= max_val))
df.show()
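Note that these filters silently drop violating rows. If you want to report violations instead of removing them, the same rules dictionary can drive aggregate checks. A rough sketch, reusing dq_rules and df from above:
# Sketch: count violations per rule instead of dropping rows
violation_counts = {}
for col in dq_rules["not_null"]:
    violation_counts[f"{col}_nulls"] = df.filter(F.col(col).isNull()).count()
for col, allowed_values in dq_rules["allowed_values"].items():
    violation_counts[f"{col}_bad_values"] = df.filter(~F.col(col).isin(allowed_values)).count()
for col, (min_val, max_val) in dq_rules["range_check"].items():
    violation_counts[f"{col}_out_of_range"] = df.filter((F.col(col) < min_val) | (F.col(col) > max_val)).count()
print(violation_counts)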
📌 3. Optimized Joins Using Python Sets
Use Case: Avoid costly joins by using Python sets for filtering.
- If a small list of values is available, use a Python set instead of a join.
✅ Example: Filtering records without a join
allowed_ids = {101, 102, 103} # Instead of joining with a lookup table
df_filtered = df.filter(F.col("id").isin(allowed_ids))
df_filtered.show()
🚀 Advantage: Faster than a shuffle join when the value set is small, because the values are embedded directly in the filter and no second DataFrame has to be read or shuffled.
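In practice the allowed values often live in a small lookup table rather than being hard-coded. A hedged sketch of collecting them into a set first (the table name lookup_ids is illustrative):
# Sketch: collect a small lookup table into a Python set, then filter
lookup_df = spark.table("lookup_ids")  # hypothetical small dimension table
allowed_ids = {row["id"] for row in lookup_df.select("id").collect()}
df_filtered = df.filter(F.col("id").isin(allowed_ids))
Only do this when the lookup comfortably fits in driver memory; for larger lookups a broadcast join is the safer choice.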
🔄 4. Using Python Dictionaries for Dynamic Column Mapping
Use Case: Map old column names to new names dynamically.
✅ Example: Renaming columns dynamically
column_mapping = {"emp_id": "id", "emp_name": "name", "emp_salary": "salary"}
df = df.select([F.col(old).alias(new) for old, new in column_mapping.items()])  # keeps only the mapped columns
df.show()
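Because select() keeps only the mapped columns, a loop over withColumnRenamed is a common alternative when the other columns should survive the rename. A sketch with the same mapping:
# Sketch: rename mapped columns in place, leaving unmapped columns untouched
for old_name, new_name in column_mapping.items():
    df = df.withColumnRenamed(old_name, new_name)
df.show()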
🏗 5. Parameter Handling for ETL Jobs Using Python Dict
Use Case: Pass parameters dynamically into an ETL job.
✅ Example: Dynamic parameters for a PySpark job
etl_params = {
    "source_table": "employees",
    "target_table": "processed_employees",
    "filter_condition": "status = 'active'"
}
df = spark.sql(f"SELECT * FROM {etl_params['source_table']} WHERE {etl_params['filter_condition']}")
df.write.saveAsTable(etl_params["target_table"])
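In a scheduled job these parameters usually come from outside the script. A minimal sketch that loads them from a JSON config file (the path etl_config.json is hypothetical):
import json

# Sketch: load job parameters from a JSON config file instead of hard-coding them
with open("etl_config.json") as f:  # hypothetical config file path
    etl_params = json.load(f)
The rest of the job stays unchanged, since it only reads from etl_params.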
🚀 6. Caching and Repartitioning Strategies Using Dict & Tuple
Use Case: Dynamically decide caching and partitioning strategies.
- Python dict defines optimal cache settings.
- Tuple stores repartitioning columns.
✅ Example: Cache & Repartition Strategies
cache_settings = {"employees": True, "transactions": False}
repartition_columns = ("region", "department")
if cache_settings["employees"]:
    df = df.cache()
df = df.repartition(*repartition_columns)
df.show()
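The same dictionaries scale to many tables by looking up the settings per table name. A rough sketch, assuming every table can be loaded with spark.table() and exposes the repartition columns:
# Sketch: apply cache/repartition settings per table, driven by the dicts above
frames = {}
for table_name, should_cache in cache_settings.items():
    t_df = spark.table(table_name)
    if should_cache:
        t_df = t_df.cache()
    frames[table_name] = t_df.repartition(*repartition_columns)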
🏆 7. Version Control for Metadata Table Using Dict
Use Case: Maintain metadata table versioning.
- Python dict stores the latest version of the metadata.
✅ Example: Version control in metadata table
metadata_versioning = {
    "employees": {"version": 2, "last_updated": "2025-02-10"},
    "transactions": {"version": 3, "last_updated": "2025-02-15"}
}
latest_version = metadata_versioning["employees"]["version"]
df = spark.sql(f"SELECT * FROM metadata_table WHERE version = {latest_version}")
df.show()
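When a table's metadata changes, the same dict can be updated before it is persisted. A minimal sketch; the helper below is illustrative, not part of any library:
from datetime import date

# Sketch: bump the version entry whenever a table's metadata changes
def bump_version(versioning, table_name):
    entry = versioning[table_name]
    entry["version"] += 1
    entry["last_updated"] = date.today().isoformat()
    return entry["version"]

new_version = bump_version(metadata_versioning, "employees")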
🔄 8. Logging and Error Handling Using Dict
Use Case: Store error messages and logs dynamically.
✅ Example: Centralized error handling
from datetime import datetime

error_log = []
try:
    df = spark.read.csv("invalid_path.csv")  # This will fail because the path does not exist
except Exception as e:
    error_log.append({"error": str(e), "timestamp": datetime.now().isoformat()})
print(error_log)
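Because error_log is a list of flat dictionaries, it can also be turned into a DataFrame and persisted. A sketch, where the target table name error_log_table is illustrative:
from pyspark.sql import Row

# Sketch: persist the collected errors to a log table
if error_log:
    errors_df = spark.createDataFrame([Row(**entry) for entry in error_log])
    errors_df.write.mode("append").saveAsTable("error_log_table")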
🔥 Summary of Python Data Structures in PySpark
| Use Case | Python Data Structure | Example |
|---|---|---|
| Metadata-driven ETL | dict | Store table schema & transformations |
| Data Quality (DQ) Checks | dict, set | Store validation rules, allowed values |
| Optimized filtering (avoid joins) | set | Filter using .isin(set) |
| Dynamic column mapping | dict | Rename columns dynamically |
| Parameter handling in ETL | dict | Pass job parameters dynamically |
| Caching and partitioning strategies | dict, tuple | Optimize DataFrame storage |
| Version control for metadata | dict | Maintain latest schema versions |
| Logging and error handling | dict, list | Centralized error handling |
These patterns cover the major ways PySpark and native Python data structures work together in practical ETL pipelines. 🚀 😊