Project 6

etl_project/
├── config/
│   └── base_config.json
├── control/
│   ├── control_table_setup.sql
│   ├── sample_control_table_data.sql
│   └── log_table_setup.sql
├── scripts/
│   ├── run_etl.sh
│   └── run_etl.py
├── etl/
│   ├── execute_query.py
│   ├── log_status.py
│   ├── stage_runner.py
│   └── job_orchestrator.py
├── logging/
│   └── logger.py
├── utils/
│   ├── spark_utils.py
│   ├── query_utils.py
│   ├── config_utils.py
│   └── error_handling.py
├── log/
│   └── execution_logs.log
├── tests/
│   ├── test_etl.py
│   ├── test_query_utils.py
│   ├── test_logger.py
│   └── test_error_handling.py
└── README.md

ETL Workflow with Enhanced Logging and Snapshot Management

Below is a refined ETL architecture and corresponding SQL and Python updates, based on your project structure and requirements.


SQL Table Definitions

Control Table

The control table is used to manage ETL steps, track their sequence, and specify configurations such as write_mode and snapshot_mode.

CREATE TABLE IF NOT EXISTS control_table (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    operation_type STRING, -- SQL, DataFrame
    query TEXT,
    custom_logic TEXT, -- For non-SQL transformations
    temp_view_name STRING,
    table_name STRING,
    write_mode STRING, -- overwrite, append, archive, snapshot
    snapshot_mode STRING, -- e.g., "tableName_yyyymmdd"
    stage_priority INT,
    steps_priority INT,
    month_year STRING -- e.g., "202412"
);

Log Table

The log table tracks the execution of each ETL step, including performance metrics and error details.

CREATE TABLE IF NOT EXISTS log_table (
    log_id BIGINT AUTO_INCREMENT PRIMARY KEY,
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    status STRING, -- SUCCESS, FAILED
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    records_processed BIGINT,
    execution_time_seconds DOUBLE,
    month_year STRING -- e.g., "202412"
);

Key Enhancements

  1. Monthly Partitioning (month_year):
    • Both control and log tables include a month_year column to support monthly ETL runs.
  2. Write Modes:
    • overwrite: Replace existing data.
    • append: Add new records.
    • archive: Save to a time-stamped snapshot table.
    • snapshot: Create a table with a time-stamped name and a permanent view for consistent access (see the naming sketch after this list).
  3. Execution Time and Records Processed:
    • The log table tracks execution time (execution_time_seconds) and the number of records processed.
  4. Support for Spark SQL and DataFrame:
    • Steps can be SQL queries or DataFrame code snippets.
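
To make the snapshot behaviour concrete, here is a minimal sketch of how snapshot_mode could be resolved into an actual table name at runtime. The helper name resolve_snapshot_table and the handling of the yyyymmdd placeholder are illustrative assumptions, not part of the project code.

from datetime import datetime

def resolve_snapshot_table(table_name, snapshot_mode):
    # Hypothetical helper: turn a pattern such as "tableName_yyyymmdd" into a concrete
    # name, e.g. "target_table2_20241215" for a run on 2024-12-15.
    run_date = datetime.now().strftime("%Y%m%d")
    if snapshot_mode and "yyyymmdd" in snapshot_mode.lower():
        return f"{table_name}_{run_date}"
    return f"{table_name}_{snapshot_mode}" if snapshot_mode else table_name

A snapshot step would then write to the resolved table and point the permanent view (named after table_name) at it, which is exactly what the snapshot branch in execute_query.py below does.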

Python Code Updates

Step Execution with Snapshot Handling

In execute_query.py, handle write_mode and snapshot_mode.

from datetime import datetime

def execute_step(spark, step, base_config):
    """
    Execute a single step in the ETL process and return the number of records processed.
    """
    if step["operation_type"] == "SQL":
        temp_view_name = step["temp_view_name"]
        query = step["query"]
        result_df = spark.sql(query)

        if step["write_mode"] == "temp_view":
            result_df.createOrReplaceTempView(temp_view_name)
        elif step["write_mode"] == "overwrite":
            table_name = step["table_name"]
            result_df.write.mode("overwrite").saveAsTable(table_name)
        elif step["write_mode"] == "append":
            table_name = step["table_name"]
            result_df.write.mode("append").saveAsTable(table_name)
        elif step["write_mode"] == "archive":
            table_name = f"{step['table_name']}_{datetime.now().strftime('%Y%m%d')}"
            result_df.write.mode("overwrite").saveAsTable(table_name)
        elif step["write_mode"] == "snapshot":
            table_name = f"{step['table_name']}_{datetime.now().strftime('%Y%m%d')}"
            result_df.write.mode("overwrite").saveAsTable(table_name)
            # Create or replace permanent view
            spark.sql(f"CREATE OR REPLACE VIEW {step['table_name']} AS SELECT * FROM {table_name}")

        return result_df.count()  # Return the number of records processed

    elif step["operation_type"] == "DataFrame":
        custom_logic = step["custom_logic"]
        # Run custom DataFrame code; the snippet is expected to assign its result to `df`
        local_vars = {"spark": spark}
        exec(custom_logic, local_vars)
        return local_vars["df"].count()  # Number of records produced by the custom logic
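
For illustration, a hypothetical SQL step, mirroring one control_table row, could be passed to execute_step like this. It assumes a live spark session and that source_table already exists.

# Hypothetical step definition mirroring one control_table row
sample_step = {
    "operation_type": "SQL",
    "query": "SELECT * FROM source_table WHERE score > 700",
    "temp_view_name": "temp_view1",
    "table_name": "target_table1",
    "write_mode": "overwrite",
    "snapshot_mode": None,
}

records = execute_step(spark, sample_step, base_config={})
print(f"Records processed: {records}")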

Logging Updates

Update the log_to_table function to include month_year and execution_time_seconds.

from datetime import datetime

def log_to_table(spark, program_name, product_name, stage_name, step_name, status, error_message, records_processed, start_time, end_time, month_year):
    """
    Log execution details to the log_table.
    """
    execution_time_seconds = (end_time - start_time).total_seconds()

    spark.sql(f"""
        INSERT INTO log_table (program_name, product_name, stage_name, step_name, status, error_message, start_time, end_time, records_processed, execution_time_seconds, month_year)
        VALUES ('{program_name}', '{product_name}', '{stage_name}', '{step_name}', '{status}', 
                '{error_message}', '{start_time}', '{end_time}', {records_processed}, 
                {execution_time_seconds}, '{month_year}')
    """)

Stage Runner with Monthly Partitioning

In stage_runner.py, pass month_year for every step.

from datetime import datetime
from etl.execute_query import execute_step
from etl.log_status import log_to_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session
from logging.logger import get_logger

logger = get_logger("log/execution_logs.log")

def run_stage(program_name, product_name, stage_name, base_config, month_year):
    """
    Execute all steps in a stage and log results.
    """
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name, month_year)

    for step in steps.collect():
        step_name = step["step_name"]
        start_time = datetime.now()
        try:
            records_processed = execute_step(spark, step, base_config)
            end_time = datetime.now()
            log_to_table(spark, program_name, product_name, stage_name, step_name, "SUCCESS", None,
                         records_processed, start_time, end_time, month_year)
            logger.info(f"Step {step_name} completed successfully. Records processed: {records_processed}")
        except Exception as e:
            end_time = datetime.now()
            log_to_table(spark, program_name, product_name, stage_name, step_name, "FAILED", str(e),
                         0, start_time, end_time, month_year)
            logger.error(f"Error in step {step_name}: {e}")

    spark.stop()

Execution Scripts

Monthly Execution in run_etl.py

Pass month_year as an argument to orchestrate ETL for the given period.

from etl.job_orchestrator import orchestrate_etl

if __name__ == "__main__":
    import sys
    month_year = sys.argv[1]  # Pass as YYYYMM
    base_config = "config/base_config.json"
    orchestrate_etl(base_config, month_year)
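
orchestrate_etl is imported from etl/job_orchestrator.py here but is not shown until later; a minimal sketch is below, with the program, product, and stage names hard-coded as placeholders (in practice they would be read from control_table ordered by stage_priority).

from etl.stage_runner import run_stage

def orchestrate_etl(base_config, month_year):
    # Placeholder program/product/stage values; real runs would query control_table
    program_name, product_name = "Program1", "Personal Loan"
    for stage_name in ["Stage1", "Stage2", "Stage3"]:
        run_stage(program_name, product_name, stage_name, base_config, month_year)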

Example Control Table Data

INSERT INTO control_table VALUES
('Program1', 'Personal Loan', 'Stage1', 'Step1', 'SQL', 'SELECT * FROM source_table', NULL, 
 'temp_view1', 'target_table1', 'overwrite', NULL, 1, 1, '202412'),
('Program1', 'Personal Loan', 'Stage1', 'Step2', 'DataFrame', NULL, 'df = spark.table("temp_view1"); df = df.withColumn("score", df["score"] * 1.1)', 
 NULL, 'target_table2', 'snapshot', 'tableName_yyyymmdd', 1, 2, '202412');

This structure and code implementation ensure that monthly ETL runs are modular, scalable, and well logged.

Separate spark-submit jobs for each stage of a program can be built dynamically, with either a shell script or a Python driver executing an individual stage of a program:
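
As a sketch of the Python option, the per-stage spark-submit commands could be built and launched with subprocess. The flag names mirror run_etl.sh below, and submit_stage/submit_program are assumed helper names rather than existing project code.

import subprocess

def submit_stage(program_name, stage_name, month_year):
    # Build and launch one spark-submit per stage; paths and flags mirror run_etl.sh
    cmd = [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "client",
        "./scripts/run_etl.py",
        "--program_name", program_name,
        "--stage_name", stage_name,
        "--month_year", month_year,
    ]
    print("Launching:", " ".join(cmd))
    subprocess.run(cmd, check=True)  # Raises CalledProcessError if the stage fails

def submit_program(program_name, stages, month_year):
    # Submit each stage sequentially, in priority order
    for stage_name in stages:
        submit_stage(program_name, stage_name, month_year)

# Example: submit_program("Program1", ["Stage1", "Stage2"], "202412")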

1. Updated Table Definitions

Control Table

Add month_year as a key column to reflect the monthly processing needs and align with log_table.

CREATE TABLE IF NOT EXISTS control_table (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    operation_type STRING, -- e.g., SQL, DataFrame
    query TEXT, -- SQL logic
    custom_logic TEXT, -- DataFrame or Python logic
    temp_view_name STRING, -- Name for temporary Spark views
    table_name STRING, -- Target table
    write_mode STRING, -- Write modes: write, append, snapshot, archive
    snapshot_mode STRING, -- Snapshot identifier: YYYYMMDD format
    stage_priority INT, -- Stage execution priority
    step_priority INT, -- Step execution priority
    month_year STRING -- Monthly partition in YYYYMM format
);

Log Table

Include month_year for traceability across monthly runs and execution_time_seconds to measure performance.

CREATE TABLE IF NOT EXISTS log_table (
    log_id BIGINT AUTO_INCREMENT,
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    status STRING, -- e.g., SUCCESS, FAILED
    error_message STRING, -- Details of failure, if any
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    records_processed BIGINT, -- Count of processed records
    execution_time_seconds DOUBLE, -- Time taken for step completion
    month_year STRING -- Monthly partition in YYYYMM format
);

2. Execution Pipeline Workflow

Shell Script (run_etl.sh)

The script triggers the ETL execution for specific stages dynamically based on program_name and stage_name.

#!/bin/bash

PROGRAM_NAME=$1
STAGE_NAME=$2
MONTH_YEAR=$(date +"%Y%m")

echo "Executing ETL for Program: $PROGRAM_NAME, Stage: $STAGE_NAME, Month-Year: $MONTH_YEAR"

spark-submit \
    --master yarn \
    --deploy-mode client \
    ./scripts/run_etl.py \
    --program_name $PROGRAM_NAME \
    --stage_name $STAGE_NAME \
    --month_year $MONTH_YEAR

if [ $? -eq 0 ]; then
    echo "ETL execution completed successfully for Stage: $STAGE_NAME"
else
    echo "ETL execution failed for Stage: $STAGE_NAME"
    exit 1
fi
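
Since the script above passes --program_name, --stage_name, and --month_year to scripts/run_etl.py, that entry point needs to parse those flags. A minimal sketch, matching the run_stage signature shown next, could look like this.

import argparse

from etl.stage_runner import run_stage

def parse_args():
    parser = argparse.ArgumentParser(description="Run one ETL stage of a program")
    parser.add_argument("--program_name", required=True)
    parser.add_argument("--stage_name", required=True)
    parser.add_argument("--month_year", required=True, help="Run period in YYYYMM format")
    return parser.parse_args()

if __name__ == "__main__":
    args = parse_args()
    run_stage(args.program_name, args.stage_name, args.month_year)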

Stage Runner (stage_runner.py)

The stage execution ensures priority-based processing and tracks logs dynamically.

from etl.execute_query import execute_step
from etl.log_status import log_to_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session
from datetime import datetime

def run_stage(program_name, stage_name, month_year):
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name, month_year)

    for step in steps.collect():
        step_name = step["step_name"]
        start_time = datetime.now()

        try:
            records_processed = execute_step(spark, step)
            end_time = datetime.now()
            execution_time = (end_time - start_time).total_seconds()
            log_to_table(
                spark, program_name, stage_name, step_name, "SUCCESS", 
                None, start_time, end_time, records_processed, execution_time, month_year
            )
        except Exception as e:
            end_time = datetime.now()
            execution_time = (end_time - start_time).total_seconds()
            log_to_table(
                spark, program_name, stage_name, step_name, "FAILED", 
                str(e), start_time, end_time, 0, execution_time, month_year
            )
            raise e

Execution Query (execute_query.py)

Handles the different write_mode options and calculates processed record counts.

def execute_step(spark, step):
    """
    Execute ETL logic for a single step.
    """
    step = step.asDict() if hasattr(step, "asDict") else step  # Spark Row -> dict so .get() works below
    operation_type = step["operation_type"]
    write_mode = step["write_mode"]
    temp_view_name = step.get("temp_view_name")
    table_name = step.get("table_name")
    query = step.get("query")
    custom_logic = step.get("custom_logic")
    snapshot_mode = step.get("snapshot_mode")

    if operation_type == "SQL":
        result_df = spark.sql(query)

    elif operation_type == "DataFrame":
        # Execute custom DataFrame logic; the snippet is expected to assign the
        # output DataFrame to `result_df` (or `df`)
        local_vars = {"spark": spark}
        exec(custom_logic, local_vars)
        result_df = local_vars.get("result_df", local_vars.get("df"))

    # Handle write modes
    if write_mode == "write":
        result_df.write.mode("overwrite").saveAsTable(table_name)
    elif write_mode == "append":
        result_df.write.mode("append").saveAsTable(table_name)
    elif write_mode == "snapshot":
        snapshot_table = f"{table_name}_{snapshot_mode}"
        result_df.write.mode("overwrite").saveAsTable(snapshot_table)
        spark.sql(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {snapshot_table}")
    elif write_mode == "archive":
        archive_table = f"{table_name}_archive_{snapshot_mode}"
        result_df.write.mode("append").saveAsTable(archive_table)

    return result_df.count()

3. Orchestrator (job_orchestrator.py)

Manages the entire ETL pipeline.

from etl.stage_runner import run_stage

def orchestrate_program(program_name, month_year):
    stages = ["Stage1", "Stage2", "Stage3"]  # Define stage sequence
    for stage_name in stages:
        run_stage(program_name, stage_name, month_year)

4. Key Features

  • Dynamic Execution: Handle stages and programs dynamically using configurations.
  • Partitioning: Include month_year for monthly runs in both control_table and log_table.
  • Write Modes:
    • Write: Overwrites the table.
    • Append: Appends to the existing table.
    • Snapshot: Creates time-stamped versions and maintains a permanent view.
    • Archive: Stores older records in a separate table.

Let’s dive deeper into all the modules and provide detailed examples with explanations. This will include scripts for individual components, implementation details, and their integration into the overall workflow. I’ll structure this as follows:


1. Module Details and Scripts

1.1. config/

Stores configurations like base_config.json for reusable metadata.

base_config.json:

{
  "spark_config": {
    "app_name": "ETL Pipeline",
    "master": "yarn",
    "deploy_mode": "client"
  },
  "log_table_name": "log_table",
  "control_table_name": "control_table"
}
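
For reference, this configuration would be consumed with the helpers defined later in utils/; a small sketch:

from utils.config_utils import load_config
from utils.spark_utils import get_spark_session

config = load_config("config/base_config.json")
spark = get_spark_session(config["spark_config"]["app_name"])
log_table_name = config["log_table_name"]          # "log_table"
control_table_name = config["control_table_name"]  # "control_table"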

1.2. control/

SQL scripts for setting up control and log tables.

control_table_setup.sql:

CREATE TABLE IF NOT EXISTS control_table (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    operation_type STRING,
    query TEXT,
    custom_logic TEXT,
    temp_view_name STRING,
    table_name STRING,
    write_mode STRING,
    snapshot_mode STRING,
    stage_priority INT,
    step_priority INT,
    month_year STRING
);

sample_control_table_data.sql:

INSERT INTO control_table VALUES
('Program_A', 'Personal Loan', 'Stage1', 'Step1', 'SQL',
 'SELECT * FROM source_table WHERE score > 700', NULL,
 'temp_stage1', 'personal_loan_stage1', 'write', NULL, 1, 1, '202412');

1.3. scripts/

Scripts for pipeline orchestration and execution.

run_etl.sh: Executes stages of programs dynamically.

#!/bin/bash

PROGRAM_NAME=$1
STAGE_NAME=$2
MONTH_YEAR=$3

echo "Running ETL: Program=$PROGRAM_NAME, Stage=$STAGE_NAME, Month-Year=$MONTH_YEAR"

spark-submit \
  --master yarn \
  ./etl/job_orchestrator.py \
  --program_name $PROGRAM_NAME \
  --stage_name $STAGE_NAME \
  --month_year $MONTH_YEAR

1.4. etl/

Core ETL logic including query execution and logging.

execute_query.py: Executes SQL or DataFrame-based steps.

from pyspark.sql import SparkSession

def execute_step(spark, step):
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]

    if operation_type == "SQL":
        df = spark.sql(query)
    elif operation_type == "DataFrame":
        # Custom DataFrame logic; the snippet is expected to assign its result to `df`
        local_vars = {"spark": spark}
        exec(custom_logic, local_vars)
        df = local_vars["df"]
    else:
        raise ValueError("Invalid operation type.")

    # Example write logic
    write_mode = step["write_mode"]
    table_name = step["table_name"]
    if write_mode == "write":
        df.write.mode("overwrite").saveAsTable(table_name)

    return df.count()

log_status.py: Logs success or failure status.

def log_to_table(spark, program_name, stage_name, step_name, status, error_message, start_time, end_time, records_processed, execution_time, month_year):
    # Use an explicit DDL schema so None values (e.g. error_message on success) do not break type inference
    schema = ("program_name string, stage_name string, step_name string, status string, "
              "error_message string, start_time timestamp, end_time timestamp, "
              "records_processed long, execution_time double, month_year string")
    log_data = [(program_name, stage_name, step_name, status, error_message, start_time, end_time, records_processed, execution_time, month_year)]
    log_df = spark.createDataFrame(log_data, schema=schema)
    log_df.write.mode("append").saveAsTable("log_table")

stage_runner.py: Runs all steps in a stage based on control_table.

from datetime import datetime

from execute_query import execute_step
from log_status import log_to_table

def run_stage(spark, program_name, stage_name, month_year):
    control_df = spark.sql(f"SELECT * FROM control_table WHERE program_name='{program_name}' AND stage_name='{stage_name}' AND month_year='{month_year}' ORDER BY step_priority")
    steps = control_df.collect()

    for step in steps:
        try:
            start_time = datetime.now()
            records_processed = execute_step(spark, step)
            end_time = datetime.now()
            log_to_table(spark, program_name, stage_name, step["step_name"], "SUCCESS", None, start_time, end_time, records_processed, (end_time - start_time).total_seconds(), month_year)
        except Exception as e:
            log_to_table(spark, program_name, stage_name, step["step_name"], "FAILED", str(e), start_time, datetime.now(), 0, 0, month_year)
            raise

job_orchestrator.py: Orchestrates stages of a program.

from stage_runner import run_stage

def orchestrate_program(spark, program_name, month_year):
    stages = spark.sql(f"SELECT DISTINCT stage_name FROM control_table WHERE program_name='{program_name}' AND month_year='{month_year}' ORDER BY stage_priority").collect()
    for stage in stages:
        run_stage(spark, program_name, stage["stage_name"], month_year)
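
Because run_etl.sh can spark-submit this module directly with --program_name, --stage_name, and --month_year, it also needs a small entry point. A hedged sketch is below, with --stage_name made optional so either a single stage or the whole program can be run; the argument handling is an assumption, not part of the original code.

if __name__ == "__main__":
    import argparse
    from spark_utils import get_spark_session

    parser = argparse.ArgumentParser(description="Orchestrate an ETL program or a single stage")
    parser.add_argument("--program_name", required=True)
    parser.add_argument("--stage_name", default=None, help="Run only this stage if given")
    parser.add_argument("--month_year", required=True, help="Run period in YYYYMM format")
    args = parser.parse_args()

    spark = get_spark_session("ETL Pipeline")
    if args.stage_name:
        run_stage(spark, args.program_name, args.stage_name, args.month_year)
    else:
        orchestrate_program(spark, args.program_name, args.month_year)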

1.5. logging/

Centralized logging utilities.

logger.py:

import logging

def get_logger(log_file):
    logger = logging.getLogger("ETL Logger")
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # Avoid attaching duplicate handlers on repeated calls
        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logger.addHandler(handler)
    return logger

1.6. utils/

Utility functions for configuration, Spark, and error handling.

config_utils.py: Reads and parses configurations.

import json

def load_config(config_path):
    with open(config_path, 'r') as f:
        return json.load(f)
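
get_control_table_steps is imported from utils.config_utils in the stage runners above but is not shown; a plausible sketch, assuming the column names from the updated control table DDL, would be:

def get_control_table_steps(spark, program_name, stage_name, month_year):
    # Return the control-table rows for one stage, ordered by step priority
    return spark.sql(f"""
        SELECT * FROM control_table
        WHERE program_name = '{program_name}'
          AND stage_name = '{stage_name}'
          AND month_year = '{month_year}'
        ORDER BY step_priority
    """)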

spark_utils.py: Manages Spark session creation.

from pyspark.sql import SparkSession

def get_spark_session(app_name="ETL Pipeline"):
    return SparkSession.builder.appName(app_name).getOrCreate()

2. Integration Plan

  1. Setup Tables:
    • Execute control_table_setup.sql and log_table_setup.sql to set up metadata tables.
    • Populate control_table with configuration for different steps.
  2. Script Execution:
    • Use run_etl.sh to run individual stages dynamically.
    • Define program_name, stage_name, and month_year in the script call.
  3. Monthly Runs (see the driver sketch after this list):
    • Use month_year to filter and process the relevant data for monthly runs.
    • Use snapshot_mode and archive options for historical data management.
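
Tying the plan together, a monthly driver might look like the sketch below; it assumes the setup SQL has already been run and that orchestrate_program comes from etl/job_orchestrator.py.

from utils.spark_utils import get_spark_session
from etl.job_orchestrator import orchestrate_program

def run_monthly(program_name, month_year):
    # One end-to-end monthly run; stages are resolved from control_table for the period
    spark = get_spark_session("ETL Pipeline")
    orchestrate_program(spark, program_name, month_year)
    spark.stop()

# Example: run_monthly("Program_A", "202412")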

Let’s add detailed examples for DataFrame queries and error-handling patterns, tailored for your ETL structure.


1. DataFrame Queries

1.1. Filter and Transform Example

Use this for filtering and adding new columns with transformations.

from pyspark.sql import functions as F

def filter_and_transform_data(spark):
    # Load data from a table or view
    df = spark.table("cibil_filtered_view")

    # Apply filters
    filtered_df = df.filter((F.col("score") > 700) & (F.col("region") == "North"))

    # Add a new column with transformations
    transformed_df = filtered_df.withColumn("risk_score", F.col("score") * 1.1)

    # Save the output
    transformed_df.write.mode("overwrite").saveAsTable("transformed_cibil_data")

    return transformed_df.count()

1.2. Aggregation Example

Perform summary statistics or group-by operations.

from pyspark.sql import functions as F

def aggregate_data(spark):
    # Load data from a table
    df = spark.table("risk_data")

    # Perform aggregations
    aggregated_df = df.groupBy("region").agg(
        F.avg("risk_score").alias("avg_risk_score"),
        F.sum("loan_amount").alias("total_loan_amount"),
    )

    # Save the aggregated data
    aggregated_df.write.mode("overwrite").saveAsTable("aggregated_risk_data")

    return aggregated_df.count()

1.3. Join Example

Combine data from multiple sources.

from pyspark.sql import functions as F

def join_data_sources(spark):
    # Load data from two sources
    cibil_df = spark.table("cibil_data")
    loan_df = spark.table("loan_data")

    # Perform an inner join
    joined_df = cibil_df.join(loan_df, cibil_df["customer_id"] == loan_df["customer_id"], "inner")

    # Filter and select relevant columns
    result_df = joined_df.select(
        cibil_df["customer_id"],  # Qualify to avoid an ambiguous-column error after the join
        "score",
        "loan_amount",
        F.when(F.col("score") > 750, "Low Risk").otherwise("High Risk").alias("risk_category")
    )

    # Save the result
    result_df.write.mode("overwrite").saveAsTable("joined_customer_data")

    return result_df.count()

2. Error-Handling Patterns

2.1. Try-Except Block for DataFrame Operations

Wrap critical operations in a try-except block to handle failures gracefully.

def safe_execute_query(spark, query, output_table):
    try:
        # Execute query
        df = spark.sql(query)
        
        # Write to output table
        df.write.mode("overwrite").saveAsTable(output_table)
        
        print(f"Query executed successfully, data saved to {output_table}")
        return df.count()
    except Exception as e:
        print(f"Failed to execute query: {str(e)}")
        return 0

2.2. Logging Errors with Context

Capture errors and log them with relevant context.

from datetime import datetime
from pyspark.sql.utils import AnalysisException

def execute_with_logging(spark, logger, query, output_table):
    start_time = datetime.now()
    try:
        # Execute query
        df = spark.sql(query)
        df.write.mode("overwrite").saveAsTable(output_table)

        end_time = datetime.now()
        logger.info(f"Query executed successfully: {query}")
        logger.info(f"Records processed: {df.count()}, Time taken: {end_time - start_time}")
    except AnalysisException as ae:
        logger.error(f"Analysis error in query execution: {query}, Error: {str(ae)}")
    except Exception as e:
        logger.error(f"Unhandled error in query execution: {query}, Error: {str(e)}")

2.3. Retrying Logic for Fault-Tolerance

Retry failed steps with a maximum retry limit.

import time

def retry_with_backoff(func, max_retries=3, delay=5, *args, **kwargs):
    attempts = 0
    while attempts < max_retries:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            attempts += 1
            print(f"Attempt {attempts} failed with error: {str(e)}")
            if attempts == max_retries:
                print("Max retries reached, aborting.")
                raise
            time.sleep(delay)
            delay *= 2  # Back off exponentially before the next attempt

Usage:

retry_with_backoff(safe_execute_query, max_retries=3, delay=10, spark=spark, query="SELECT * FROM invalid_table", output_table="output_table")

3. Integrated Example

A complete step that incorporates DataFrame operations and error handling with logging and retries.

from logger import get_logger

def process_stage_with_error_handling(spark, query, output_table, logger):
    try:
        # Attempt execution with retries
        retry_with_backoff(safe_execute_query, max_retries=3, delay=5, spark=spark, query=query, output_table=output_table)
        logger.info(f"Successfully processed table {output_table}")
    except Exception as e:
        logger.error(f"Stage failed for table {output_table}: {str(e)}")
        raise

Full Pipeline Script Using Integrated Example:

from spark_utils import get_spark_session
from logger import get_logger
from config_utils import load_config

def main():
    # Initialize
    spark = get_spark_session("ETL Pipeline")
    logger = get_logger("./log/execution_logs.log")
    config = load_config("./config/base_config.json")
    
    query = "SELECT * FROM cibil_filtered_view WHERE score > 700"
    output_table = "high_score_customers"
    
    # Process stage
    process_stage_with_error_handling(spark, query, output_table, logger)
    
if __name__ == "__main__":
    main()

These examples ensure:

  1. Clear error handling for every step.
  2. Retry mechanisms for resilience.
  3. A consistent structure for DataFrame transformations and operations.

