Below is the complete, enhanced ETL framework code that dynamically processes a specific stage of a program using metadata, with additional features such as parameterization, logging, error handling, and retries.
Dynamic ETL Framework Code
Save this as dynamic_etl_framework.py:
from pyspark.sql import SparkSession
from datetime import datetime
import time
import sys
# Initialize SparkSession
spark = (
    SparkSession.builder
    .appName("Dynamic ETL Framework")
    .enableHiveSupport()
    .getOrCreate()
)
# Input parameters from the command line
if len(sys.argv) < 4:
    print("Usage: dynamic_etl_framework.py <program_name> <stage_name> <month_year>")
    sys.exit(1)
program_name_filter = sys.argv[1] # e.g., "Risk Program"
stage_name_filter = sys.argv[2] # e.g., "Stage 1"
month_year = sys.argv[3] # e.g., "202412"
# Retry configuration
MAX_RETRIES = 3
RETRY_DELAY = 5 # Seconds
# Load Metadata Table for the specific program and stage
metadata = spark.sql(f"""
SELECT *
FROM metadatatable
WHERE month_year = '{month_year}'
AND program_name = '{program_name_filter}'
AND stage_name = '{stage_name_filter}'
""").orderBy("stage_priority", "steps_priority")
# Define Log Table
log_table = "log_table"
log_entries = []
# Function to log status
def log_status(product_name, program_name, stage_name, step_name, status,
               error_message=None, start_time=None, end_time=None, records_processed=None):
    execution_time_seconds = None
    if start_time and end_time:
        execution_time_seconds = (end_time - start_time).total_seconds()
    # Keep start/end as datetime objects so they map cleanly to the TIMESTAMP columns in log_table
    log_entries.append({
        "product_name": product_name,
        "program_name": program_name,
        "stage_name": stage_name,
        "step_name": step_name,
        "status": status,
        "error_message": error_message,
        "start_time": start_time,
        "end_time": end_time,
        "records_processed": records_processed,
        "execution_time_seconds": execution_time_seconds
    })
# Function to write to target table
def write_to_table(df, table_name, write_mode, snapshot_mode):
    if write_mode == "snapshot":
        snapshot_table_name = f"{table_name}_{month_year}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        spark.sql(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {snapshot_table_name}")
    else:
        df.write.mode(write_mode).saveAsTable(table_name)
# Process each step in the metadata
for row in metadata.collect():
    step = row.asDict()
    product_name = step["product_name"]
    program_name = step["program_name"]
    stage_name = step["stage_name"]
    step_name = step["step_name"]
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]
    snapshot_mode = step["snapshot_mode"]
    start_time = datetime.now()
    retry_count = 0
    success = False
    while retry_count < MAX_RETRIES and not success:
        try:
            print(f"Executing Step: {product_name} > {program_name} > {stage_name} > {step_name}")
            if operation_type == "SQL":
                # Execute SQL query
                df = spark.sql(query)
            elif operation_type == "DataFrame":
                # Execute DataFrame logic in its own namespace;
                # the custom logic is expected to assign its result to a variable named "df"
                exec_namespace = {"spark": spark}
                exec(custom_logic, exec_namespace)
                df = exec_namespace.get("df")
            else:
                raise ValueError(f"Unknown operation type: {operation_type}")
            # Register the result as a temp view so downstream steps can reference it
            if temp_view_name and df is not None:
                df.createOrReplaceTempView(temp_view_name)
            # Count records processed
            records_processed = df.count() if df is not None else None
            # Write to table
            if table_name and df is not None:
                write_to_table(df, table_name, write_mode, snapshot_mode)
            # Log success
            end_time = datetime.now()
            log_status(product_name, program_name, stage_name, step_name,
                       "SUCCESS", start_time=start_time, end_time=end_time, records_processed=records_processed)
            success = True
        except Exception as e:
            retry_count += 1
            if retry_count >= MAX_RETRIES:
                # Log failure after max retries
                end_time = datetime.now()
                log_status(product_name, program_name, stage_name, step_name,
                           "FAILED", error_message=str(e), start_time=start_time, end_time=end_time)
                print(f"Error in Step: {step_name}, Error: {str(e)}")
                break
            else:
                print(f"Retrying Step: {step_name} ({retry_count}/{MAX_RETRIES}) after error: {str(e)}")
                time.sleep(RETRY_DELAY)
# Write log entries to Log Table
# An explicit schema avoids inference failures when optional fields (e.g., error_message) are None in every row
if log_entries:
    log_schema = ("product_name string, program_name string, stage_name string, step_name string, "
                  "status string, error_message string, start_time timestamp, end_time timestamp, "
                  "records_processed bigint, execution_time_seconds double")
    log_df = spark.createDataFrame(log_entries, schema=log_schema)
    log_df.write.mode("append").saveAsTable(log_table)
print("ETL Process Completed!")
Execution
Save the script as dynamic_etl_framework.py and execute it with spark-submit:
spark-submit dynamic_etl_framework.py "Risk Program" "Stage 1" "202412"
Additional Features in This Version
- Parameterization:
  - Runs for a specific program_name, stage_name, and month_year.
  - These are passed as command-line arguments for flexibility.
- Error Handling and Retries:
  - Retries each step up to MAX_RETRIES times before logging it as a failure.
  - Adds a delay of RETRY_DELAY seconds between retries to handle transient issues.
- Logging:
  - Captures detailed logs, including status, error_message, records_processed, and execution time.
  - Logs are stored in the log_table.
- Dynamic Write Modes:
  - Supports overwrite, append, and snapshot modes for target tables; snapshot writes to a month-suffixed table and repoints a view at it.
  - An archive mode is not implemented in write_to_table above; see the sketch after this list for one way to add it.
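The following is a minimal sketch of how an archive mode could be bolted onto the framework if you need it. The wrapper function name, the _archive_<month_year> table-naming convention, and the overwrite-then-archive behavior are assumptions for illustration, not part of the framework above; the sketch assumes it lives in the same script, so spark, month_year, and write_to_table are in scope.

from pyspark.sql.utils import AnalysisException

# Hypothetical "archive" mode: copy the current target table into a
# month-suffixed archive table before overwriting the target.
def write_to_table_with_archive(df, table_name, write_mode, snapshot_mode):
    if write_mode == "archive":
        archive_table_name = f"{table_name}_archive_{month_year}"
        try:
            # Snapshot the existing target contents (if any) into the archive table
            spark.table(table_name).write.mode("overwrite").saveAsTable(archive_table_name)
        except AnalysisException:
            pass  # target table does not exist yet, so there is nothing to archive
        df.write.mode("overwrite").saveAsTable(table_name)
    else:
        # Fall back to the existing behavior for all other modes
        write_to_table(df, table_name, write_mode, snapshot_mode)

Calling this wrapper instead of write_to_table in the step loop would make an archive value in the metadata's write_mode column meaningful.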
Sample Metadata for Testing
Here’s sample metadata that filters and processes CIBIL data:
Sample Metadata Table
INSERT INTO metadatatable VALUES
('Risk Program', 'Personal Loan', 'Stage 1', 'Filter CIBIL', 'SQL',
 'SELECT * FROM cibil_data WHERE score > 700', NULL, 'filtered_cibil', NULL, NULL, NULL, 1, 1, '202412'),
('Risk Program', 'Personal Loan', 'Stage 1', 'Add Risk Column', 'DataFrame', NULL,
 'df = spark.table("filtered_cibil"); df = df.withColumn("risk_score", df["score"] * 1.1)',
 'cibil_risk_view', NULL, NULL, NULL, 1, 2, '202412'),
('Risk Program', 'Personal Loan', 'Stage 1', 'Write Data', 'DataFrame', NULL,
 'df = spark.table("cibil_risk_view")',
 NULL, 'final_risk_data', 'overwrite', NULL, 1, 3, '202412');
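The INSERT statements assume a metadatatable whose columns appear in the order used above. Since that DDL isn't shown here, the following CREATE TABLE is an inferred sketch based on the fields the framework reads; adjust names and types to match your actual table.

# Inferred DDL for the metadata table; column names, order, and types are assumptions
# based on the fields the framework reads and the INSERT statement above.
spark.sql("""
    CREATE TABLE IF NOT EXISTS metadatatable (
        program_name   STRING,
        product_name   STRING,
        stage_name     STRING,
        step_name      STRING,
        operation_type STRING,   -- 'SQL' or 'DataFrame'
        query          STRING,   -- used when operation_type = 'SQL'
        custom_logic   STRING,   -- used when operation_type = 'DataFrame'
        temp_view_name STRING,
        table_name     STRING,
        write_mode     STRING,   -- e.g. 'overwrite', 'append', 'snapshot'
        snapshot_mode  STRING,
        stage_priority INT,
        steps_priority INT,
        month_year     STRING
    )
""")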
Log Table Schema
CREATE TABLE IF NOT EXISTS log_table (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    status STRING,
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    records_processed BIGINT,
    execution_time_seconds DOUBLE
);
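After a run, the log table can be queried to review step status, runtimes, and errors, for example:

# Inspect recent runs and step runtimes, most recent first
spark.sql("""
    SELECT program_name, stage_name, step_name, status,
           records_processed, execution_time_seconds, error_message
    FROM log_table
    ORDER BY start_time DESC, execution_time_seconds DESC
""").show(truncate=False)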
This approach ensures a robust, dynamic, and extensible ETL pipeline, with clear logging and retry mechanisms. Let me know if you need further refinements!