Project 2: Config-Driven ETL Framework with PySpark and a Hive Control Table

```
etl_project/
├── config/
│   ├── base_config.json              # Static environment configurations
│   └── product_configs/              # Individual configs for each program
│       └── pl_cibil750.json
├── control/
│   ├── control_table_setup.sql       # SQL to create the control table
│   └── sample_control_table_data.sql # Sample data for the control table
├── scripts/
│   ├── run_etl.sh                    # Shell script to orchestrate Spark job submissions
│   └── run_etl.py                    # Python script to orchestrate Spark job submissions
├── etl/
│   └── execute_stage.py              # Script to execute a single stage dynamically
├── logging/
│   └── logger.py                     # Centralized logging utility
├── utils/
│   ├── query_utils.py                # Query execution and validation
│   ├── data_utils.py                 # Data I/O operations
│   ├── config_utils.py               # Utility to read configs and control tables
│   └── error_handling.py             # Error logging and recovery
└── README.md
```
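
The utils/ and logging/ modules are referenced throughout the project but not reproduced in full in this post. As an illustration, a minimal config_utils.py might look like the sketch below (the function names are assumptions, not the original implementation):

```python
# utils/config_utils.py -- minimal sketch; function names are assumed, not the original code
import json
from pyspark.sql import DataFrame, SparkSession


def read_base_config(config_path: str) -> dict:
    """Load the static environment configuration (config/base_config.json)."""
    with open(config_path, "r") as fh:
        return json.load(fh)


def read_product_config(product_config_path: str) -> dict:
    """Load a per-program config such as config/product_configs/pl_cibil750.json."""
    with open(product_config_path, "r") as fh:
        return json.load(fh)


def read_control_table_steps(spark: SparkSession, program_name: str, stage_name: str) -> DataFrame:
    """Return the control-table rows for one stage of one program."""
    return (
        spark.table("control_table")
        .filter(f"program_name = '{program_name}' AND stage_name = '{stage_name}'")
    )
```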

Configuration Files

base_config.json (Static Environment Configurations)

```json
{
  "hive_metastore": "thrift://localhost:9083",
  "spark_master": "local[*]",
  "log_level": "INFO",
  "logging_service_url": "http://logging-service/api/logs"
}
```
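
base_config.json already carries log_level and logging_service_url, which is what logging/logger.py would consume. A possible sketch (the post_log helper and the payload shape sent to the logging service are assumptions):

```python
# logging/logger.py -- sketch only; the HTTP payload format is an assumption
import logging
import requests


def get_logger(name: str, config: dict) -> logging.Logger:
    """Create a logger whose level comes from base_config.json."""
    logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s - %(message)s")
    logger = logging.getLogger(name)
    logger.setLevel(config.get("log_level", "INFO"))
    return logger


def post_log(config: dict, message: str, level: str = "INFO") -> None:
    """Best-effort push of a log record to the central logging service."""
    try:
        requests.post(
            config["logging_service_url"],
            json={"level": level, "message": message},
            timeout=5,
        )
    except requests.RequestException:
        # Logging must never break the ETL run.
        pass
```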

product_configs/pl_cibil750.json (Individual Configs for Each Program)

```json
{
  "program_name": "CIBIL 750 Program",
  "stages": [
    {
      "stage_name": "CIBIL Filter",
      "steps": [
        {
          "step_name": "Read CIBIL Data",
          "operation_type": "SQL",
          "query": "SELECT * FROM cibil_data WHERE score > 750",
          "temp_view_name": "cibil_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Risk Filter",
      "steps": [
        {
          "step_name": "Apply Risk Filter",
          "operation_type": "SQL",
          "query": "SELECT * FROM risk_data WHERE risk_level < 3",
          "temp_view_name": "risk_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Dataframe Operation",
      "steps": [
        {
          "step_name": "Custom Transform",
          "operation_type": "DataFrame",
          "query": "custom_transform_function()",
          "temp_view_name": "custom_transformed_view"
        }
      ]
    },
    {
      "stage_name": "Oracle Read",
      "steps": [
        {
          "step_name": "Read Oracle Data",
          "operation_type": "Oracle",
          "query": "SELECT * FROM oracle_table WHERE condition = value",
          "temp_view_name": "oracle_view"
        }
      ]
    },
    {
      "stage_name": "Final Write",
      "steps": [
        {
          "step_name": "Write to Hive",
          "operation_type": "SQL",
          "query": "INSERT INTO final_table SELECT * FROM temp_table"
        }
      ]
    }
  ]
}
```
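
The "Dataframe Operation" stage points at custom_transform_function(), which is evaluated at runtime and expected to return a DataFrame. Its implementation is not part of this post; a hypothetical version that builds on the view created by the first stage might look like:

```python
# Hypothetical implementation of the expression referenced by the "Dataframe Operation" stage.
# The score_band logic is an illustration only; it is not defined anywhere in the project.
from pyspark.sql import DataFrame, SparkSession, functions as F


def custom_transform_function() -> DataFrame:
    """Build on the temp view created by the 'CIBIL Filter' stage and return a DataFrame."""
    spark = SparkSession.getActiveSession()
    return (
        spark.table("cibil_filtered_view")
        .withColumn("score_band", F.when(F.col("score") >= 800, "prime").otherwise("near_prime"))
    )
```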

Control Table Setup (control/control_table_setup.sql)

```sql
CREATE TABLE IF NOT EXISTS control_table (
    program_name STRING,
    stage_name STRING,
    step_name STRING,
    operation_type STRING,
    query STRING,
    temp_view_name STRING,
    table_name STRING,
    write_mode STRING,      -- 'temp_view', 'table', or 'snapshot'
    status STRING,
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP
);
```

Sample Control Table Data

Load sample data into the control table (control/sample_control_table_data.sql). The rows mirror the stages and steps defined in pl_cibil750.json.

```sql
INSERT INTO control_table VALUES
('CIBIL 750 Program', 'CIBIL Filter', 'Read CIBIL Data', 'SQL', 'SELECT * FROM cibil_data WHERE score > 750', 'cibil_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Risk Filter', 'Apply Risk Filter', 'SQL', 'SELECT * FROM risk_data WHERE risk_level < 3', 'risk_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame', 'custom_transform_function()', 'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Oracle Read', 'Read Oracle Data', 'Oracle', 'SELECT * FROM oracle_table WHERE condition = value', 'oracle_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Final Write', 'Write to Hive', 'SQL', 'INSERT INTO final_table SELECT * FROM temp_table', NULL, 'final_table', 'snapshot', 'PENDING', NULL, NULL, NULL);
```
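
Hand-writing these INSERT statements gets tedious as programs grow. Because the product config carries the same program/stage/step structure, the control table could also be seeded from it. A rough sketch follows; the seed_control_table helper is not part of the original project, and write_mode defaults to 'temp_view' since the sample config does not specify it per step:

```python
# Sketch: seed the control table from a product config such as pl_cibil750.json.
# seed_control_table and the default write_mode are assumptions, not the original code.
import json
from pyspark.sql import SparkSession


def seed_control_table(spark: SparkSession, product_config_path: str) -> None:
    with open(product_config_path, "r") as fh:
        cfg = json.load(fh)

    # Schema matches control/control_table_setup.sql
    schema = (
        "program_name string, stage_name string, step_name string, operation_type string, "
        "query string, temp_view_name string, table_name string, write_mode string, "
        "status string, error_message string, start_time timestamp, end_time timestamp"
    )

    rows = [
        (
            cfg["program_name"],
            stage["stage_name"],
            step["step_name"],
            step["operation_type"],
            step["query"],
            step.get("temp_view_name"),
            step.get("table_name"),
            step.get("write_mode", "temp_view"),
            "PENDING",
            None,
            None,
            None,
        )
        for stage in cfg["stages"]
        for step in stage["steps"]
    ]

    spark.createDataFrame(rows, schema=schema).write.mode("append").saveAsTable("control_table")
```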

Dynamic Stage Execution Script (etl/execute_stage.py)

This script executes all steps of a single stage. Each step either creates a temp view or writes to a table, and final writes can run in snapshot mode, which saves a timestamped copy of the target table and also exposes the result through a reusable view.

```python
from pyspark.sql import SparkSession
import time
from datetime import datetime
import sys
import json

def get_spark_session(config):
    spark = SparkSession.builder \
        .appName("ETL") \
        .master(config["spark_master"]) \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

def execute_query(spark, query, temp_view_name, table_name, write_mode):
    start_time = datetime.now()
    try:
        df = spark.sql(query)
        if write_mode == 'temp_view':
            df.createOrReplaceTempView(temp_view_name)
        elif write_mode == 'table':
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == 'snapshot':
            current_time = datetime.now().strftime("%Y%m%d%H%M")
            snapshot_table_name = f"{table_name}_{current_time}"
            df.write.mode("overwrite").saveAsTable(snapshot_table_name)
            df.createOrReplaceTempView(f"{table_name}_view")
        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        end_time = datetime.now()
        return "FAILED", str(e), start_time, end_time

def execute_dataframe_operation(spark, operation, temp_view_name, table_name, write_mode):
    start_time = datetime.now()
    try:
        # The operation is a Python expression (e.g. "custom_transform_function()") that must
        # be resolvable in this module's scope and return a DataFrame.
        df = eval(operation)
        if write_mode == 'temp_view':
            df.createOrReplaceTempView(temp_view_name)
        elif write_mode == 'table':
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == 'snapshot':
            current_time = datetime.now().strftime("%Y%m%d%H%M")
            snapshot_table_name = f"{table_name}_{current_time}"
            df.write.mode("overwrite").saveAsTable(snapshot_table_name)
            df.createOrReplaceTempView(f"{table_name}_view")
        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        end_time = datetime.now()
        return "FAILED", str(e), start_time, end_time

def execute_oracle_query(spark, query, temp_view_name, table_name, write_mode, config):
    import cx_Oracle
    start_time = datetime.now()
    try:
        dsn = cx_Oracle.makedsn(config["oracle_host"], config["oracle_port"], service_name=config["oracle_service_name"])
        connection = cx_Oracle.connect(user=config["oracle_user"], password=config["oracle_password"], dsn=dsn)
        cursor = connection.cursor()
        cursor.execute(query)
        # A cursor cannot be passed to createDataFrame directly; fetch rows and column names first
        columns = [col[0] for col in cursor.description]
        rows = cursor.fetchall()
        df = spark.createDataFrame(rows, schema=columns)
        if write_mode == 'temp_view':
            df.createOrReplaceTempView(temp_view_name)
        elif write_mode == 'table':
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == 'snapshot':
            current_time = datetime.now().strftime("%Y%m%d%H%M")
            snapshot_table_name = f"{table_name}_{current_time}"
            df.write.mode("overwrite").saveAsTable(snapshot_table_name)
            df.createOrReplaceTempView(f"{table_name}_view")
        cursor.close()
        connection.close()
        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        end_time = datetime.now()
        return "FAILED", str(e), start_time, end_time

def log_status_to_hive(spark, program_name, stage_name, step_name, operation_type, query, temp_view_name, table_name, write_mode, status, error_message, start_time, end_time):
    # Values are interpolated directly for brevity; escape/parameterize them in production code.
    spark.sql(f"""
        INSERT INTO control_table
        VALUES ('{program_name}', '{stage_name}', '{step_name}', '{operation_type}', '{query}', '{temp_view_name}', '{table_name}', '{write_mode}', '{status}', '{error_message}', '{start_time}', '{end_time}')
    """)

def run_stage(stage_name, program_name, base_config):
    spark = get_spark_session(base_config)

    # Fetch steps from control table
    steps = spark.sql(f"SELECT * FROM control_table WHERE program_name='{program_name}' AND stage_name='{stage_name}'")

    for row in steps.collect():
        step_name = row["step_name"]
        operation_type = row["operation_type"]
        query = row["query"]
        temp_view_name = row["temp_view_name"]
        table_name = row["table_name"]
        write_mode = row["write_mode"]
        
        if operation_type == "SQL":
            status, error, start_time, end_time = execute_query(spark, query, temp_view_name, table_name, write_mode)
        elif operation_type == "DataFrame":
            status, error, start_time, end_time = execute_dataframe_operation(spark, query, temp_view_name, table_name, write_mode)
        elif operation_type == "Oracle":
            status, error, start_time, end_time = execute_oracle_query(spark, query, temp_view_name, table_name, write_mode, base_config)
        else:
            status, error = "FAILED", f"Unknown operation_type: {operation_type}"
            start_time = end_time = datetime.now()

        log_status_to_hive(spark, program_name, stage_name, step_name, operation_type, query, temp_view_name, table_name, write_mode, status, error, start_time, end_time)

    spark.stop()

if __name__ == "__main__":
    stage_name = sys.argv[1]
    program_name = sys.argv[2]
    config_path = sys.argv[3]

    with open(config_path, 'r') as file:
        base_config = json.load(file)

    run_stage(stage_name, program_name, base_config)
```
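
To run a single stage by hand, pass the stage name, program name, and base config path as positional arguments, for example `spark-submit etl/execute_stage.py "CIBIL Filter" "CIBIL 750 Program" config/base_config.json` (the quotes matter because the names contain spaces).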

Python Orchestrator (scripts/run_etl.py)

This script retrieves the distinct stages of a program from the control table and submits one Spark job per stage.

```python
import subprocess
import sys
import json
from pyspark.sql import SparkSession

def get_spark_session():
    spark = SparkSession.builder \
        .appName("ETL Orchestrator") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

def submit_spark_job(stage_name, program_name, config_path):
    try:
        print(f"Starting stage: {stage_name} of program: {program_name}")
        result = subprocess.run(['spark-submit', '--master', 'local[*]', '--deploy-mode', 'client', 'etl/execute_stage.py', stage_name, program_name, config_path], check=True)
        print(f"Completed stage: {stage_name} of program: {program_name}")
    except subprocess.CalledProcessError as e:
        print(f"Failed stage: {stage_name} of program: {program_name}")
        sys.exit(1)

def main():
    config_path = "config/base_config.json"
    program_name = "CIBIL 750 Program"

    spark = get_spark_session()

    # Retrieve the distinct stages for this program from the control table
    stages = spark.sql(f"SELECT DISTINCT stage_name FROM control_table WHERE program_name='{program_name}'")

    for row in stages.collect():
        submit_spark_job(row["stage_name"], program_name, config_path)

    spark.stop()
    print("ETL process completed.")

if __name__ == "__main__":
    main()
```

Shell Orchestrator (scripts/run_etl.sh)

The same orchestration as a shell script: it pulls the distinct stages from the control table and submits one Spark job per stage.

```bash
#!/bin/bash

# Base config path
CONFIG_PATH="config/base_config.json"

# Function to submit a Spark job
submit_spark_job() {
  local stage_name=$1
  local program_name=$2
  echo "Starting stage: $stage_name of program: $program_name"
  spark-submit --master "local[*]" --deploy-mode client etl/execute_stage.py "$stage_name" "$program_name" "$CONFIG_PATH"
  if [ $? -eq 0 ]; then
    echo "Completed stage: $stage_name of program: $program_name"
  else
    echo "Failed stage: $stage_name of program: $program_name"
    exit 1
  fi
}

# Retrieve distinct stages from the control table
STAGES=$(spark-sql --silent --outputformat csv --database mydatabase -e "SELECT DISTINCT stage_name FROM control_table WHERE program_name='CIBIL 750 Program'")

# Iterate line by line so stage names containing spaces are passed intact
while IFS= read -r STAGE; do
  [ -n "$STAGE" ] && submit_spark_job "$STAGE" "CIBIL 750 Program"
done <<< "$STAGES"

echo "ETL process completed."
