Project 5

1. Project Structure

etl_project/
├── config/
│   └── base_config.json
├── control/
│   ├── control_table_setup.sql
│   ├── log_table_setup.sql
│   └── sample_control_table_data.sql
├── scripts/
│   ├── run_etl.sh
│   └── run_etl.py
├── etl/
│   ├── execute_query.py
│   ├── log_status.py
│   ├── stage_runner.py
│   └── job_orchestrator.py
├── logging/
│   └── logger.py
├── utils/
│   ├── spark_utils.py
│   ├── query_utils.py
│   ├── config_utils.py
│   └── error_handling.py
├── log/
│   └── execution_logs.log
└── README.md

2. Configuration Files

config/base_config.json

{
    "spark_config": {
        "appName": "ETL Framework",
        "master": "local[*]"
    },
    "hive_config": {
        "enableHiveSupport": true
    }
}

3. Database Setup

control_table_setup.sql

CREATE TABLE IF NOT EXISTS control_table (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    operation_type STRING,
    query STRING,
    custom_logic STRING,
    temp_view_name STRING,
    table_name STRING,
    write_mode STRING,
    snapshot_mode STRING,
    status STRING,
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP
);

log_table_setup.sql

CREATE TABLE IF NOT EXISTS log_table (
    log_id BIGINT,  -- Spark SQL/Hive has no AUTO_INCREMENT; populate e.g. via monotonically_increasing_id()
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    status STRING,
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    records_processed BIGINT,
    execution_time_seconds DOUBLE
);
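
Once the log table exists, execution history can be inspected with a quick ad-hoc query. The snippet below is a usage sketch only (not part of the framework code) and assumes a live SparkSession named spark:

# Ad-hoc monitoring: list the most recent failed steps recorded by the framework
failed_steps = spark.sql("""
    SELECT program_name, stage_name, step_name, error_message, end_time
    FROM log_table
    WHERE status = 'FAILED'
    ORDER BY end_time DESC
    LIMIT 20
""")
failed_steps.show(truncate=False)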

Sample Control Table Data

INSERT INTO control_table VALUES
('CIBIL 750 Program', 'Personal Loan', 'CIBIL Filter', 'Read CIBIL Data', 'SQL',
 'SELECT * FROM cibil_data WHERE score > 750', NULL, 'cibil_filtered_view', NULL,
 NULL, NULL, 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Personal Loan', 'Risk Filter', 'Apply Risk Filter', 'SQL',
 'SELECT * FROM risk_data WHERE risk_level < 3', NULL, 'risk_filtered_view', NULL,
 NULL, NULL, 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Personal Loan', 'Final Write', 'Write to Hive', 'SQL',
 'SELECT * FROM risk_filtered_view', NULL, NULL, 'final_table',
 'overwrite', 'archive', 'PENDING', NULL, NULL, NULL);

4. ETL Modules

etl/execute_query.py

from datetime import datetime
from utils.query_utils import execute_sql_query

def execute_step(spark, step, config=None):
    # config is accepted so the orchestrator can pass framework settings through; it is not used yet.
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]
    snapshot_mode = step["snapshot_mode"]

    start_time = datetime.now()
    try:
        if operation_type == "SQL":
            execute_sql_query(spark, query, temp_view_name, table_name, write_mode, snapshot_mode)
        elif operation_type == "CustomLogic":
            # custom_logic holds a Python snippet from the control table,
            # executed with the SparkSession exposed as `spark`.
            exec(custom_logic, {"spark": spark})
        else:
            raise ValueError(f"Unsupported operation type: {operation_type}")

        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        return "FAILED", str(e), start_time, datetime.now()

etl/log_status.py

def log_status_to_table(spark, program_name, product_name, stage_name, step_name,
                        status, error_message, start_time, end_time, records_processed=None):
    execution_time = (end_time - start_time).total_seconds()
    # Escape single quotes so the generated SQL stays valid when the error text contains them.
    if error_message:
        escaped = error_message.replace("'", "''")
        error_sql = f"'{escaped}'"
    else:
        error_sql = "NULL"
    spark.sql(f"""
        INSERT INTO log_table (
            program_name, product_name, stage_name, step_name, status, error_message,
            start_time, end_time, records_processed, execution_time_seconds
        )
        VALUES (
            '{program_name}', '{product_name}', '{stage_name}', '{step_name}',
            '{status}', {error_sql}, '{start_time}', '{end_time}',
            {records_processed if records_processed is not None else 'NULL'}, {execution_time}
        )
    """)

etl/stage_runner.py

from etl.execute_query import execute_step
from etl.log_status import log_status_to_table

def run_stage(spark, program_name, product_name, config):
    stages = spark.sql(f"""
        SELECT * 
        FROM control_table 
        WHERE program_name = '{program_name}' AND product_name = '{product_name}'
    """).collect()

    for stage in stages:
        stage_name = stage["stage_name"]
        step_name = stage["step_name"]

        status, error_message, start_time, end_time = execute_step(spark, stage.asDict(), config)

        log_status_to_table(
            spark, program_name, product_name, stage_name, step_name, 
            status, error_message, start_time, end_time
        )

etl/job_orchestrator.py

from etl.stage_runner import run_stage
from utils.config_utils import get_control_table_programs
from utils.spark_utils import get_spark_session

def run_etl(config_path):
    spark = get_spark_session(config_path)
    programs = get_control_table_programs(spark)

    for program in programs.collect():
        program_name = program["program_name"]
        product_name = program["product_name"]
        run_stage(spark, program_name, product_name, config_path)

    spark.stop()

5. Utility Scripts

utils/spark_utils.py

from pyspark.sql import SparkSession
import json

def get_spark_session(config_path):
    with open(config_path, "r") as f:
        config = json.load(f)

    builder = SparkSession.builder \
        .appName(config["spark_config"]["appName"]) \
        .master(config["spark_config"]["master"])

    # Enable Hive support only when the config asks for it.
    if config["hive_config"]["enableHiveSupport"]:
        builder = builder.enableHiveSupport()

    return builder.getOrCreate()

utils/query_utils.py

def execute_sql_query(spark, query, temp_view_name=None, table_name=None, write_mode=None, snapshot_mode=None):
    df = spark.sql(query)

    # Expose intermediate results to downstream steps as a temporary view.
    if temp_view_name:
        df.createOrReplaceTempView(temp_view_name)

    if table_name:
        if write_mode == "overwrite":
            # Optionally archive the current contents before they are replaced;
            # overwrite mode already clears the target, so no TRUNCATE is needed.
            if snapshot_mode == "archive":
                spark.sql(f"INSERT INTO archive_{table_name} SELECT * FROM {table_name}")
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == "append":
            df.write.mode("append").saveAsTable(table_name)
        else:
            raise ValueError(f"Unsupported write mode: {write_mode}")

utils/error_handling.py

from logging.logger import get_logger  # project-local package; note it shadows the standard-library logging module

logger = get_logger("ETL_Error")

def handle_error(error_message):
    logger.error(f"Error occurred: {error_message}")
    raise Exception(error_message)
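
logging/logger.py

logging/logger.py is referenced by utils/error_handling.py but not shown above. A minimal sketch, assuming get_logger returns an object with info() and error() methods that append to log/execution_logs.log; because the package name shadows the standard-library logging module, this sketch avoids importing it and writes to the file directly:

import os
from datetime import datetime

LOG_FILE = os.path.join("log", "execution_logs.log")

class _FileLogger:
    def __init__(self, name):
        self.name = name

    def _write(self, level, message):
        # Append a timestamped line to the shared execution log file.
        os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with open(LOG_FILE, "a") as f:
            f.write(f"{timestamp} [{level}] {self.name}: {message}\n")

    def info(self, message):
        self._write("INFO", message)

    def error(self, message):
        self._write("ERROR", message)

def get_logger(name):
    return _FileLogger(name)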

6. Entry Point

scripts/run_etl.py

from etl.job_orchestrator import run_etl

if __name__ == "__main__":
    config_path = "config/base_config.json"
    run_etl(config_path)
