The goal is to build a dynamic, automated process in which the steps (SQL queries or DataFrame operations) are read from a metadata table, so the framework stays flexible and new steps can be added without changing code.

Below is the complete, enhanced ETL framework code that dynamically processes a specific stage of a program using metadata, with additional features such as parameterization, logging, error handling, and retries.
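
For reference, the framework assumes a metadata table with one row per step, containing the columns the code reads (program_name, product_name, stage_name, step_name, operation_type, query, custom_logic, temp_view_name, table_name, write_mode, snapshot_mode, stage_priority, steps_priority, month_year). Below is a minimal sketch of that table from PySpark; the column names come from the framework code, while the exact types are assumptions:

spark.sql("""
    CREATE TABLE IF NOT EXISTS metadatatable (
        program_name    STRING,
        product_name    STRING,
        stage_name      STRING,
        step_name       STRING,
        operation_type  STRING,   -- 'SQL' or 'DataFrame'
        query           STRING,   -- used when operation_type = 'SQL'
        custom_logic    STRING,   -- Python snippet, used when operation_type = 'DataFrame'
        temp_view_name  STRING,
        table_name      STRING,
        write_mode      STRING,   -- 'overwrite', 'append' or 'snapshot'
        snapshot_mode   STRING,
        stage_priority  INT,
        steps_priority  INT,
        month_year      STRING
    )
""")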


Dynamic ETL Framework Code

Save this as dynamic_etl_framework.py:

from pyspark.sql import SparkSession
from datetime import datetime
import time
import sys

# Initialize SparkSession
spark = (
    SparkSession.builder
    .appName("Dynamic ETL Framework")
    .enableHiveSupport()
    .getOrCreate()
)

# Input parameters from the command line
if len(sys.argv) < 4:
    print("Usage: dynamic_etl_framework.py <program_name> <stage_name> <month_year>")
    sys.exit(1)

program_name_filter = sys.argv[1]  # e.g., "Risk Program"
stage_name_filter = sys.argv[2]    # e.g., "Stage 1"
month_year = sys.argv[3]           # e.g., "202412"

# Retry configuration
MAX_RETRIES = 3
RETRY_DELAY = 5  # Seconds

# Load Metadata Table for the specific program and stage
metadata = spark.sql(f"""
    SELECT * 
    FROM metadatatable 
    WHERE month_year = '{month_year}' 
      AND program_name = '{program_name_filter}'
      AND stage_name = '{stage_name_filter}'
""").orderBy("stage_priority", "steps_priority")

# Define Log Table
log_table = "log_table"
log_entries = []

# Function to log status
def log_status(product_name, program_name, stage_name, step_name, status,
               error_message=None, start_time=None, end_time=None, records_processed=None):
    execution_time_seconds = None
    if start_time and end_time:
        execution_time_seconds = (end_time - start_time).total_seconds()
    
    log_entries.append({
        "product_name": product_name,
        "program_name": program_name,
        "stage_name": stage_name,
        "step_name": step_name,
        "status": status,
        "error_message": error_message,
        "start_time": start_time.strftime("%Y-%m-%d %H:%M:%S") if start_time else None,
        "end_time": end_time.strftime("%Y-%m-%d %H:%M:%S") if end_time else None,
        "records_processed": records_processed,
        "execution_time_seconds": execution_time_seconds
    })

# Function to write to target table
def write_to_table(df, table_name, write_mode, snapshot_mode):
    if write_mode == "snapshot":
        snapshot_table_name = f"{table_name}_{month_year}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        spark.sql(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {snapshot_table_name}")
    else:
        df.write.mode(write_mode).saveAsTable(table_name)

# Process each step in the metadata
for row in metadata.collect():
    step = row.asDict()
    product_name = step["product_name"]
    program_name = step["program_name"]
    stage_name = step["stage_name"]
    step_name = step["step_name"]
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]
    snapshot_mode = step["snapshot_mode"]

    start_time = datetime.now()
    retry_count = 0
    success = False

    while retry_count < MAX_RETRIES and not success:
        try:
            print(f"Executing Step: {product_name} > {program_name} > {stage_name} > {step_name}")
            
            if operation_type == "SQL":
                # Execute SQL query
                df = spark.sql(query)
                if temp_view_name:
                    df.createOrReplaceTempView(temp_view_name)
            elif operation_type == "DataFrame":
                # Execute the DataFrame logic from metadata in an isolated namespace;
                # the snippet is expected to leave its result in a variable named df
                exec_ns = {"spark": spark}
                exec(custom_logic, exec_ns)
                df = exec_ns.get("df")  # Get the resulting DataFrame
                if df is not None and temp_view_name:
                    df.createOrReplaceTempView(temp_view_name)
            else:
                raise ValueError(f"Unknown operation type: {operation_type}")

            # Count records processed (None if the step produced no DataFrame)
            records_processed = df.count() if df is not None else None

            # Write to table
            if table_name:
                write_to_table(df, table_name, write_mode, snapshot_mode)

            # Log success
            end_time = datetime.now()
            log_status(product_name, program_name, stage_name, step_name, 
                       "SUCCESS", start_time=start_time, end_time=end_time, records_processed=records_processed)
            success = True

        except Exception as e:
            retry_count += 1
            if retry_count >= MAX_RETRIES:
                # Log failure after max retries
                end_time = datetime.now()
                log_status(product_name, program_name, stage_name, step_name, 
                           "FAILED", error_message=str(e), start_time=start_time, end_time=end_time)
                print(f"Error in Step: {step_name}, Error: {str(e)}")
                break
            else:
                print(f"Retrying Step: {step_name} ({retry_count}/{MAX_RETRIES}) after error: {str(e)}")
                time.sleep(RETRY_DELAY)

# Write log entries to the Log Table; an explicit schema avoids inference
# errors when optional fields such as error_message are None in every row
if log_entries:
    log_schema = ("product_name STRING, program_name STRING, stage_name STRING, step_name STRING, "
                  "status STRING, error_message STRING, start_time TIMESTAMP, end_time TIMESTAMP, "
                  "records_processed BIGINT, execution_time_seconds DOUBLE")
    log_df = spark.createDataFrame(log_entries, schema=log_schema)
    log_df.write.mode("append").saveAsTable(log_table)

print("ETL Process Completed!")

Execution

Execute the script with spark-submit, passing the program name, stage name, and month_year:

spark-submit dynamic_etl_framework.py "Risk Program" "Stage 1" "202412"
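
To run several stages of the same program in sequence, a thin driver can invoke the framework once per stage. A minimal sketch, assuming the stage names below exist in your metadata:

# run_stages.py -- hypothetical driver that runs the framework stage by stage
import subprocess

for stage in ["Stage 1", "Stage 2"]:  # assumed stage names; adjust to your metadata
    subprocess.run(
        ["spark-submit", "dynamic_etl_framework.py", "Risk Program", stage, "202412"],
        check=True,  # stop the chain as soon as a stage fails
    )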

Additional Features in This Version

  1. Parameterization:
    • Runs for specific program_name, stage_name, and month_year.
    • Passed as command-line arguments for flexibility.
  2. Error Handling and Retries:
    • Retries each step up to MAX_RETRIES times before logging it as a failure.
    • Adds a delay between retries to handle transient issues.
  3. Logging:
    • Captures detailed logs, including status, error_message, records_processed, and execution time.
    • Logs are stored in the log_table.
  4. Dynamic Write Modes:
    • Supports overwrite, append, and snapshot modes for target tables (snapshot behaviour is illustrated in the sketch after this list).
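
As an illustration of snapshot mode: if a step wrote a table named final_risk_data with write_mode = 'snapshot' (the sample metadata below uses 'overwrite', so this is hypothetical), a run for 202412 would leave behind a month-stamped table plus a view with the base name:

# Illustrative only: after a 'snapshot' write for month_year = 202412
spark.table("final_risk_data_202412").count()   # physical snapshot for the month
spark.table("final_risk_data").count()          # view, always points at the latest snapshot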

Sample Metadata for Testing

Here’s the same sample metadata that filters and processes CIBIL data:

Sample Metadata Table

-- Column order: program_name, product_name, stage_name, step_name, operation_type, query, custom_logic,
--               temp_view_name, table_name, write_mode, snapshot_mode, stage_priority, steps_priority, month_year
INSERT INTO metadatatable VALUES
('Risk Program', 'Personal Loan', 'Stage 1', 'Filter CIBIL', 'SQL',
 'SELECT * FROM cibil_data WHERE score > 700', NULL, 'filtered_cibil', NULL, NULL, NULL, 1, 1, '202412'),

('Risk Program', 'Personal Loan', 'Stage 1', 'Add Risk Column', 'DataFrame', NULL,
 'df = spark.table("filtered_cibil"); df = df.withColumn("risk_score", df["score"] * 1.1)',
 'cibil_risk_view', NULL, NULL, NULL, 1, 2, '202412'),

('Risk Program', 'Personal Loan', 'Stage 1', 'Write Data', 'DataFrame', NULL,
 'df = spark.table("cibil_risk_view")',
 NULL, 'final_risk_data', 'overwrite', NULL, 1, 3, '202412');
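
After a successful run with this metadata, the output can be spot-checked from any PySpark session. The table and column names come from the sample rows above, and this assumes cibil_data exposes a numeric score column:

# Quick sanity check of the sample pipeline's output table
spark.table("final_risk_data").select("score", "risk_score").show(5)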

Log Table Schema

CREATE TABLE IF NOT EXISTS log_table (
    product_name STRING,
    program_name STRING,
    stage_name STRING,
    step_name STRING,
    status STRING,
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    records_processed BIGINT,
    execution_time_seconds DOUBLE
);
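
With this table in place, failed steps can be reviewed directly after a run; a small example using the columns defined above:

# List failed steps, most recent first, with their error messages
spark.sql("""
    SELECT step_name, error_message, execution_time_seconds
    FROM log_table
    WHERE status = 'FAILED'
    ORDER BY end_time DESC
""").show(truncate=False)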

This approach ensures a robust, dynamic, and extensible ETL pipeline, with clear logging and retry mechanisms. Let me know if you need further refinements!

