The goal: build a dynamic, automated process in which the steps (SQL queries or DataFrame operations) are read from a table, giving you a flexible, metadata-driven framework.

Dynamic ETL Framework Design

This ETL framework is designed for a hierarchical, monthly process in which products contain programs, which are in turn divided into stages and steps. Each step is either a SQL query or a DataFrame transformation, and execution is driven entirely by metadata.


Key Features

  1. Hierarchy:
    • Products → Programs → Stages → Steps.
  2. Write Modes:
    • Overwrite: Replace data in the table.
    • Append: Add new data to the table.
    • Archive: Save historical data using time-based partitioning.
    • Snapshot: Save with a timestamped table name and maintain a permanent view.
  3. Control & Log Tables (a schema sketch follows this list):
    • Maintain month_year for each ETL run.
    • Log execution statuses (success, failure, retries).
  4. Metadata-Driven Execution:
    • Metadata table stores SQL/DataFrame operations, dependencies, and priorities.
  5. Execution Modes:
    • SQL queries executed via spark.sql.
    • DataFrame transformations executed as Python code.

SQL to Create Metadata Table

CREATE TABLE IF NOT EXISTS metadatatable (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    operation_type STRING, -- SQL or DataFrame
    query STRING, -- For SQL queries
    custom_logic STRING, -- For DataFrame transformations
    temp_view_name STRING, -- Temporary view created by this step
    table_name STRING, -- Target table name
    write_mode STRING, -- overwrite, append, archive, snapshot
    snapshot_mode STRING, -- e.g., tableName_yyyymmdd
    stage_priority INT, -- Priority of stages
    steps_priority INT, -- Priority of steps within a stage
    month_year STRING -- ETL run month/year (e.g., "202412")
);
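
As a concrete illustration, a step can be registered by inserting a row into this table; the values below match the first row of the Sample Metadata section at the end of this page:

# Illustrative only: register the "Calculate Totals" step in the metadata table.
spark.sql("""
    INSERT INTO metadatatable VALUES (
        'Risk Program', 'Personal Loan', 'Stage 1', 'Calculate Totals',
        'SQL',
        'SELECT category, SUM(amount) AS total FROM transactions GROUP BY category',
        NULL, 'totals_view', 'risk_totals', 'overwrite', NULL,
        1, 1, '202412'
    )
""")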

Python Implementation

Below is a Python implementation using PySpark:

1. Dynamic Framework Code

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime

# Initialize SparkSession (Hive support enables saveAsTable and views)
spark = (SparkSession.builder
         .appName("Dynamic Monthly ETL Framework")
         .enableHiveSupport()
         .getOrCreate())

# ETL run month (hardcoded here; see the note after the spark-submit command for parameterizing it)
month_year = "202412"

# Step 1: Load Metadata Table
metadata = spark.sql(f"SELECT * FROM metadatatable WHERE month_year = '{month_year}'")
metadata = metadata.orderBy("stage_priority", "steps_priority")  # Process in order

# Step 2: Control and Log Table
log_table = "etl_job_log"
control_table = "etl_control_table"

# Initialize log DataFrame
log_entries = []

# Function to log step status
def log_status(product_name, program_name, stage_name, step_name, status, error_message=None):
    log_entries.append({
        "product_name": product_name,
        "program_name": program_name,
        "stage_name": stage_name,
        "step_name": step_name,
        "month_year": month_year,
        "status": status,
        "error_message": error_message,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    })

# Function to write to the target table according to write_mode
def write_to_table(df, table_name, write_mode, snapshot_mode):
    if write_mode == "snapshot":
        # snapshot_mode (e.g., tableName_yyyymmdd) is carried in metadata;
        # this implementation keys snapshots on the run's month_year
        snapshot_table_name = f"{table_name}_{month_year}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        spark.sql(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {snapshot_table_name}")
    elif write_mode == "archive":
        # "archive" is not a native Spark save mode: stamp rows with the run
        # month and append into a month_year-partitioned table
        df.withColumn("month_year", F.lit(month_year)) \
          .write.mode("append").partitionBy("month_year").saveAsTable(table_name)
    else:
        df.write.mode(write_mode).saveAsTable(table_name)

# Step 3: Process Steps
for row in metadata.collect():
    step = row.asDict()
    product_name = step["product_name"]
    program_name = step["program_name"]
    stage_name = step["stage_name"]
    step_name = step["step_name"]
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]
    snapshot_mode = step["snapshot_mode"]

    try:
        print(f"Executing Step: {product_name} > {program_name} > {stage_name} > {step_name}")
        
        if operation_type == "SQL":
            # Execute SQL query
            df = spark.sql(query)
            if temp_view_name:
                df.createOrReplaceTempView(temp_view_name)
        elif operation_type == "DataFrame":
            # Run the snippet in an isolated namespace; by convention the
            # snippet assigns its resulting DataFrame to a variable named df
            exec_scope = {"spark": spark}
            exec(custom_logic, exec_scope)
            df = exec_scope.get("df")
            if df is not None and temp_view_name:
                df.createOrReplaceTempView(temp_view_name)
        else:
            raise ValueError(f"Unknown operation type: {operation_type}")

        # Write to target table
        if table_name:
            write_to_table(df, table_name, write_mode, snapshot_mode)

        log_status(product_name, program_name, stage_name, step_name, "SUCCESS")
    except Exception as e:
        log_status(product_name, program_name, stage_name, step_name, "FAILED", str(e))
        print(f"Error in Step: {step_name}, Error: {str(e)}")
        break  # Stop processing on failure

# Step 4: Write Logs to Log Table
if log_entries:
    # Explicit schema so inference doesn't fail when every error_message is None
    log_schema = ("product_name STRING, program_name STRING, stage_name STRING, "
                  "step_name STRING, month_year STRING, status STRING, "
                  "error_message STRING, `timestamp` STRING")
    log_df = spark.createDataFrame(log_entries, schema=log_schema)
    log_df.write.mode("append").saveAsTable(log_table)

# Step 5: Update Control Table (reflect failures instead of always writing COMPLETED)
overall_status = "COMPLETED" if all(e["status"] == "SUCCESS" for e in log_entries) else "FAILED"
control_data = [{"month_year": month_year, "status": overall_status, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}]
control_df = spark.createDataFrame(control_data)
control_df.write.mode("append").saveAsTable(control_table)

print(f"ETL process finished with status: {overall_status}")

Execution Using spark-submit

Save the above code as monthly_etl.py and execute it using:

spark-submit --master yarn --deploy-mode cluster monthly_etl.py
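
The script above hardcodes month_year = "202412". One simple way to parameterize it is to pass the run month as a command-line argument (a sketch, reusing the script's datetime import):

import sys

# Sketch: spark-submit ... monthly_etl.py 202412
# Falls back to the current month when no argument is given.
month_year = sys.argv[1] if len(sys.argv) > 1 else datetime.now().strftime("%Y%m")

The command then becomes spark-submit --master yarn --deploy-mode cluster monthly_etl.py 202412.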

Key Features of the Framework

  1. Metadata-Driven:
    • Steps are dynamically executed based on the metadata table.
  2. Flexible Write Modes:
    • Supports overwrite, append, archive, and snapshot.
  3. Error Handling and Logging:
    • Logs the success or failure of each step in the log table.
    • Stops processing if a step fails (a retry sketch follows this list).
  4. Snapshots:
    • Creates timestamped tables and updates permanent views.
  5. Control Table:
    • Tracks the overall status of the ETL process.
  6. Extensibility:
    • Add or modify steps by updating the metadata table without changing the code.
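
The log table is designed to record retries, but the loop above fails fast on the first error. A minimal retry wrapper could sit between the loop and each step's logic; max_retries and delay_seconds below are illustrative defaults, not part of the original framework:

import time

# Sketch: retry a step a few times before letting the exception propagate
# to the log_status/break handling in the main loop.
def run_with_retries(step_fn, max_retries=3, delay_seconds=60):
    for attempt in range(1, max_retries + 1):
        try:
            return step_fn()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted; caller logs FAILED
            time.sleep(delay_seconds)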

Sample Metadata

Here’s how the metadata table might look for the scenario above:

| program_name | product_name | stage_name | step_name | operation_type | query | custom_logic | temp_view_name | table_name | write_mode | snapshot_mode | stage_priority | steps_priority | month_year |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Risk Program | Personal Loan | Stage 1 | Calculate Totals | SQL | SELECT category, SUM(amount) AS total FROM transactions GROUP BY category | NULL | totals_view | risk_totals | overwrite | NULL | 1 | 1 | 202412 |
| Risk Program | Personal Loan | Stage 1 | Filter High Totals | DataFrame | NULL | df = spark.sql("SELECT * FROM totals_view").filter("total > 500") | high_totals_view | high_totals | append | NULL | 1 | 2 | 202412 |

Note how the second step reads totals_view, the temporary view registered by the first step: stage_priority and steps_priority guarantee that a view exists before any later step consumes it.

This framework is ready for scalable, dynamic, and robust ETL processing in PySpark.

