Complete project implementation for a dynamic ETL framework in PySpark. It includes:
- Metadata Table Definition and Sample Data
- Log Table Definition
- Dynamic ETL Script
- Sample Execution using Spark Submit
1. Metadata Table and Sample Data
Metadata Table Schema
CREATE TABLE IF NOT EXISTS metadatatable (
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    operation_type STRING, -- SQL, DataFrame
    query STRING,
    custom_logic STRING, -- For non-SQL transformations
    temp_view_name STRING,
    table_name STRING,
    write_mode STRING, -- overwrite, append, archive, snapshot
    snapshot_mode STRING, -- e.g., "tableName_yyyymmdd"
    stage_priority INT,
    steps_priority INT,
    month_year STRING -- e.g., "202412"
);
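The write_mode and snapshot_mode columns control how a step persists its result: a snapshot write is meant to produce a month-stamped physical table plus a stable view over it, following the "tableName_yyyymmdd" convention noted above. A minimal sketch of that naming, assuming month_year = "202412" and the final_risk_data table used later in this post:

# Snapshot naming convention (illustrative only; the framework builds these names in write_to_table below)
table_name = "final_risk_data"
month_year = "202412"
snapshot_table_name = f"{table_name}_{month_year}"  # -> "final_risk_data_202412"
# The base name is then exposed as a view:
#   CREATE OR REPLACE VIEW final_risk_data AS SELECT * FROM final_risk_data_202412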
Sample Metadata
INSERT INTO metadatatable VALUES
('Risk Program', 'Personal Loan', 'Stage 1', 'Filter CIBIL', 'SQL',
'SELECT * FROM cibil_data WHERE score > 700', NULL, 'filtered_cibil', NULL, NULL, NULL, 1, 1, '202412'),
('Risk Program', 'Personal Loan', 'Stage 1', 'Add Risk Column', 'DataFrame', NULL,
'df = spark.table("filtered_cibil"); df = df.withColumn("risk_score", df["score"] * 1.1)',
'cibil_risk_view', NULL, NULL, NULL, 1, 2, '202412'),
('Risk Program', 'Personal Loan', 'Stage 2', 'Write Data', 'DataFrame', NULL,
'df = spark.table("cibil_risk_view")',
NULL, 'final_risk_data', 'overwrite', NULL, 2, 1, '202412');
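Rows are executed in ascending (stage_priority, steps_priority) order within the selected month_year. A quick way to preview the plan implied by the metadata (a sketch, assuming a Hive-enabled SparkSession named spark is already available):

# Preview the execution order the framework will follow for a given month
(spark.table("metadatatable")
    .filter("month_year = '202412'")
    .orderBy("stage_priority", "steps_priority")
    .select("stage_name", "step_name", "operation_type", "write_mode")
    .show(truncate=False))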
2. Log Table
Log Table Schema
CREATE TABLE IF NOT EXISTS log_table (
    log_id BIGINT, -- surrogate key; Hive/Spark SQL has no AUTO_INCREMENT, so the framework leaves it NULL
    program_name STRING,
    product_name STRING,
    stage_name STRING,
    step_name STRING,
    status STRING,
    error_message STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    records_processed BIGINT,
    execution_time_seconds DOUBLE
);
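Each run appends one row per executed step, so failure triage becomes a simple query against the log table. For example (a sketch; assumes log_table lives in the current database):

# Most recent failures first
spark.sql("""
    SELECT program_name, stage_name, step_name, error_message, execution_time_seconds
    FROM log_table
    WHERE status = 'FAILED'
    ORDER BY start_time DESC
""").show(truncate=False)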
3. Dynamic ETL Script
Python Script (ETL Framework)
Save this as dynamic_etl_framework.py:
from pyspark.sql import SparkSession
from datetime import datetime
import time
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Dynamic ETL Framework") \
    .enableHiveSupport() \
    .getOrCreate()
# Get current month-year
month_year = "202412"
# Load Metadata Table
metadata = spark.sql(f"SELECT * FROM metadatatable WHERE month_year = '{month_year}'")
metadata = metadata.orderBy("stage_priority", "steps_priority")
# Define Log Table
log_table = "log_table"
log_entries = []
# Function to log status
def log_status(log_id, product_name, program_name, stage_name, step_name, status,
               error_message=None, start_time=None, end_time=None, records_processed=None):
    execution_time_seconds = None
    if start_time and end_time:
        execution_time_seconds = (end_time - start_time).total_seconds()
    log_entries.append({
        "log_id": log_id,
        "product_name": product_name,
        "program_name": program_name,
        "stage_name": stage_name,
        "step_name": step_name,
        "status": status,
        "error_message": error_message,
        # Keep datetimes as-is so they map to the log table's TIMESTAMP columns
        "start_time": start_time,
        "end_time": end_time,
        "records_processed": records_processed,
        "execution_time_seconds": execution_time_seconds
    })
# Function to write to target table
def write_to_table(df, table_name, write_mode, snapshot_mode):
    if write_mode == "snapshot":
        snapshot_table_name = f"{table_name}_{month_year}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        spark.sql(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {snapshot_table_name}")
    else:
        df.write.mode(write_mode).saveAsTable(table_name)
# Process each step in the metadata
for row in metadata.collect():
    step = row.asDict()
    product_name = step["product_name"]
    program_name = step["program_name"]
    stage_name = step["stage_name"]
    step_name = step["step_name"]
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]
    snapshot_mode = step["snapshot_mode"]
    start_time = datetime.now()
    try:
        print(f"Executing Step: {product_name} > {program_name} > {stage_name} > {step_name}")
        if operation_type == "SQL":
            # Execute SQL query
            df = spark.sql(query)
            if temp_view_name:
                df.createOrReplaceTempView(temp_view_name)
        elif operation_type == "DataFrame":
            # Execute DataFrame logic in an isolated namespace; the snippet is expected to assign its result to df
            exec_scope = {"spark": spark}
            exec(custom_logic, exec_scope)
            df = exec_scope.get("df")
            if df is not None and temp_view_name:
                df.createOrReplaceTempView(temp_view_name)
        else:
            raise ValueError(f"Unknown operation type: {operation_type}")
        # Count records processed
        records_processed = df.count() if df is not None else None
        # Write to table
        if table_name:
            write_to_table(df, table_name, write_mode, snapshot_mode)
        # Log success
        end_time = datetime.now()
        log_status(None, product_name, program_name, stage_name, step_name,
                   "SUCCESS", start_time=start_time, end_time=end_time, records_processed=records_processed)
    except Exception as e:
        # Log failure
        end_time = datetime.now()
        log_status(None, product_name, program_name, stage_name, step_name,
                   "FAILED", error_message=str(e), start_time=start_time, end_time=end_time)
        print(f"Error in Step: {step_name}, Error: {str(e)}")
        break
# Write log entries to Log Table (explicit schema so all-NULL columns such as log_id do not break type inference)
if log_entries:
    log_schema = ("log_id BIGINT, program_name STRING, product_name STRING, stage_name STRING, "
                  "step_name STRING, status STRING, error_message STRING, start_time TIMESTAMP, "
                  "end_time TIMESTAMP, records_processed BIGINT, execution_time_seconds DOUBLE")
    log_df = spark.createDataFrame(log_entries, schema=log_schema)
    log_df.write.mode("append").saveAsTable(log_table)
print("ETL Process Completed!")
4. Execution
Command to Run the Script
Save the above script as dynamic_etl_framework.py and execute it with spark-submit:
spark-submit --master yarn --deploy-mode cluster dynamic_etl_framework.py
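For a quick local test (assuming a local Hive metastore or Spark warehouse directory is available), the same script also runs in local mode; the trailing argument applies only if the optional month_year handling sketched above is added:

spark-submit --master local[*] dynamic_etl_framework.py 202412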
Sample Hive Tables
cibil_data:
CREATE TABLE cibil_data (customer_id INT, score INT);
INSERT INTO cibil_data VALUES (1, 750), (2, 650), (3, 800);
final_risk_data: created automatically by the framework during the Stage 2 "Write Data" step.
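Given the three sample cibil_data rows, only scores above 700 survive the Stage 1 filter, so final_risk_data should end up with two rows. A quick sanity check after a run (assuming the sample metadata above was used):

# Expect customer_id 1 (score 750) and customer_id 3 (score 800), each with risk_score = score * 1.1
spark.table("final_risk_data").show()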
This end-to-end project demonstrates metadata-driven ETL processing in PySpark, with logging, error handling, and modular, priority-ordered execution.