Project 4

1. Project Structure

etl_project/
├── config/
│   ├── base_config.json
│   ├── product_configs/
│   │   ├── pl_cibil750.json
├── control/
│   ├── control_table_setup.sql
│   ├── sample_control_table_data.sql
├── scripts/
│   ├── run_etl.sh
│   ├── run_etl.py
├── etl/
│   ├── execute_query.py
│   ├── log_status.py
│   ├── stage_runner.py
│   ├── job_orchestrator.py
├── logging/
│   ├── logger.py
├── utils/
│   ├── spark_utils.py
│   ├── query_utils.py
│   ├── data_utils.py
│   ├── config_utils.py
│   ├── error_handling.py
└── README.md

2. Configuration Files

a. base_config.json

{
  "hive_metastore": "thrift://localhost:9083",
  "spark_master": "local[*]",
  "log_level": "INFO",
  "oracle_host": "oracle-host",
  "oracle_port": "1521",
  "oracle_service_name": "orcl",
  "oracle_user": "oracle_user",
  "oracle_password": "oracle_password"
}

b. pl_cibil750.json

{
  "program_name": "CIBIL 750 Program",
  "stages": [
    {
      "stage_name": "CIBIL Filter",
      "steps": [
        {
          "step_name": "Read CIBIL Data",
          "operation_type": "SQL",
          "query": "SELECT * FROM cibil_data WHERE score > 750",
          "temp_view_name": "cibil_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Risk Filter",
      "steps": [
        {
          "step_name": "Apply Risk Filter",
          "operation_type": "SQL",
          "query": "SELECT * FROM risk_data WHERE risk_level < 3",
          "temp_view_name": "risk_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Final Write",
      "steps": [
        {
          "step_name": "Write to Hive",
          "operation_type": "SQL",
          "query": "INSERT INTO final_table SELECT * FROM temp_table"
        }
      ]
    }
  ]
}
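
The orchestrator in section 4 drives execution from the control table, but the same stages and steps are described by this config. A small sketch of how the JSON's shape can be walked (illustrative only):

import json

# Walk the stages/steps defined in pl_cibil750.json to see what would run
with open("config/product_configs/pl_cibil750.json") as f:
    product_config = json.load(f)

for stage in product_config["stages"]:
    print("Stage:", stage["stage_name"])
    for step in stage["steps"]:
        print("  Step:", step["step_name"], "-", step["operation_type"])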

3. Control Table

a. control_table_setup.sql

The control table includes a write_mode column that specifies how each step's output is persisted (temp_view, table, append, or snapshot). This keeps the pipeline flexible about how each step writes its result.
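
The CREATE TABLE statement itself is not shown above; here is a minimal sketch of control_table_setup.sql, assuming the twelve columns implied by the sample INSERTs below and the status columns referenced in log_status.py (note that the sample rows repeat the write mode in their final position, so adjust the DDL to your actual layout):

-- Sketch of control/control_table_setup.sql; column names and order are inferred
CREATE TABLE IF NOT EXISTS control_table (
    program_name    STRING,
    stage_name      STRING,
    step_name       STRING,
    operation_type  STRING,   -- 'SQL', 'DataFrame', or 'Oracle'
    query           STRING,   -- SQL text or a Python snippet
    temp_view_name  STRING,
    table_name      STRING,
    write_mode      STRING,   -- 'temp_view', 'table', 'append', or 'snapshot'
    status          STRING,   -- 'PENDING', 'SUCCESS', or 'FAILED'
    error_message   STRING,
    start_time      STRING,
    end_time        STRING
);

The INSERT statements below would then live in sample_control_table_data.sql, as laid out in the project tree.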

INSERT INTO control_table VALUES 
('CIBIL 750 Program', 'CIBIL Filter', 'Read CIBIL Data', 'SQL', 'SELECT * FROM cibil_data WHERE score > 750', 'cibil_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, 'temp_view'),
('CIBIL 750 Program', 'CIBIL Filter', 'Apply Risk Filter', 'SQL', 'SELECT * FROM risk_data WHERE risk_level < 3', 'risk_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, 'temp_view'),
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame', 'custom_transform_function()', 'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, 'temp_view'),
('CIBIL 750 Program', 'Oracle Read', 'Read Oracle Data', 'Oracle', 'SELECT * FROM oracle_table WHERE condition = value', 'oracle_view', NULL, 'temp_view', 'PENDING', NULL, NULL, 'temp_view'),
('CIBIL 750 Program', 'Final Write', 'Write to Hive', 'SQL', 'INSERT INTO final_table SELECT * FROM temp_table', 'final_table', NULL, 'snapshot', 'PENDING', NULL, NULL, 'snapshot');
INSERT INTO control_table VALUES
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame',
'
# Sample DataFrame operation code
df = spark.table("cibil_filtered_view")
df = df.withColumn("risk_score", df["score"] * 1.1)
',
'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, 'temp_view');


| Program Name | Stage Name | Step Name | Operation Type | Query | Temp View Name | Table Name | Write Mode |
| --- | --- | --- | --- | --- | --- | --- | --- |
| CIBIL 750 Program | CIBIL Filter | Read CIBIL Data | SQL | SELECT * FROM cibil_data WHERE score > 750 | cibil_filtered_view | NULL | temp_view |
| CIBIL 750 Program | CIBIL Filter | Apply Risk Filter | SQL | SELECT * FROM risk_data WHERE risk_level < 3 | risk_filtered_view | NULL | temp_view |
| CIBIL 750 Program | Dataframe Op | Custom Transform | DataFrame | custom_transform_function() | custom_transformed_view | NULL | temp_view |
| CIBIL 750 Program | Oracle Read | Read Oracle Data | Oracle | SELECT * FROM oracle_table WHERE condition = value | oracle_view | NULL | temp_view |
| CIBIL 750 Program | Final Write | Write to Hive | SQL | INSERT INTO final_table SELECT * FROM temp_table | NULL | final_table | snapshot |

| Program Name | Stage Name | Step Name | Operation Type | Query | Temp View Name | Table Name | Write Mode |
| --- | --- | --- | --- | --- | --- | --- | --- |
| CIBIL 750 Program | Dataframe Operation | Custom Transform | DataFrame | df = spark.table("cibil_filtered_view"); df = df.withColumn("risk_score", df["score"] * 1.1) | custom_transformed_view | NULL | temp_view |

4. Scripts

a. run_etl.sh

#!/bin/bash

CONFIG_PATH="config/base_config.json"
PROGRAM_NAME="CIBIL 750 Program"

spark-submit --master "local[*]" --deploy-mode client scripts/run_etl.py "$PROGRAM_NAME" "$CONFIG_PATH"

b. run_etl.py

import sys
from etl.job_orchestrator import run_program

if __name__ == "__main__":
    program_name = sys.argv[1]
    config_path = sys.argv[2]
    run_program(program_name, config_path)

c. etl/execute_query.py


from datetime import datetime

from utils.query_utils import execute_sql_query
from utils.data_utils import custom_transform_function

def execute_step(spark, step, config):
    """
    Execute a single step in the ETL process, supporting multiple write modes.
    """
    # Accept either a dict (from JSON config) or a Spark Row (from the control table)
    if hasattr(step, "asDict"):
        step = step.asDict()

    operation_type = step["operation_type"]
    query = step["query"]
    temp_view_name = step.get("temp_view_name")
    table_name = step.get("table_name")
    write_mode = step.get("write_mode")

    if operation_type == "SQL":
        # Execute a SQL query and persist the result according to write_mode
        execute_sql_query(spark, query, temp_view_name, table_name, write_mode)
    elif operation_type == "DataFrame":
        if query == "custom_transform_function()":
            # Named transformation: apply the function to the already-registered input view
            df = spark.table(temp_view_name)  # assumes the input is registered as a view
            transformed_df = custom_transform_function(df)
            transformed_df.createOrReplaceTempView(temp_view_name)
        else:
            # Dynamic snippet: 'query' holds Python code that must create a DataFrame named 'df'
            exec_locals = {"spark": spark}
            exec(query, globals(), exec_locals)

            if "df" not in exec_locals:
                raise ValueError("DataFrame operation did not create a DataFrame named 'df'.")

            df = exec_locals["df"]
            if write_mode == "temp_view":
                df.createOrReplaceTempView(temp_view_name)
            elif write_mode == "table":
                df.write.mode("overwrite").saveAsTable(table_name)
            elif write_mode == "append":
                df.write.mode("append").saveAsTable(table_name)
            elif write_mode == "snapshot":
                current_time = datetime.now().strftime("%Y%m%d%H%M")
                snapshot_table_name = f"{table_name}_{current_time}"
                df.write.mode("overwrite").saveAsTable(snapshot_table_name)
                df.createOrReplaceTempView(f"{table_name}_view")
    elif operation_type == "Oracle":
        # Read from Oracle over JDBC and expose the result as a temp view
        oracle_jdbc_url = f"jdbc:oracle:thin:@{config['oracle_host']}:{config['oracle_port']}:{config['oracle_service_name']}"
        df = (spark.read
              .format("jdbc")
              .option("url", oracle_jdbc_url)
              .option("dbtable", f"({query})")
              .option("user", config["oracle_user"])
              .option("password", config["oracle_password"])
              .load())
        df.createOrReplaceTempView(temp_view_name)
    else:
        raise ValueError(f"Unsupported operation type: {operation_type}")

d. etl/log_status.py

def log_status_to_control_table(spark, program_name, stage_name, step_name, status, error_message, start_time, end_time):
    # Escape single quotes so the generated SQL remains valid when the error text contains them
    safe_error = (error_message or "").replace("'", "''")
    spark.sql(f"""
        INSERT INTO control_table (program_name, stage_name, step_name, status, error_message, start_time, end_time)
        VALUES ('{program_name}', '{stage_name}', '{step_name}', '{status}', '{safe_error}', '{start_time}', '{end_time}')
    """)

Logging Enhancements: logging/logger.py

Note: the project package is also named logging; if it is importable on PYTHONPATH (i.e. it has an __init__.py), it can shadow Python's standard logging module, so consider renaming it (for example to etl_logging).

import logging

def get_logger(name):
    """
    Set up a logger with a specific name and INFO level logging.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid attaching duplicate handlers on repeated calls
        console_handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
    return logger

e. etl/stage_runner.py

from etl.execute_query import execute_step
from etl.log_status import log_status_to_control_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session

def run_stage(program_name, stage_name, base_config):
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name)

    for step in steps.collect():
        step_name = step["step_name"]
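        # note: this assumes execute_step returns (status, error_message, start_time, end_time);
        # the updated version below records timestamps itself instead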
        status, error_message, start_time, end_time = execute_step(spark, step, base_config)
        log_status_to_control_table(spark, program_name, stage_name, step_name, status, error_message, start_time, end_time)
    
    spark.stop()

Updated stage_runner.py with structured logging and per-step error handling:

from datetime import datetime

from etl.execute_query import execute_step
from etl.log_status import log_status_to_control_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session
from logging.logger import get_logger

logger = get_logger("ETL")

def run_stage(program_name, stage_name, base_config):
    """
    Execute all steps in a stage, logging the results to the control table.
    """
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name)

    for step in steps.collect():
        step_name = step["step_name"]
        start_time = datetime.now()
        try:
            execute_step(spark, step, base_config)
            end_time = datetime.now()
            log_status_to_control_table(spark, program_name, stage_name, step_name, "SUCCESS", None, start_time, end_time)
            logger.info(f"Step {step_name} completed successfully.")
        except Exception as e:
            end_time = datetime.now()
            log_status_to_control_table(spark, program_name, stage_name, step_name, "FAILED", str(e), start_time, end_time)
            logger.error(f"Error in step {step_name}: {e}")

    spark.stop()

f. etl/job_orchestrator.py

from etl.stage_runner import run_stage
from utils.config_utils import load_json_config
from utils.spark_utils import get_spark_session

def run_program(program_name, config_path):
    config = load_json_config(config_path)
    spark = get_spark_session()
    
    stages = spark.sql(f"SELECT DISTINCT stage_name FROM control_table WHERE program_name='{program_name}'")
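    # Note: SELECT DISTINCT does not guarantee stage order; add an explicit sequence
    # column to control_table and ORDER BY it if stages must run in a fixed order.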
    for stage in stages.collect():
        run_stage(program_name, stage["stage_name"], config)
    
    spark.stop()

5. Utilities

a. utils/spark_utils.py

from pyspark.sql import SparkSession

def get_spark_session(app_name="ETL", master="local[*]"):
    return (SparkSession.builder
            .appName(app_name)
            .master(master)
            .enableHiveSupport()
            .getOrCreate())

b. utils/config_utils.py

import json

def load_json_config(config_path):
    """
    Load configuration from a JSON file.
    """
    with open(config_path, 'r') as file:
        return json.load(file)

def get_control_table_steps(spark, program_name, stage_name):
    """
    Fetch steps for a specific stage and program from the control table.
    """
    query = f"SELECT * FROM control_table WHERE program_name='{program_name}' AND stage_name='{stage_name}'"
    return spark.sql(query)

c. utils/data_utils.py

def custom_transform_function(df):
    """
    Example custom transformation function for DataFrame operations.
    Modify this as per the specific business logic.
    """
    return df.filter("score > 700").withColumnRenamed("old_column_name", "new_column_name")

d. utils/error_handling.py

def log_error(error_message, logger):
    """
    Log error messages to a logging service or file.
    """
    logger.error(f"Error: {error_message}")
    return {"status": "FAILED", "message": error_message}
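
log_error is not wired into the stage runner above; a minimal sketch of how it could be used, assuming the logger from logging/logger.py:

from utils.error_handling import log_error
from logging.logger import get_logger

logger = get_logger("ETL")

try:
    raise RuntimeError("sample failure")   # stand-in for a failing ETL step
except Exception as e:
    result = log_error(str(e), logger)     # logs the error and returns {"status": "FAILED", "message": "..."}
    print(result)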

e. utils/query_utils.py

from datetime import datetime

def execute_dataframe_operation(spark, operation, temp_view_name, table_name, write_mode):
    """
    Execute a Python/DataFrame code snippet and persist the resulting DataFrame
    according to the requested write mode. The snippet must create a DataFrame named 'df'.
    """
    start_time = datetime.now()
    try:
        exec_locals = {"spark": spark}
        exec(operation, globals(), exec_locals)
        df = exec_locals.get("df")
        if df is None:
            raise ValueError("DataFrame operation did not create a DataFrame named 'df'.")

        if write_mode == 'temp_view':
            df.createOrReplaceTempView(temp_view_name)
        elif write_mode == 'table':
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == 'append':
            df.write.mode("append").saveAsTable(table_name)
        elif write_mode == 'snapshot':
            current_time = datetime.now().strftime("%Y%m%d%H%M")
            snapshot_table_name = f"{table_name}_{current_time}"
            df.write.mode("overwrite").saveAsTable(snapshot_table_name)
            df.createOrReplaceTempView(f"{table_name}_view")
        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        end_time = datetime.now()
        return "FAILED", str(e), start_time, end_time


def execute_sql_query(spark, query, temp_view_name=None, table_name=None, write_mode=None):
    """
    Execute SQL query and handle different write modes: temp_view, table, snapshot.
    """
    df = spark.sql(query)

    if write_mode == "temp_view":
        df.createOrReplaceTempView(temp_view_name)
    elif write_mode == "table":
        df.write.mode("overwrite").saveAsTable(table_name)
    elif write_mode == "snapshot":
        current_time = datetime.now().strftime("%Y%m%d%H%M")
        snapshot_table_name = f"{table_name}_{current_time}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        df.createOrReplaceTempView(f"{table_name}_view")
    elif write_mode == "append":
        df.write.mode("append").saveAsTable(table_name)
    elif write_mode is None:
        # No write mode supplied: the statement (e.g. an INSERT) runs for its side effects only
        pass
    else:
        raise ValueError(f"Unsupported write mode: {write_mode}")

Save each piece of code in its respective file:

  • base_config.json and pl_cibil750.json in the config/ directory
  • control_table_setup.sql and sample_control_table_data.sql in the control/ directory
  • run_etl.sh and run_etl.py in the scripts/ directory
  • execute_query.py, log_status.py, stage_runner.py, and job_orchestrator.py in the etl/ directory
  • logger.py in the logging/ directory
  • spark_utils.py, query_utils.py, data_utils.py, config_utils.py, and error_handling.py in the utils/ directory

To execute the ETL pipeline, run the run_etl.sh script from the project root (the relative paths inside the script assume etl_project/ as the working directory):

cd etl_project
./scripts/run_etl.sh
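
After a run, the control table doubles as an audit trail. A quick check of what was recorded (a sketch, assuming the control_table columns used above):

from utils.spark_utils import get_spark_session

# Inspect the statuses written by log_status.py for this program
spark = get_spark_session()
spark.sql("""
    SELECT stage_name, step_name, status, error_message, start_time, end_time
    FROM control_table
    WHERE program_name = 'CIBIL 750 Program'
""").show(truncate=False)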
