Dynamic ETL Framework Design
This ETL framework is designed for a hierarchical, monthly process in which products contain programs, which are further divided into stages and steps. Each step is either a SQL query or a DataFrame transformation, and execution is driven entirely by a metadata table.
Key Features
- Hierarchy:
  - Products → Programs → Stages → Steps.
- Write Modes:
  - Overwrite: Replace data in the table.
  - Append: Add new data to the table.
  - Archive: Save historical data using time-based partitioning.
  - Snapshot: Save with a timestamped table name and maintain a permanent view.
- Control & Log Table:
  - Maintain month_year for each ETL run.
  - Log execution statuses (success, failure, retries).
- Metadata-Driven Execution:
  - Metadata table stores SQL/DataFrame operations, dependencies, and priorities.
- Execution Modes:
  - SQL queries executed via spark.sql.
  - DataFrame transformations executed as Python code.
SQL to Create Metadata Table
CREATE TABLE IF NOT EXISTS metadatatable (
program_name STRING,
product_name STRING,
stage_name STRING,
step_name STRING,
operation_type STRING, -- SQL or DataFrame
query STRING, -- For SQL queries
custom_logic STRING, -- For DataFrame transformations
temp_view_name STRING, -- Temporary view created by this step
table_name STRING, -- Target table name
write_mode STRING, -- overwrite, append, archive, snapshot
snapshot_mode STRING, -- e.g., tableName_yyyymmdd
stage_priority INT, -- Priority of stages
steps_priority INT, -- Priority of steps within a stage
month_year STRING -- ETL run month/year (e.g., "202412")
);
Python Implementation
Below is a Python implementation using PySpark:
1. Dynamic Framework Code
from pyspark.sql import SparkSession
from datetime import datetime
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Dynamic Monthly ETL Framework") \
    .enableHiveSupport() \
    .getOrCreate()
# Month and year for this ETL run ("202412" = December 2024); see the note after the spark-submit command for passing this in at runtime
month_year = "202412"
# Step 1: Load Metadata Table
metadata = spark.sql(f"SELECT * FROM metadatatable WHERE month_year = '{month_year}'")
metadata = metadata.orderBy("stage_priority", "steps_priority") # Process in order
# Step 2: Control and Log Table
log_table = "etl_job_log"
control_table = "etl_control_table"
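# NOTE (assumption, not part of the original post): the schemas of etl_job_log and
# etl_control_table are never defined here. One possible layout, inferred from the
# fields written further below, would be:
#   etl_job_log(product_name, program_name, stage_name, step_name, month_year,
#               status, error_message, timestamp) -- all STRING
#   etl_control_table(month_year, status, timestamp) -- all STRING
# With saveAsTable in append mode, Spark creates these tables on the first write
# if they do not already exist.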
# Collect log entries as dicts; they are converted to a DataFrame at the end of the run
log_entries = []
# Function to log step status
def log_status(product_name, program_name, stage_name, step_name, status, error_message=None):
    log_entries.append({
        "product_name": product_name,
        "program_name": program_name,
        "stage_name": stage_name,
        "step_name": step_name,
        "month_year": month_year,
        "status": status,
        "error_message": error_message,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    })
# Function to write to target table
def write_to_table(df, table_name, write_mode, snapshot_mode):
    if write_mode == "snapshot":
        snapshot_table_name = f"{table_name}_{month_year}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        spark.sql(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {snapshot_table_name}")
    else:
        # "overwrite" and "append" map directly to Spark save modes;
        # "archive" (time-based partitioning) is sketched separately after the code listing
        df.write.mode(write_mode).saveAsTable(table_name)
# Step 3: Process Steps
for row in metadata.collect():
    step = row.asDict()
    product_name = step["product_name"]
    program_name = step["program_name"]
    stage_name = step["stage_name"]
    step_name = step["step_name"]
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]
    snapshot_mode = step["snapshot_mode"]
    try:
        print(f"Executing Step: {product_name} > {program_name} > {stage_name} > {step_name}")
        if operation_type == "SQL":
            # Execute SQL query
            df = spark.sql(query)
        elif operation_type == "DataFrame":
            # Execute DataFrame logic; the snippet in custom_logic is expected to assign its result to a variable named df
            exec_scope = {"spark": spark}
            exec(custom_logic, exec_scope)
            df = exec_scope.get("df")
            if df is None:
                raise ValueError(f"custom_logic for step '{step_name}' did not produce a DataFrame named 'df'")
        else:
            raise ValueError(f"Unknown operation type: {operation_type}")
        # Register the step's result as a temporary view for downstream steps
        if temp_view_name:
            df.createOrReplaceTempView(temp_view_name)
        # Write to target table
        if table_name:
            write_to_table(df, table_name, write_mode, snapshot_mode)
        log_status(product_name, program_name, stage_name, step_name, "SUCCESS")
    except Exception as e:
        log_status(product_name, program_name, stage_name, step_name, "FAILED", str(e))
        print(f"Error in Step: {step_name}, Error: {str(e)}")
        break  # Stop processing on failure
# Step 4: Write Logs to Log Table
if log_entries:
    log_df = spark.createDataFrame(log_entries)
    log_df.write.mode("append").saveAsTable(log_table)
# Step 5: Update Control Table
control_data = [{"month_year": month_year, "status": "COMPLETED", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}]
control_df = spark.createDataFrame(control_data)
control_df.write.mode("append").saveAsTable(control_table)
print("ETL Process Completed Successfully!")
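The write_to_table helper above passes the metadata's write_mode straight to Spark, which works for overwrite and append but not for the archive mode listed in the features, since "archive" is not a Spark save mode. Below is a minimal sketch of one way to handle it, assuming the archive target is simply the base table partitioned by month_year; the helper name write_archive and the partitioning choice are assumptions, not part of the original design.
from pyspark.sql import DataFrame, functions as F

def write_archive(df: DataFrame, table_name: str, month_year: str) -> None:
    """Hypothetical 'archive' writer: keep history by partitioning on month_year."""
    (
        df.withColumn("month_year", F.lit(month_year))   # stamp the run month on every row
          .write.mode("append")                          # keep earlier months' partitions
          .partitionBy("month_year")
          .saveAsTable(table_name)
    )
    # Note: re-running the same month appends a second copy of that partition;
    # replacing it cleanly would need e.g. INSERT OVERWRITE ... PARTITION (month_year=...).
write_to_table could then dispatch to this helper whenever write_mode == "archive".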
Execution Using spark-submit
Save the above code as monthly_etl.py and execute it using:
spark-submit --master yarn --deploy-mode cluster monthly_etl.py
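The script above hard-codes month_year; for a real monthly schedule you would normally pass it in at submit time. Here is a minimal sketch, assuming the run month is supplied as the first command-line argument and falls back to the current calendar month when omitted:
import sys
from datetime import datetime

# Resolve the run month from the command line, e.g.:
#   spark-submit --master yarn --deploy-mode cluster monthly_etl.py 202412
# Falls back to the current calendar month when no argument is given.
if len(sys.argv) > 1:
    month_year = sys.argv[1]
else:
    month_year = datetime.now().strftime("%Y%m")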
Key Features of the Framework
- Metadata-Driven:
  - Steps are dynamically executed based on the metadata table.
- Flexible Write Modes:
  - Supports overwrite, append, archive, and snapshot.
- Error Handling and Logging:
  - Logs success/failure of each step in the log table.
  - Stops processing if a step fails (a sketch of optional per-step retries follows this list).
- Snapshots:
  - Creates timestamped tables and updates permanent views.
- Control Table:
  - Tracks the overall status of the ETL process.
- Extensibility:
  - Add or modify steps by updating the metadata table without changing the code.
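The log schema mentions retries, but the main loop simply breaks on the first failure. A minimal per-step retry wrapper could look like the sketch below; run_with_retries, the retry count, and the delay are assumptions, not part of the original design.
import time

def run_with_retries(action, retries=2, delay_seconds=30):
    """Call a zero-argument action, retrying on failure before giving up.

    Hypothetical helper: 'action' would wrap one metadata step (the body of the
    try-block in the main loop), so a transient failure is retried before the
    framework logs FAILED and stops.
    """
    attempt = 0
    while True:
        try:
            return action()
        except Exception:
            attempt += 1
            if attempt > retries:
                raise  # let the caller log FAILED and stop, as in the main loop
            time.sleep(delay_seconds)
Each retry attempt could also be appended to log_entries with a RETRY status so the log table reflects it.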
Sample Metadata
Here’s how the metadata table might look for the provided scenario:
| program_name | product_name | stage_name | step_name | operation_type | query | custom_logic | temp_view_name | table_name | write_mode | snapshot_mode | stage_priority | steps_priority | month_year |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Risk Program | Personal Loan | Stage 1 | Calculate Totals | SQL | SELECT category, SUM(amount) AS total FROM transactions GROUP BY category | NULL | totals_view | risk_totals | overwrite | NULL | 1 | 1 | 202412 |
| Risk Program | Personal Loan | Stage 1 | Filter High Totals | DataFrame | NULL | df = spark.sql("SELECT * FROM totals_view").filter("total > 500") | high_totals_view | high_totals | append | NULL | 1 | 2 | 202412 |
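Because execution is metadata-driven, adding a step is just a matter of inserting a row. As a sketch, the first sample row above could be registered through spark.sql from any session that can see the metadatatable defined earlier:
# Register the "Calculate Totals" step without touching the framework code.
spark.sql("""
    INSERT INTO metadatatable VALUES (
        'Risk Program', 'Personal Loan', 'Stage 1', 'Calculate Totals',
        'SQL',
        'SELECT category, SUM(amount) AS total FROM transactions GROUP BY category',
        NULL,                    -- custom_logic (not needed for a SQL step)
        'totals_view',           -- temp_view_name
        'risk_totals',           -- table_name
        'overwrite',             -- write_mode
        NULL,                    -- snapshot_mode
        1, 1,                    -- stage_priority, steps_priority
        '202412'                 -- month_year
    )
""")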
This framework is ready for scalable, dynamic, and robust ETL processing in PySpark.