etl_project/
├── config/
│   └── base_config.json
├── control/
│   ├── control_table_setup.sql
│   ├── log_table_setup.sql
│   └── sample_control_table_data.sql
├── scripts/
│   ├── run_etl.sh
│   └── run_etl.py
├── etl/
│   ├── execute_query.py
│   ├── log_status.py
│   ├── stage_runner.py
│   └── job_orchestrator.py
├── app_logging/
│   └── logger.py
├── utils/
│   ├── spark_utils.py
│   ├── query_utils.py
│   ├── config_utils.py
│   └── error_handling.py
├── log/
│   └── execution_logs.log
└── README.md
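At a high level, scripts/run_etl.py hands the config path to etl/job_orchestrator.py, which opens a Spark session, reads the list of programs from the control table, and runs each one through etl/stage_runner.py; the runner executes every step with etl/execute_query.py and records each outcome through etl/log_status.py, while the utils/ package holds the shared Spark, query, config, and error-handling helpers.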
2. Configuration Files
config/base_config.json
{
  "spark_config": {
    "appName": "ETL Framework",
    "master": "local[*]"
  },
  "hive_config": {
    "enableHiveSupport": true
  }
}
3. Database Setup
control_table_setup.sql
CREATE TABLE IF NOT EXISTS control_table (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    operation_type STRING,
    query STRING,
    custom_logic STRING,
    temp_view_name STRING,
    table_name STRING,
    write_mode STRING,
    snapshot_mode STRING,
    status STRING,
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP
);
log_table_setup.sql
CREATE TABLE IF NOT EXISTS log_table (
    log_id BIGINT,  -- Hive/Spark SQL have no AUTO_INCREMENT; left NULL or generated by the application
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    status STRING,
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    records_processed BIGINT,
    execution_time_seconds DOUBLE
);
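Once the framework starts writing to log_table, run history can be inspected with an ordinary query; for example, listing recent failures from an active Hive-enabled Spark session:

# List the most recent failed steps recorded by the framework
# (assumes an active SparkSession named `spark`).
failed = spark.sql("""
    SELECT program_name, stage_name, step_name, error_message, end_time
    FROM log_table
    WHERE status = 'FAILED'
    ORDER BY end_time DESC
""")
failed.show(truncate=False)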
sample_control_table_data.sql
INSERT INTO control_table VALUES
('CIBIL 750 Program', 'Personal Loan', 'CIBIL Filter', 'Read CIBIL Data', 'SQL',
 'SELECT * FROM cibil_data WHERE score > 750', NULL, 'cibil_filtered_view', NULL,
 'overwrite', NULL, 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Personal Loan', 'Risk Filter', 'Apply Risk Filter', 'SQL',
 'SELECT * FROM risk_data WHERE risk_level < 3', NULL, 'risk_filtered_view', NULL,
 'append', NULL, 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Personal Loan', 'Final Write', 'Write to Hive', 'SQL',
 'SELECT * FROM risk_filtered_view', NULL, NULL, 'final_table',
 'overwrite', 'archive', 'PENDING', NULL, NULL, NULL);
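Because table_name is NULL in the first two rows, no table write happens there; they only register the temp views cibil_filtered_view and risk_filtered_view. The final row reads risk_filtered_view and writes it to final_table in overwrite mode, first copying the existing contents to archive_final_table because snapshot_mode is 'archive'.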
4. ETL Modules
etl/execute_query.py
from datetime import datetime

from utils.query_utils import execute_sql_query


def execute_step(spark, step, config):
    """Run one control-table step and report its outcome."""
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]
    snapshot_mode = step["snapshot_mode"]

    start_time = datetime.now()
    try:
        if operation_type == "SQL":
            execute_sql_query(spark, query, temp_view_name, table_name, write_mode, snapshot_mode)
        elif operation_type == "CustomLogic":
            # Run the Python snippet stored in the control table; the active
            # SparkSession is exposed to it as `spark`.
            exec(custom_logic, {"spark": spark})
        else:
            raise ValueError(f"Unsupported operation type: {operation_type}")
        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        return "FAILED", str(e), start_time, datetime.now()
etl/log_status.py
def log_status_to_table(spark, program_name, product_name, stage_name, step_name,
                        status, error_message, start_time, end_time, records_processed=None):
    """Insert one execution record into log_table."""
    execution_time = (end_time - start_time).total_seconds()
    # Escape single quotes so free-text error messages do not break the SQL literal.
    safe_error = error_message.replace("'", "''") if error_message else ""
    spark.sql(f"""
        INSERT INTO log_table (
            log_id, program_name, product_name, stage_name, step_name, status, error_message,
            start_time, end_time, records_processed, execution_time_seconds
        )
        VALUES (
            NULL, '{program_name}', '{product_name}', '{stage_name}', '{step_name}',
            '{status}', '{safe_error}', '{start_time}', '{end_time}',
            {records_processed if records_processed is not None else 'NULL'}, {execution_time}
        )
    """)
etl/stage_runner.py
from etl.execute_query import execute_step
from etl.log_status import log_status_to_table


def run_stage(spark, program_name, product_name, config):
    """Run every control-table step registered for one program/product pair."""
    steps = spark.sql(f"""
        SELECT *
        FROM control_table
        WHERE program_name = '{program_name}' AND product_name = '{product_name}'
    """).collect()

    for step in steps:
        stage_name = step["stage_name"]
        step_name = step["step_name"]
        status, error_message, start_time, end_time = execute_step(spark, step.asDict(), config)
        log_status_to_table(
            spark, program_name, product_name, stage_name, step_name,
            status, error_message, start_time, end_time
        )
etl/job_orchestrator.py
from etl.stage_runner import run_stage
from utils.config_utils import get_control_table_programs
from utils.spark_utils import get_spark_session


def run_etl(config_path):
    """Entry point: run every program/product pair registered in the control table."""
    spark = get_spark_session(config_path)
    programs = get_control_table_programs(spark)
    for program in programs.collect():
        program_name = program["program_name"]
        product_name = program["product_name"]
        run_stage(spark, program_name, product_name, config_path)
    spark.stop()
5. Utility Scripts
utils/spark_utils.py
import json

from pyspark.sql import SparkSession


def get_spark_session(config_path):
    """Build a SparkSession from the JSON config, enabling Hive support when requested."""
    with open(config_path, "r") as f:
        config = json.load(f)

    builder = (
        SparkSession.builder
        .appName(config["spark_config"]["appName"])
        .master(config["spark_config"]["master"])
    )
    if config["hive_config"]["enableHiveSupport"]:
        builder = builder.enableHiveSupport()
    return builder.getOrCreate()
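A job typically builds its session once at startup, for example:

# Create (or reuse) the session described by config/base_config.json.
spark = get_spark_session("config/base_config.json")
print(spark.sparkContext.appName)  # "ETL Framework" with the sample config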
utils/query_utils.py
def execute_sql_query(spark, query, temp_view_name=None, table_name=None, write_mode=None, snapshot_mode=None):
    """Run a SQL step, optionally exposing the result as a temp view and/or persisting it to a table."""
    df = spark.sql(query)

    if temp_view_name:
        df.createOrReplaceTempView(temp_view_name)

    if table_name:
        if write_mode == "overwrite":
            if snapshot_mode == "archive":
                # Copy the current contents into the archive table (assumed to exist)
                # before the target is cleared and rewritten.
                spark.sql(f"INSERT INTO archive_{table_name} SELECT * FROM {table_name}")
                spark.sql(f"TRUNCATE TABLE {table_name}")
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == "append":
            df.write.mode("append").saveAsTable(table_name)
        else:
            raise ValueError(f"Unsupported write mode: {write_mode}")
utils/error_handling.py
# Project-local logging package; named app_logging so it does not shadow Python's stdlib logging module.
from app_logging.logger import get_logger

logger = get_logger("ETL_Error")


def handle_error(error_message):
    """Log the error and re-raise it so the caller decides whether to abort."""
    logger.error(f"Error occurred: {error_message}")
    raise Exception(error_message)
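app_logging/logger.py is referenced above but its contents are not shown in this section; a minimal sketch that writes to log/execution_logs.log (matching the project layout) could be:

app_logging/logger.py
import logging
import os


def get_logger(name):
    """Return a named logger that writes to log/execution_logs.log and the console."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid attaching duplicate handlers on repeated calls
        logger.setLevel(logging.INFO)
        os.makedirs("log", exist_ok=True)
        formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        file_handler = logging.FileHandler("log/execution_logs.log")
        file_handler.setFormatter(formatter)
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.addHandler(stream_handler)
    return logger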
scripts/run_etl.py
from etl.job_orchestrator import run_etl

if __name__ == "__main__":
    config_path = "config/base_config.json"
    run_etl(config_path)
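scripts/run_etl.sh is listed in the project layout but not shown here; it would typically just wrap this entry point in a spark-submit call (for example, spark-submit scripts/run_etl.py from the project root) so the job can be scheduled by cron or an external orchestrator.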