
Useful Code Snippets in Python and Pyspark

by lochan2014 | Jan 7, 2025 | Pyspark, Python

Python's built-in data structures (dictionaries, lists, tuples, and sets) can be combined with PySpark in many useful ways, especially in ETL frameworks, Data Quality (DQ) checks, metadata-driven processing, and more. Below is a list of the most common patterns.


🔥 1. Metadata-Driven ETL Framework (Metadata in Python Dictionary)

Use Case: Store metadata about tables, transformations, and execution logic.

  • Python dictionary stores column types, transformation rules, execution order, etc.
  • PySpark reads metadata and applies transformations dynamically.

Example: Storing metadata in a dictionary

metadata = {
    "table_name": "employee",
    "columns": ["id", "name", "salary"],
    "types": {"id": "int", "name": "string", "salary": "double"},
    "transformations": {"salary": "salary * 1.1"}  # 10% salary hike
}

Example: Applying metadata-based transformations in PySpark

from pyspark.sql import functions as F

# df is an existing DataFrame that already contains the columns listed in metadata
df = df.withColumn("salary", F.expr(metadata["transformations"]["salary"]))
df.show()
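The dictionary can drive more than a single rule. A minimal sketch (assuming df already holds the raw employee data) that casts every column to its declared type and then applies each transformation expression in a loop:

from pyspark.sql import functions as F

# Cast each column to the type declared in the metadata
for col_name, col_type in metadata["types"].items():
    df = df.withColumn(col_name, F.col(col_name).cast(col_type))

# Apply every transformation expression from the metadata
for col_name, expression in metadata["transformations"].items():
    df = df.withColumn(col_name, F.expr(expression))

df.select(metadata["columns"]).show()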

🔍 2. Data Quality (DQ) Checks Using Python Dicts & Sets

Use Case: Define DQ rules and apply them dynamically in PySpark.

  • Python dict: Stores expected column formats, null checks, range validations.
  • PySpark functions: Apply these rules dynamically.

Example: DQ rules in a dictionary

dq_rules = {
    "not_null": ["id", "name"],
    "allowed_values": {"status": {"active", "inactive"}},
    "range_check": {"salary": (10000, 100000)}
}

Applying DQ rules in PySpark

from pyspark.sql import functions as F

# Drop rows with nulls in mandatory columns
for col in dq_rules["not_null"]:
    df = df.filter(F.col(col).isNotNull())

# Keep only rows whose values are in the allowed set
for col, allowed_values in dq_rules["allowed_values"].items():
    df = df.filter(F.col(col).isin(allowed_values))

# Keep only rows whose values fall inside the declared range
for col, (min_val, max_val) in dq_rules["range_check"].items():
    df = df.filter((F.col(col) >= min_val) & (F.col(col) <= max_val))

df.show()
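Filtering silently drops the offending rows. One alternative (a sketch, not a drop-in replacement) is to count violations per rule first, so failures can be reported before any data is removed, reusing dq_rules and df from above:

violation_counts = {}

# Count nulls in mandatory columns
for col in dq_rules["not_null"]:
    violation_counts[f"{col}_null"] = df.filter(F.col(col).isNull()).count()

# Count values outside the allowed set
for col, allowed_values in dq_rules["allowed_values"].items():
    violation_counts[f"{col}_invalid"] = df.filter(~F.col(col).isin(allowed_values)).count()

# Count values outside the numeric range
for col, (min_val, max_val) in dq_rules["range_check"].items():
    violation_counts[f"{col}_out_of_range"] = df.filter(
        (F.col(col) < min_val) | (F.col(col) > max_val)
    ).count()

print(violation_counts)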

📌 3. Optimized Joins Using Python Sets

Use Case: Avoid costly joins by using Python sets for filtering.

  • If a small list of values is available, use a Python set instead of a join.

Example: Filtering records without a join

allowed_ids = {101, 102, 103}  # Instead of joining with a lookup table

df_filtered = df.filter(F.col("id").isin(allowed_ids))
df_filtered.show()

🚀 Advantage: Faster than a shuffle join when the set of allowed values is small, because the values are embedded directly in the filter instead of requiring a second DataFrame.
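If the allowed values actually live in a small lookup table, the set can be built once on the driver and reused. A sketch, assuming a small lookup_df with an id column (lookup_df is illustrative, not part of the snippet above):

# Collect the small lookup table to the driver once
allowed_ids = {row["id"] for row in lookup_df.select("id").distinct().collect()}

# Reuse the set for cheap filtering instead of a join
df_filtered = df.filter(F.col("id").isin(allowed_ids))
df_filtered.show()

This only pays off while the lookup side stays small; for larger lookup tables a broadcast join is usually the safer choice.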


🔄 4. Using Python Dictionaries for Dynamic Column Mapping

Use Case: Map old column names to new names dynamically.

Example: Renaming columns dynamically

column_mapping = {"emp_id": "id", "emp_name": "name", "emp_salary": "salary"}

df = df.select([F.col(old).alias(new) for old, new in column_mapping.items()])
df.show()
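Note that the select above keeps only the mapped columns. If other columns should survive the rename, a loop over withColumnRenamed is a simple variant (a sketch reusing the same column_mapping):

# Rename mapped columns in place, leaving any unmapped columns untouched
for old_name, new_name in column_mapping.items():
    df = df.withColumnRenamed(old_name, new_name)

df.show()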

🏗 5. Parameter Handling for ETL Jobs Using Python Dict

Use Case: Pass parameters dynamically into an ETL job.

Example: Dynamic parameters for a PySpark job

etl_params = {
    "source_table": "employees",
    "target_table": "processed_employees",
    "filter_condition": "status = 'active'"
}

df = spark.sql(f"SELECT * FROM {etl_params['source_table']} WHERE {etl_params['filter_condition']}")
df.write.saveAsTable(etl_params["target_table"])
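The same dictionary does not have to be hard-coded. A minimal sketch, assuming the parameters are shipped as a JSON file (the file name etl_config.json is only an example):

import json

# Load job parameters from a JSON config file (hypothetical path)
with open("etl_config.json") as f:
    etl_params = json.load(f)

df = spark.sql(
    f"SELECT * FROM {etl_params['source_table']} WHERE {etl_params['filter_condition']}"
)

# Overwrite so the job can be rerun without failing on an existing table
df.write.mode("overwrite").saveAsTable(etl_params["target_table"])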

🚀 6. Caching and Repartitioning Strategies Using Dict & Tuple

Use Case: Dynamically decide caching and partitioning strategies.

  • Python dict defines optimal cache settings.
  • Tuple stores repartitioning columns.

Example: Cache & Repartition Strategies

cache_settings = {"employees": True, "transactions": False}
repartition_columns = ("region", "department")

if cache_settings["employees"]:
    df = df.cache()

df = df.repartition(*repartition_columns)
df.show()
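The same dictionaries scale to several tables by looking up each table's settings in a loop. A sketch, assuming employees and transactions are registered tables that both contain the region and department columns:

dataframes = {}

for table_name, should_cache in cache_settings.items():
    tbl = spark.table(table_name)        # read each registered table
    if should_cache:
        tbl = tbl.cache()                # cache only where the settings say so
    dataframes[table_name] = tbl.repartition(*repartition_columns)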

🏆 7. Version Control for Metadata Table Using Dict

Use Case: Maintain metadata table versioning.

  • Python dict stores the latest version of the metadata.

Example: Version control in metadata table

metadata_versioning = {
    "employees": {"version": 2, "last_updated": "2025-02-10"},
    "transactions": {"version": 3, "last_updated": "2025-02-15"}
}

latest_version = metadata_versioning["employees"]["version"]
df = spark.sql(f"SELECT * FROM metadata_table WHERE version = {latest_version}")
df.show()

🔄 8. Logging and Error Handling Using Dict & List

Use Case: Store error messages and logs dynamically.

Example: Centralized error handling

from datetime import datetime

error_log = []

try:
    df = spark.read.csv("invalid_path.csv")  # This will fail: the path does not exist
except Exception as e:
    error_log.append({"error": str(e), "timestamp": datetime.now().isoformat()})

print(error_log)
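Because each entry is a plain dict, the collected log can also be persisted next to the job output. A sketch (the table name error_log_table is only illustrative):

if error_log:
    # Turn the list of error dicts into a small DataFrame and append it to a log table
    error_df = spark.createDataFrame(error_log)
    error_df.write.mode("append").saveAsTable("error_log_table")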

🔥 Summary of Python Data Structures in PySpark

Use Case | Python Data Structure | Example
Metadata-driven ETL | dict | Store table schema & transformations
Data Quality (DQ) checks | dict, set | Store validation rules, allowed values
Optimized filtering (avoid joins) | set | Filter using .isin(set)
Dynamic column mapping | dict | Rename columns dynamically
Parameter handling in ETL | dict | Pass job parameters dynamically
Caching and partitioning strategies | dict, tuple | Optimize DataFrame storage
Version control for metadata | dict | Maintain latest schema versions
Logging and error handling | dict, list | Centralized error handling

This covers the major ways Python data structures combine with PySpark in day-to-day ETL work. 🚀 😊
