Project 3: Control-Table-Driven PySpark ETL Framework


1. Project Structure

etl_project/
├── config/
│   ├── base_config.json
│   ├── product_configs/
│   │   ├── pl_cibil750.json
├── control/
│   ├── control_table_setup.sql
│   ├── sample_control_table_data.sql
├── scripts/
│   ├── run_etl.sh
│   ├── run_etl.py
├── etl/
│   ├── execute_query.py
│   ├── log_status.py
│   ├── stage_runner.py
│   ├── job_orchestrator.py
├── logging/
│   ├── logger.py
├── utils/
│   ├── spark_utils.py
│   ├── query_utils.py
│   ├── data_utils.py
│   ├── config_utils.py
│   ├── error_handling.py
└── README.md

2. Configuration Files

a. base_config.json

{
  "hive_metastore": "thrift://localhost:9083",
  "spark_master": "local[*]",
  "log_level": "INFO",
  "oracle_host": "oracle-host",
  "oracle_port": "1521",
  "oracle_service_name": "orcl",
  "oracle_user": "oracle_user",
  "oracle_password": "oracle_password"
}

b. pl_cibil750.json

{
  "program_name": "CIBIL 750 Program",
  "stages": [
    {
      "stage_name": "CIBIL Filter",
      "steps": [
        {
          "step_name": "Read CIBIL Data",
          "operation_type": "SQL",
          "query": "SELECT * FROM cibil_data WHERE score > 750",
          "temp_view_name": "cibil_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Risk Filter",
      "steps": [
        {
          "step_name": "Apply Risk Filter",
          "operation_type": "SQL",
          "query": "SELECT * FROM risk_data WHERE risk_level < 3",
          "temp_view_name": "risk_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Final Write",
      "steps": [
        {
          "step_name": "Write to Hive",
          "operation_type": "SQL",
          "query": "INSERT INTO final_table SELECT * FROM temp_table"
        }
      ]
    }
  ]
}
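
The orchestrator shown later drives execution from the control table; the product config above describes the same stages and steps in JSON. As a minimal sketch (assumption, not part of the original code): the helper name run_program_from_config is illustrative, and execute_step refers to the .get()-based version defined later in etl/execute_query.py, which also accepts plain dicts like these config steps.

# Sketch: driving execute_step directly from a product config such as pl_cibil750.json
import json

from etl.execute_query import execute_step

def run_program_from_config(spark, product_config_path, base_config):
    with open(product_config_path, "r") as f:
        product_config = json.load(f)

    for stage in product_config["stages"]:
        for step in stage["steps"]:
            # Each step is a plain dict with the same keys as a control table row
            execute_step(spark, step, base_config)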

3. Control Table

a. control_table_setup.sql

CREATE TABLE IF NOT EXISTS control_table (
  program_name STRING,
  stage_name STRING,
  step_name STRING,
  operation_type STRING,
  query STRING,
  temp_view_name STRING,
  table_name STRING,
  write_mode STRING,
  status STRING,
  error_message STRING,
  start_time TIMESTAMP,
  end_time TIMESTAMP
);
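
The DDL can be applied once from PySpark before the first run. A small sketch, assuming the file path is relative to the project root; spark.sql executes a single statement, so the trailing semicolon is stripped:

from utils.spark_utils import get_spark_session

spark = get_spark_session(app_name="control_table_setup")
with open("control/control_table_setup.sql") as f:
    ddl = f.read().strip().rstrip(";")
spark.sql(ddl)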

b. sample_control_table_data.sql

INSERT INTO control_table VALUES 
('CIBIL 750 Program', 'CIBIL Filter', 'Read CIBIL Data', 'SQL', 'SELECT * FROM cibil_data WHERE score > 750', 'cibil_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Risk Filter', 'Apply Risk Filter', 'SQL', 'SELECT * FROM risk_data WHERE risk_level < 3', 'risk_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Final Write', 'Write to Hive', 'SQL', 'INSERT INTO final_table SELECT * FROM temp_table', NULL, 'final_table', 'snapshot', 'PENDING', NULL, NULL, NULL);

4. Scripts

a. run_etl.sh

#!/bin/bash

CONFIG_PATH="config/base_config.json"
PROGRAM_NAME="CIBIL 750 Program"

spark-submit --master local[*] --deploy-mode client scripts/run_etl.py "$PROGRAM_NAME" "$CONFIG_PATH"

b. run_etl.py

import sys
from etl.job_orchestrator import run_program

if __name__ == "__main__":
    program_name = sys.argv[1]
    config_path = sys.argv[2]
    run_program(program_name, config_path)

c. etl/execute_query.py

from datetime import datetime
from utils.query_utils import execute_sql_query

def execute_step(spark, step, config):
    operation_type = step["operation_type"]
    query = step["query"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]

    start_time = datetime.now()
    try:
        if operation_type == "SQL":
            execute_sql_query(spark, query, temp_view_name, table_name, write_mode)
        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        end_time = datetime.now()
        return "FAILED", str(e), start_time, end_time

d. etl/log_status.py

def log_status_to_control_table(spark, program_name, stage_name, step_name, status, error_message, start_time, end_time):
    spark.sql(f"""
        INSERT INTO control_table (program_name, stage_name, step_name, status, error_message, start_time, end_time)
        VALUES ('{program_name}', '{stage_name}', '{step_name}', '{status}', '{error_message}', '{start_time}', '{end_time}')
    """)

e. etl/stage_runner.py

from etl.execute_query import execute_step
from etl.log_status import log_status_to_control_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session

def run_stage(program_name, stage_name, base_config):
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name)

    for step in steps.collect():
        step_name = step["step_name"]
        status, error_message, start_time, end_time = execute_step(spark, step, base_config)
        log_status_to_control_table(spark, program_name, stage_name, step_name, status, error_message, start_time, end_time)

    # getOrCreate() returns the session shared with the orchestrator, so it is stopped there;
    # stopping it per stage would break the remaining stages.

f. etl/job_orchestrator.py

from etl.stage_runner import run_stage
from utils.config_utils import load_json_config
from utils.spark_utils import get_spark_session

def run_program(program_name, config_path):
    config = load_json_config(config_path)
    spark = get_spark_session()
    
    stages = spark.sql(f"SELECT DISTINCT stage_name FROM control_table WHERE program_name='{program_name}'")
    for stage in stages.collect():
        run_stage(program_name, stage["stage_name"], config)
    
    spark.stop()

5. Utilities

a. utils/spark_utils.py

from pyspark.sql import SparkSession

def get_spark_session(app_name="ETL", master="local[*]"):
    return SparkSession.builder \
        .appName(app_name) \
        .master(master) \
        .enableHiveSupport() \
        .getOrCreate()
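
A usage sketch that wires base_config.json into the session (load_json_config is defined in utils/config_utils.py; the keys come from the config shown in section 2):

from utils.config_utils import load_json_config
from utils.spark_utils import get_spark_session

config = load_json_config("config/base_config.json")
spark = get_spark_session(app_name="CIBIL 750 Program", master=config["spark_master"])
spark.sparkContext.setLogLevel(config["log_level"])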

b. utils/config_utils.py

import json

def load_json_config(config_path):
    with open(config_path, 'r') as file:
        return json.load(file)

def get_control_table_steps(spark, program_name, stage_name):
    query = f"SELECT * FROM control_table WHERE program_name='{program_name}' AND stage_name='{stage_name}'"
    return spark.sql(query)

1. Missing Utility Files

a. data_utils.py

def custom_transform_function(df):
    """
    Example custom transformation function for DataFrame operations.
    Modify this as per the specific business logic.
    """
    return df.filter("score > 700").withColumnRenamed("old_column_name", "new_column_name")
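
A usage sketch, assuming an active SparkSession and a registered view with a score column such as cibil_filtered_view (withColumnRenamed is a no-op if old_column_name is absent):

df = spark.table("cibil_filtered_view")
custom_transform_function(df).createOrReplaceTempView("custom_transformed_view")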

b. error_handling.py

def log_error(error_message, logger):
    """
    Log error messages to a logging service or file.
    """
    logger.error(f"Error: {error_message}")
    return {"status": "FAILED", "message": error_message}

2. Updated Control Table Data

Control Table SQL

INSERT INTO control_table VALUES 
('CIBIL 750 Program', 'CIBIL Filter', 'Read CIBIL Data', 'SQL', 'SELECT * FROM cibil_data WHERE score > 750', 'cibil_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'CIBIL Filter', 'Apply Risk Filter', 'SQL', 'SELECT * FROM risk_data WHERE risk_level < 3', 'risk_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame', 'custom_transform_function()', 'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Oracle Read', 'Read Oracle Data', 'Oracle', 'SELECT * FROM oracle_table WHERE condition = value', 'oracle_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Final Write', 'Write to Hive', 'SQL', 'INSERT INTO final_table SELECT * FROM temp_table', NULL, 'final_table', 'snapshot', 'PENDING', NULL, NULL, NULL);

3. Enhanced execute_query.py

This handles all types of operations dynamically, including SQL, DataFrame operations, and Oracle reads.

from utils.query_utils import execute_sql_query
from utils.data_utils import custom_transform_function

def execute_step(spark, step, config):
    """
    Execute a single step in the ETL process.
    Supports multiple operation types: SQL, DataFrame, Oracle.
    """
    operation_type = step["operation_type"]
    query = step["query"]
    temp_view_name = step.get("temp_view_name")
    table_name = step.get("table_name")
    write_mode = step.get("write_mode")

    if operation_type == "SQL":
        # Execute a SQL query
        execute_sql_query(spark, query, temp_view_name, table_name, write_mode)
    elif operation_type == "DataFrame":
        # Execute a DataFrame operation
        if query == "custom_transform_function()":
            df = spark.table(temp_view_name)  # Assume input is already registered as a view
            transformed_df = custom_transform_function(df)
            transformed_df.createOrReplaceTempView(temp_view_name)
    elif operation_type == "Oracle":
        # Example: Reading from Oracle
        oracle_jdbc_url = f"jdbc:oracle:thin:@{config['oracle_host']}:{config['oracle_port']}:{config['oracle_service_name']}"
        df = spark.read \
            .format("jdbc") \
            .option("url", oracle_jdbc_url) \
            .option("dbtable", f"({query})") \
            .option("user", config["oracle_user"]) \
            .option("password", config["oracle_password"]) \
            .load()
        df.createOrReplaceTempView(temp_view_name)
    else:
        raise ValueError(f"Unsupported operation type: {operation_type}")

4. Updated query_utils.py

def execute_sql_query(spark, query, temp_view_name=None, table_name=None, write_mode=None):
    """
    Execute SQL query and optionally register as a temporary view or write to a table.
    """
    df = spark.sql(query)

    if temp_view_name:
        df.createOrReplaceTempView(temp_view_name)

    if table_name:
        if write_mode == "overwrite":
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == "append":
            df.write.mode("append").saveAsTable(table_name)
        else:
            raise ValueError(f"Unsupported write mode: {write_mode}")

5. Logging Enhancements

Logging Script (logging/logger.py)

import logging

def get_logger(name):
    """
    Set up a logger with a specific name and INFO level logging.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid attaching duplicate handlers on repeated calls
        console_handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
    return logger
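
Usage sketch:

logger = get_logger("etl.stage_runner")
logger.info("Starting stage: CIBIL Filter")
logger.error("Step failed: Read Oracle Data")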

6. Enhanced ETL Process

Stage Runner

from datetime import datetime

from etl.execute_query import execute_step
from etl.log_status import log_status_to_control_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session
# Note: a package named "logging" shadows the standard library module;
# renaming it (e.g. to etl_logging) avoids import surprises.
from logging.logger import get_logger

logger = get_logger("ETL")

def run_stage(program_name, stage_name, base_config):
    """
    Execute all steps in a stage, logging the results to the control table.
    """
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name)

    for row in steps.collect():
        step = row.asDict()  # Row -> dict so execute_step can use .get() on optional columns
        step_name = step["step_name"]
        start_time = datetime.now()
        try:
            execute_step(spark, step, base_config)
            end_time = datetime.now()
            log_status_to_control_table(spark, program_name, stage_name, step_name, "SUCCESS", None, start_time, end_time)
            logger.info(f"Step {step_name} completed successfully.")
        except Exception as e:
            end_time = datetime.now()
            log_status_to_control_table(spark, program_name, stage_name, step_name, "FAILED", str(e), start_time, end_time)
            logger.error(f"Error in step {step_name}: {e}")

    # The Spark session is shared with the orchestrator (getOrCreate), so it is stopped there.

This setup dynamically supports SQL queries, DataFrame transformations, and Oracle reads, and logs every step and error to the control table.

Here’s how the updated setup can handle the addition of the snapshot mode and make it configurable directly through the control table.


1. Updated execute_sql_query to Support snapshot Mode

This updated function dynamically handles temp_view, table, and snapshot modes based on control table configuration.

from datetime import datetime

def execute_sql_query(spark, query, temp_view_name=None, table_name=None, write_mode=None):
    """
    Execute SQL query and handle different write modes: temp_view, table, snapshot.
    """
    df = spark.sql(query)

    if write_mode == "temp_view":
        df.createOrReplaceTempView(temp_view_name)
    elif write_mode == "table":
        df.write.mode("overwrite").saveAsTable(table_name)
    elif write_mode == "snapshot":
        current_time = datetime.now().strftime("%Y%m%d%H%M")
        snapshot_table_name = f"{table_name}_{current_time}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        df.createOrReplaceTempView(f"{table_name}_view")
    elif write_mode == "append":
        df.write.mode("append").saveAsTable(table_name)
    else:
        raise ValueError(f"Unsupported write mode: {write_mode}")

2. Updated Control Table SQL

The write_mode column in the control table specifies the write behaviour for each step (temp_view, table, append, or snapshot), so the desired operation is configurable per row.

INSERT INTO control_table VALUES 
('CIBIL 750 Program', 'CIBIL Filter', 'Read CIBIL Data', 'SQL', 'SELECT * FROM cibil_data WHERE score > 750', 'cibil_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'CIBIL Filter', 'Apply Risk Filter', 'SQL', 'SELECT * FROM risk_data WHERE risk_level < 3', 'risk_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame', 'custom_transform_function()', 'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Oracle Read', 'Read Oracle Data', 'Oracle', 'SELECT * FROM oracle_table WHERE condition = value', 'oracle_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Final Write', 'Write to Hive', 'SQL', 'INSERT INTO final_table SELECT * FROM temp_table', NULL, 'final_table', 'snapshot', 'PENDING', NULL, NULL, NULL);

3. Updated execute_step Function

This integrates the write_mode logic directly from the control table.

from utils.query_utils import execute_sql_query
from utils.data_utils import custom_transform_function

def execute_step(spark, step, config):
    """
    Execute a single step in the ETL process, supporting multiple write modes.
    """
    operation_type = step["operation_type"]
    query = step["query"]
    temp_view_name = step.get("temp_view_name")
    table_name = step.get("table_name")
    write_mode = step.get("write_mode")

    if operation_type == "SQL":
        # Execute a SQL query
        execute_sql_query(spark, query, temp_view_name, table_name, write_mode)
    elif operation_type == "DataFrame":
        # Example: Execute a DataFrame operation
        if query == "custom_transform_function()":
            df = spark.table(temp_view_name)  # Assume input is already registered as a view
            transformed_df = custom_transform_function(df)
            transformed_df.createOrReplaceTempView(temp_view_name)
    elif operation_type == "Oracle":
        # Example: Reading from Oracle
        oracle_jdbc_url = f"jdbc:oracle:thin:@{config['oracle_host']}:{config['oracle_port']}:{config['oracle_service_name']}"
        df = spark.read \
            .format("jdbc") \
            .option("url", oracle_jdbc_url) \
            .option("dbtable", f"({query})") \
            .option("user", config["oracle_user"]) \
            .option("password", config["oracle_password"]) \
            .load()
        df.createOrReplaceTempView(temp_view_name)
    else:
        raise ValueError(f"Unsupported operation type: {operation_type}")

4. Updated Control Table Schema

The control table schema's write_mode column makes the write behaviour of each step explicit and configurable:

| Program Name | Stage Name | Step Name | Operation Type | Query | Temp View Name | Table Name | Write Mode |
| --- | --- | --- | --- | --- | --- | --- | --- |
| CIBIL 750 Program | CIBIL Filter | Read CIBIL Data | SQL | SELECT * FROM cibil_data WHERE score > 750 | cibil_filtered_view | NULL | temp_view |
| CIBIL 750 Program | CIBIL Filter | Apply Risk Filter | SQL | SELECT * FROM risk_data WHERE risk_level < 3 | risk_filtered_view | NULL | temp_view |
| CIBIL 750 Program | Dataframe Op | Custom Transform | DataFrame | custom_transform_function() | custom_transformed_view | NULL | temp_view |
| CIBIL 750 Program | Oracle Read | Read Oracle Data | Oracle | SELECT * FROM oracle_table WHERE condition = value | oracle_view | NULL | temp_view |
| CIBIL 750 Program | Final Write | Write to Hive | SQL | INSERT INTO final_table SELECT * FROM temp_table | NULL | final_table | snapshot |

This approach allows appending data, overwriting tables, or creating snapshot versions directly through the control table configuration.

To run DataFrame operations whose exact code snippet is stored in the control table, the snippet is executed dynamically with exec. Here's how the solution handles this:


Updated execute_step Function

This version supports executing code snippets for DataFrame operations.

from datetime import datetime

from utils.query_utils import execute_sql_query

def execute_step(spark, step, config):
    """
    Execute a single step in the ETL process, supporting SQL, Oracle, and DataFrame operations.
    """
    operation_type = step["operation_type"]
    query = step["query"]
    temp_view_name = step.get("temp_view_name")
    table_name = step.get("table_name")
    write_mode = step.get("write_mode")

    if operation_type == "SQL":
        # Execute a SQL query
        execute_sql_query(spark, query, temp_view_name, table_name, write_mode)
    elif operation_type == "DataFrame":
        # Execute a custom DataFrame operation (dynamic snippet execution)
        # Assume 'query' contains the Python code snippet as a string
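        # Note: the snippet also sees this module's globals and should come from a
        # trusted control table, since exec runs arbitrary Python.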
        exec_locals = {"spark": spark}
        exec(query, globals(), exec_locals)

        # Check if a DataFrame was created in the snippet
        if "df" in exec_locals:
            df = exec_locals["df"]
            if write_mode == "temp_view":
                df.createOrReplaceTempView(temp_view_name)
            elif write_mode == "table":
                df.write.mode("overwrite").saveAsTable(table_name)
            elif write_mode == "append":
                df.write.mode("append").saveAsTable(table_name)
            elif write_mode == "snapshot":
                current_time = datetime.now().strftime("%Y%m%d%H%M")
                snapshot_table_name = f"{table_name}_{current_time}"
                df.write.mode("overwrite").saveAsTable(snapshot_table_name)
                df.createOrReplaceTempView(f"{table_name}_view")
        else:
            raise ValueError("DataFrame operation did not create a DataFrame named 'df'.")
    elif operation_type == "Oracle":
        # Example: Reading from Oracle
        oracle_jdbc_url = f"jdbc:oracle:thin:@{config['oracle_host']}:{config['oracle_port']}:{config['oracle_service_name']}"
        df = spark.read \
            .format("jdbc") \
            .option("url", oracle_jdbc_url) \
            .option("dbtable", f"({query})") \
            .option("user", config["oracle_user"]) \
            .option("password", config["oracle_password"]) \
            .load()
        df.createOrReplaceTempView(temp_view_name)
    else:
        raise ValueError(f"Unsupported operation type: {operation_type}")

Example Control Table Entry for DataFrame Operation

Here’s how a DataFrame operation can be represented in the control table. The query column contains the Python code snippet.

INSERT INTO control_table VALUES 
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame', 
'
# Sample DataFrame operation code
df = spark.table("cibil_filtered_view")
df = df.withColumn("risk_score", df["score"] * 1.1)
', 
'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL);

Code Snippet in query Column

The query column for DataFrame operations must be a valid Python code snippet. For example:

# Example: Filtering and transforming a DataFrame
df = spark.table("risk_filtered_view")
df = df.filter("risk_level < 2")
df = df.withColumn("adjusted_score", df["score"] * 0.9)

How It Works

  1. Dynamic Execution:
    • The query column for a DataFrame operation is treated as Python code.
    • The code is executed using exec within a controlled environment (a hardened variant is sketched after this list).
  2. DataFrame Handling:
    • The snippet must create or modify a DataFrame named df.
    • The function handles writing this DataFrame based on the write_mode (e.g., temp_view, table, append, or snapshot).
  3. Flexibility:
    • You can define any DataFrame transformation in the control table without hardcoding it into the application.
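
A minimal sketch of a more controlled execution environment, limiting what the snippet can see. This hardening is an assumption layered on top of the approach above, not part of the original code; with builtins removed, snippets cannot call functions like print or len.

def run_dataframe_snippet(spark, snippet):
    """Execute a control-table Python snippet and return the DataFrame it binds to 'df'."""
    # Expose only the SparkSession; block access to Python builtins and module globals
    exec_globals = {"__builtins__": {}, "spark": spark}
    exec_locals = {}
    exec(snippet, exec_globals, exec_locals)
    if "df" not in exec_locals:
        raise ValueError("DataFrame snippet did not create a DataFrame named 'df'.")
    return exec_locals["df"]

# Example using the snippet shown earlier
snippet = '''
df = spark.table("cibil_filtered_view")
df = df.withColumn("risk_score", df["score"] * 1.1)
'''
df = run_dataframe_snippet(spark, snippet)
df.createOrReplaceTempView("custom_transformed_view")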

Control Table Schema with Example

| Program Name | Stage Name | Step Name | Operation Type | Query | Temp View Name | Table Name | Write Mode |
| --- | --- | --- | --- | --- | --- | --- | --- |
| CIBIL 750 Program | Dataframe Operation | Custom Transform | DataFrame | df = spark.table("cibil_filtered_view"); df = df.withColumn("risk_score", df["score"] * 1.1) | custom_transformed_view | NULL | temp_view |

This approach provides flexibility for ad hoc transformations while keeping the ETL pipeline structured.

