etl_project/
├── config/
│   ├── base_config.json                  # Static environment configurations
│   └── product_configs/                  # Individual configs for each program
│       └── pl_cibil750.json
├── control/
│   ├── control_table_setup.sql           # SQL to create the control table
│   └── sample_control_table_data.sql     # Sample data for the control table
├── scripts/
│   ├── run_etl.sh                        # Shell script to orchestrate Spark job submissions
│   └── run_etl.py                        # Python script to orchestrate Spark job submissions
├── etl/
│   └── execute_stage.py                  # Script to execute a single stage dynamically
├── logging/
│   └── logger.py                         # Centralized logging utility
├── utils/
│   ├── query_utils.py                    # Query execution and validation
│   ├── data_utils.py                     # Data I/O operations
│   ├── config_utils.py                   # Utility to read configs and control tables
│   └── error_handling.py                 # Error logging and recovery
└── README.md
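The logging/ and utils/ helpers are referenced in the tree but not shown in this post. As an illustration, here is a minimal sketch of what logging/logger.py might contain, assuming it wraps Python's standard logging module and optionally forwards records to the logging_service_url from base_config.json (the function names and payload shape are assumptions, not the actual implementation):
python
import logging
import requests  # assumed dependency for posting to the central logging service

def get_logger(name, config):
    """Return a standard logger configured from base_config.json."""
    logger = logging.getLogger(name)
    logger.setLevel(config.get("log_level", "INFO"))
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger

def push_log(config, payload):
    """Forward a log record to the logging service (hypothetical endpoint and payload)."""
    try:
        requests.post(config["logging_service_url"], json=payload, timeout=5)
    except requests.RequestException:
        pass  # logging must never break the ETL run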
Configuration Files
base_config.json (Static Environment Configurations)
json
{
  "hive_metastore": "thrift://localhost:9083",
  "spark_master": "local[*]",
  "log_level": "INFO",
  "logging_service_url": "http://logging-service/api/logs"
}
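execute_stage.py loads this file with json.load and passes the resulting dict straight into get_spark_session. Note that the Oracle step further down also reads oracle_host, oracle_port, oracle_service_name, oracle_user, and oracle_password from the same config object, so those keys belong in base_config.json as well if Oracle sources are used. A small sanity check (illustrative only):
python
import json

# Load the static environment config exactly as execute_stage.py does.
with open("config/base_config.json") as f:
    base_config = json.load(f)

# Keys expected by execute_oracle_query; add them to base_config.json when needed.
oracle_keys = ["oracle_host", "oracle_port", "oracle_service_name", "oracle_user", "oracle_password"]
missing = [k for k in oracle_keys if k not in base_config]
if missing:
    print(f"Missing Oracle settings in base_config.json: {missing}")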
product_configs/pl_cibil750.json (Individual Configs for Each Program)
json
{
  "program_name": "CIBIL 750 Program",
  "stages": [
    {
      "stage_name": "CIBIL Filter",
      "steps": [
        {
          "step_name": "Read CIBIL Data",
          "operation_type": "SQL",
          "query": "SELECT * FROM cibil_data WHERE score > 750",
          "temp_view_name": "cibil_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Risk Filter",
      "steps": [
        {
          "step_name": "Apply Risk Filter",
          "operation_type": "SQL",
          "query": "SELECT * FROM risk_data WHERE risk_level < 3",
          "temp_view_name": "risk_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Dataframe Operation",
      "steps": [
        {
          "step_name": "Custom Transform",
          "operation_type": "DataFrame",
          "query": "custom_transform_function()",
          "temp_view_name": "custom_transformed_view"
        }
      ]
    },
    {
      "stage_name": "Oracle Read",
      "steps": [
        {
          "step_name": "Read Oracle Data",
          "operation_type": "Oracle",
          "query": "SELECT * FROM oracle_table WHERE condition = value",
          "temp_view_name": "oracle_view"
        }
      ]
    },
    {
      "stage_name": "Final Write",
      "steps": [
        {
          "step_name": "Write to Hive",
          "operation_type": "SQL",
          "query": "INSERT INTO final_table SELECT * FROM temp_table"
        }
      ]
    }
  ]
}
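The repository layout lists utils/config_utils.py as the reader for these files. A minimal sketch of what it might contain, assuming it only needs to load a product config and yield its stages and steps in order (the function names are illustrative, not the actual implementation):
python
import json

def load_product_config(path):
    """Load a product config such as config/product_configs/pl_cibil750.json."""
    with open(path) as f:
        return json.load(f)

def iter_steps(product_config):
    """Yield (stage_name, step) pairs in the order they appear in the config."""
    for stage in product_config["stages"]:
        for step in stage["steps"]:
            yield stage["stage_name"], step

# Example usage:
# config = load_product_config("config/product_configs/pl_cibil750.json")
# for stage_name, step in iter_steps(config):
#     print(stage_name, step["step_name"], step["operation_type"])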
Control Table Setup (control/control_table_setup.sql)
Create the control table that both drives the steps and records their status.
sql
CREATE TABLE IF NOT EXISTS control_table (
  program_name STRING,
  stage_name STRING,
  step_name STRING,
  operation_type STRING,
  query STRING,
  temp_view_name STRING,
  table_name STRING,
  write_mode STRING, -- 'temp_view', 'table', or 'snapshot'
  status STRING,
  error_message STRING,
  start_time TIMESTAMP,
  end_time TIMESTAMP
);
Sample Control Table Data
Load sample data into the control table.
sql
INSERT INTO control_table VALUES
('CIBIL 750 Program', 'CIBIL Filter', 'Read CIBIL Data', 'SQL', 'SELECT * FROM cibil_data WHERE score > 750', 'cibil_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Risk Filter', 'Apply Risk Filter', 'SQL', 'SELECT * FROM risk_data WHERE risk_level < 3', 'risk_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame', 'custom_transform_function()', 'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Oracle Read', 'Read Oracle Data', 'Oracle', 'SELECT * FROM oracle_table WHERE condition = value', 'oracle_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Final Write', 'Write to Hive', 'SQL', 'INSERT INTO final_table SELECT * FROM temp_table', NULL, 'final_table', 'snapshot', 'PENDING', NULL, NULL, NULL);
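Once the sample rows are loaded, a quick sanity check from PySpark might look like this (assuming the control table lives in the database your session points at):
python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("control-table-check").enableHiveSupport().getOrCreate()

# List the pending steps for the program, the same rows run_etl will process.
spark.sql("""
    SELECT stage_name, step_name, operation_type, write_mode, status
    FROM control_table
    WHERE program_name = 'CIBIL 750 Program' AND status = 'PENDING'
    ORDER BY stage_name
""").show(truncate=False)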
Dynamic Stage Execution Script (etl/execute_stage.py)
This script creates temp views or writes to tables for each step, and handles final table writes in snapshot mode.
python
from pyspark.sql import SparkSession
from datetime import datetime
import sys
import json

def get_spark_session(config):
    spark = (
        SparkSession.builder
        .appName("ETL")
        .master(config["spark_master"])
        .enableHiveSupport()
        .getOrCreate()
    )
    return spark

def execute_query(spark, query, temp_view_name, table_name, write_mode):
    start_time = datetime.now()
    try:
        df = spark.sql(query)
        if write_mode == 'temp_view':
            df.createOrReplaceTempView(temp_view_name)
        elif write_mode == 'table':
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == 'snapshot':
            current_time = datetime.now().strftime("%Y%m%d%H%M")
            snapshot_table_name = f"{table_name}_{current_time}"
            df.write.mode("overwrite").saveAsTable(snapshot_table_name)
            df.createOrReplaceTempView(f"{table_name}_view")
        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        end_time = datetime.now()
        return "FAILED", str(e), start_time, end_time

def execute_dataframe_operation(spark, operation, temp_view_name, table_name, write_mode):
    start_time = datetime.now()
    try:
        # The operation string (e.g. "custom_transform_function()") must be resolvable here
        # and is expected to register its result as the temp view named in temp_view_name.
        exec(operation)
        if write_mode == 'table':
            spark.table(temp_view_name).write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == 'snapshot':
            current_time = datetime.now().strftime("%Y%m%d%H%M")
            snapshot_table_name = f"{table_name}_{current_time}"
            spark.table(temp_view_name).write.mode("overwrite").saveAsTable(snapshot_table_name)
            spark.table(temp_view_name).createOrReplaceTempView(f"{table_name}_view")
        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        end_time = datetime.now()
        return "FAILED", str(e), start_time, end_time

def execute_oracle_query(spark, query, temp_view_name, table_name, write_mode, config):
    import cx_Oracle
    start_time = datetime.now()
    try:
        dsn = cx_Oracle.makedsn(config["oracle_host"], config["oracle_port"], service_name=config["oracle_service_name"])
        connection = cx_Oracle.connect(user=config["oracle_user"], password=config["oracle_password"], dsn=dsn)
        cursor = connection.cursor()
        cursor.execute(query)
        # Pull rows and column names from the cursor, then build a Spark DataFrame
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        df = spark.createDataFrame(rows, schema=columns)
        if write_mode == 'temp_view':
            df.createOrReplaceTempView(temp_view_name)
        elif write_mode == 'table':
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == 'snapshot':
            current_time = datetime.now().strftime("%Y%m%d%H%M")
            snapshot_table_name = f"{table_name}_{current_time}"
            df.write.mode("overwrite").saveAsTable(snapshot_table_name)
            df.createOrReplaceTempView(f"{table_name}_view")
        cursor.close()
        connection.close()
        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        end_time = datetime.now()
        return "FAILED", str(e), start_time, end_time

def log_status_to_hive(spark, program_name, stage_name, step_name, operation_type, query, temp_view_name, table_name, write_mode, status, error_message, start_time, end_time):
    # Simplistic logging: values are inlined into the SQL string, so queries or error
    # messages containing single quotes would need escaping in a hardened version.
    spark.sql(f"""
        INSERT INTO control_table (program_name, stage_name, step_name, operation_type, query, temp_view_name, table_name, write_mode, status, error_message, start_time, end_time)
        VALUES ('{program_name}', '{stage_name}', '{step_name}', '{operation_type}', '{query}', '{temp_view_name}', '{table_name}', '{write_mode}', '{status}', '{error_message}', '{start_time}', '{end_time}')
    """)

def run_stage(stage_name, program_name, base_config):
    spark = get_spark_session(base_config)
    # Fetch the steps for this stage from the control table
    steps = spark.sql(f"SELECT * FROM control_table WHERE program_name='{program_name}' AND stage_name='{stage_name}'")
    for row in steps.collect():
        step_name = row["step_name"]
        operation_type = row["operation_type"]
        query = row["query"]
        temp_view_name = row["temp_view_name"]
        table_name = row["table_name"]
        write_mode = row["write_mode"]
        if operation_type == "SQL":
            status, error, start_time, end_time = execute_query(spark, query, temp_view_name, table_name, write_mode)
        elif operation_type == "DataFrame":
            status, error, start_time, end_time = execute_dataframe_operation(spark, query, temp_view_name, table_name, write_mode)
        elif operation_type == "Oracle":
            status, error, start_time, end_time = execute_oracle_query(spark, query, temp_view_name, table_name, write_mode, base_config)
        else:
            status, error, start_time, end_time = "SKIPPED", f"Unknown operation_type: {operation_type}", datetime.now(), datetime.now()
        log_status_to_hive(spark, program_name, stage_name, step_name, operation_type, query, temp_view_name, table_name, write_mode, status, error, start_time, end_time)
    spark.stop()

if __name__ == "__main__":
    stage_name = sys.argv[1]
    program_name = sys.argv[2]
    config_path = sys.argv[3]
    with open(config_path, 'r') as file:
        base_config = json.load(file)
    run_stage(stage_name, program_name, base_config)
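The 'DataFrame' operation type assumes a function such as custom_transform_function() is importable where exec runs and that it registers its output under the configured temp view name. A minimal sketch of what such a function might look like, assuming it builds on the earlier cibil_filtered_view (the column names and transform are illustrative only):
python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def custom_transform_function():
    """Hypothetical DataFrame-level transform used by the 'Dataframe Operation' stage."""
    spark = SparkSession.builder.getOrCreate()
    df = spark.table("cibil_filtered_view")
    # Example transform: bucket scores into bands (assumes a 'score' column exists).
    transformed = df.withColumn(
        "score_band",
        F.when(F.col("score") >= 800, "800+").otherwise("750-799")
    )
    # Register the result under the view name configured in the control table.
    transformed.createOrReplaceTempView("custom_transformed_view")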
ETL Orchestration Script (scripts/run_etl.py)
This Python orchestrator reads the distinct stages for a program from the control table and submits one Spark job per stage.
python
import subprocess
import sys
from pyspark.sql import SparkSession

def get_spark_session():
    spark = (
        SparkSession.builder
        .appName("ETL Orchestrator")
        .enableHiveSupport()
        .getOrCreate()
    )
    return spark

def submit_spark_job(stage_name, program_name, config_path):
    try:
        print(f"Starting stage: {stage_name} of program: {program_name}")
        subprocess.run(['spark-submit', '--master', 'local[*]', '--deploy-mode', 'client',
                        'etl/execute_stage.py', stage_name, program_name, config_path], check=True)
        print(f"Completed stage: {stage_name} of program: {program_name}")
    except subprocess.CalledProcessError:
        print(f"Failed stage: {stage_name} of program: {program_name}")
        sys.exit(1)

def main():
    config_path = "config/base_config.json"
    program_name = "CIBIL 750 Program"
    spark = get_spark_session()
    # Retrieve the distinct stages for the program from the control table.
    # Note: DISTINCT does not guarantee order; a stage_order column would be needed for strict sequencing.
    stages = spark.sql(f"SELECT DISTINCT stage_name FROM control_table WHERE program_name='{program_name}'").collect()
    spark.stop()
    for row in stages:
        submit_spark_job(row["stage_name"], program_name, config_path)
    print("ETL process completed.")

if __name__ == "__main__":
    main()
ETL Orchestration Script (scripts/run_etl.sh)
The shell version does the same orchestration: it pulls the distinct stages from the control table and submits one Spark job per stage.
bash
#!/bin/bash

# Base config path and program to run
CONFIG_PATH="config/base_config.json"
PROGRAM_NAME="CIBIL 750 Program"

# Function to submit a Spark job for a single stage
submit_spark_job() {
    local stage_name=$1
    local program_name=$2
    echo "Starting stage: $stage_name of program: $program_name"
    spark-submit --master "local[*]" --deploy-mode client etl/execute_stage.py "$stage_name" "$program_name" "$CONFIG_PATH"
    if [ $? -eq 0 ]; then
        echo "Completed stage: $stage_name of program: $program_name"
    else
        echo "Failed stage: $stage_name of program: $program_name"
        exit 1
    fi
}

# Retrieve distinct stages from the control table.
# Stage names contain spaces, so read them line by line instead of word-splitting.
while IFS= read -r STAGE; do
    [ -z "$STAGE" ] && continue
    submit_spark_job "$STAGE" "$PROGRAM_NAME"
done < <(spark-sql --silent --database mydatabase -e "SELECT DISTINCT stage_name FROM control_table WHERE program_name='$PROGRAM_NAME'")

echo "ETL process completed."