The goal is to build a dynamic, automated process where the steps (queries or DataFrame operations) are read from a metadata table, so you can design a flexible, reusable framework.

Below is a complete project implementation of a dynamic ETL framework in PySpark. It includes:

  1. Metadata Table Definition and Sample Data
  2. Log Table Definition
  3. Dynamic ETL Script
  4. Sample Execution using Spark Submit

1. Metadata Table and Sample Data

Metadata Table Schema

CREATE TABLE IF NOT EXISTS metadatatable (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    operation_type STRING, -- SQL, DataFrame
    query STRING,
    custom_logic STRING, -- For non-SQL transformations
    temp_view_name STRING,
    table_name STRING,
    write_mode STRING, -- overwrite, append, archive, snapshot
    snapshot_mode STRING, -- e.g., "tableName_yyyymmdd"
    stage_priority INT,
    steps_priority INT,
    month_year STRING -- e.g., "202412"
);

Sample Metadata

INSERT INTO metadatatable VALUES
('Risk Program', 'Personal Loan', 'Stage 1', 'Filter CIBIL', 'SQL',
 'SELECT * FROM cibil_data WHERE score > 700', NULL, 'filtered_cibil', NULL, NULL, NULL, 1, 1, '202412'),

('Risk Program', 'Personal Loan', 'Stage 1', 'Add Risk Column', 'DataFrame', NULL,
 'df = spark.table("filtered_cibil"); df = df.withColumn("risk_score", df["score"] * 1.1)',
 'cibil_risk_view', NULL, NULL, NULL, 1, 2, '202412'),

('Risk Program', 'Personal Loan', 'Stage 2', 'Write Data', 'DataFrame', NULL,
 'df = spark.table("cibil_risk_view")',
 NULL, 'final_risk_data', 'overwrite', NULL, 2, 1, '202412');
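
To sanity-check the metadata before a run, you can preview the order in which the framework will pick up the steps. A minimal sketch against the tables above, assuming an active SparkSession named spark with Hive support:

# Preview the execution order the framework will follow for this month_year
spark.sql("""
    SELECT stage_name, step_name, operation_type, stage_priority, steps_priority
    FROM metadatatable
    WHERE month_year = '202412'
    ORDER BY stage_priority, steps_priority
""").show(truncate=False)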

2. Log Table

Log Table Schema

CREATE TABLE IF NOT EXISTS log_table (
    log_id BIGINT, -- Hive/Spark SQL has no AUTO_INCREMENT; generate IDs in the framework if needed
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    status STRING,
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    records_processed BIGINT,
    execution_time_seconds DOUBLE
);
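
After a run, the log table can be queried to spot failures and compare step durations. A minimal sketch, assuming the same Hive-enabled session:

# Inspect step outcomes and durations for a given program run
spark.sql("""
    SELECT stage_name, step_name, status, error_message,
           records_processed, execution_time_seconds
    FROM log_table
    WHERE program_name = 'Risk Program'
    ORDER BY start_time
""").show(truncate=False)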

3. Dynamic ETL Script

Python Script (ETL Framework)

Save this as dynamic_etl_framework.py:

from pyspark.sql import SparkSession
from datetime import datetime
import time

# Initialize SparkSession
spark = (
    SparkSession.builder
    .appName("Dynamic ETL Framework")
    .enableHiveSupport()
    .getOrCreate()
)

# Month-year to process (hardcoded here; could be passed as a job argument)
month_year = "202412"

# Load Metadata Table
metadata = spark.sql(f"SELECT * FROM metadatatable WHERE month_year = '{month_year}'")
metadata = metadata.orderBy("stage_priority", "steps_priority")

# Define Log Table
log_table = "log_table"
log_entries = []

# Function to log status
def log_status(log_id, product_name, program_name, stage_name, step_name, status,
               error_message=None, start_time=None, end_time=None, records_processed=None):
    execution_time_seconds = None
    if start_time and end_time:
        execution_time_seconds = (end_time - start_time).total_seconds()
    
    log_entries.append({
        "log_id": log_id,
        "product_name": product_name,
        "program_name": program_name,
        "stage_name": stage_name,
        "step_name": step_name,
        "status": status,
        "error_message": error_message,
        "start_time": start_time.strftime("%Y-%m-%d %H:%M:%S") if start_time else None,
        "end_time": end_time.strftime("%Y-%m-%d %H:%M:%S") if end_time else None,
        "records_processed": records_processed,
        "execution_time_seconds": execution_time_seconds
    })

# Function to write to target table
def write_to_table(df, table_name, write_mode, snapshot_mode):
    if write_mode == "snapshot":
        snapshot_table_name = f"{table_name}_{month_year}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        spark.sql(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {snapshot_table_name}")
    else:
        df.write.mode(write_mode).saveAsTable(table_name)
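
# Example of snapshot mode (hypothetical names): with table_name = "risk_output",
# month_year = "202412" and write_mode = "snapshot", the data is written to
# "risk_output_202412" and the view "risk_output" is (re)created over it, so
# readers always see the latest monthly snapshot.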

# Process each step in the metadata
for row in metadata.collect():
    step = row.asDict()
    product_name = step["product_name"]
    program_name = step["program_name"]
    stage_name = step["stage_name"]
    step_name = step["step_name"]
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]
    snapshot_mode = step["snapshot_mode"]

    start_time = datetime.now()

    try:
        print(f"Executing Step: {product_name} > {program_name} > {stage_name} > {step_name}")
        
        if operation_type == "SQL":
            # Execute SQL query
            df = spark.sql(query)
            if temp_view_name:
                df.createOrReplaceTempView(temp_view_name)
        elif operation_type == "DataFrame":
            # Execute DataFrame logic
            exec(custom_logic)
            df = locals().get(temp_view_name)  # Get the resulting DataFrame
        else:
            raise ValueError(f"Unknown operation type: {operation_type}")

        # Count records processed
        records_processed = df.count() if df is not None else None

        # Write to table
        if table_name:
            write_to_table(df, table_name, write_mode, snapshot_mode)

        # Log success
        end_time = datetime.now()
        log_status(None, product_name, program_name, stage_name, step_name, 
                   "SUCCESS", start_time=start_time, end_time=end_time, records_processed=records_processed)

    except Exception as e:
        # Log failure
        end_time = datetime.now()
        log_status(None, product_name, program_name, stage_name, step_name, 
                   "FAILED", error_message=str(e), start_time=start_time, end_time=end_time)
        print(f"Error in Step: {step_name}, Error: {str(e)}")
        break

# Write log entries to the log table; an explicit schema is needed because
# all-NULL columns (e.g. error_message on success) cannot be type-inferred
if log_entries:
    log_schema = ("log_id BIGINT, product_name STRING, program_name STRING, stage_name STRING, "
                  "step_name STRING, status STRING, error_message STRING, start_time TIMESTAMP, "
                  "end_time TIMESTAMP, records_processed BIGINT, execution_time_seconds DOUBLE")
    log_df = spark.createDataFrame(log_entries, schema=log_schema)
    log_df.write.mode("append").saveAsTable(log_table)

print("ETL Process Completed!")

4. Execution

Command to Run the Script

Save the above script as dynamic_etl_framework.py and execute using spark-submit:

spark-submit --master yarn --deploy-mode cluster dynamic_etl_framework.py
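
For a quick local test before deploying to YARN, the same script can also be run with a local master (resource options omitted; tune them for your environment):

spark-submit --master local[*] dynamic_etl_framework.py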

Sample Hive Tables

cibil_data:

CREATE TABLE cibil_data (customer_id INT, score INT);
INSERT INTO cibil_data VALUES (1, 750), (2, 650), (3, 800);

final_risk_data: Automatically created during the process.
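
A quick sanity check after a run (a sketch, assuming the sample metadata above): final_risk_data should contain only customers with a score above 700, together with the derived risk_score column.

# Verify the output written in Stage 2
spark.table("final_risk_data").show()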


This end-to-end project demonstrates the dynamic processing of metadata-driven ETL tasks using PySpark, with logging, error handling, and modular execution.

