etl_project/
├── config/
│ ├── base_config.json
├── control/
│ ├── control_table_setup.sql
│ ├── sample_control_table_data.sql
│ ├── log_table_setup.sql
├── scripts/
│ ├── run_etl.sh
│ ├── run_etl.py
├── etl/
│ ├── execute_query.py
│ ├── log_status.py
│ ├── stage_runner.py
│ ├── job_orchestrator.py
├── logging/
│ ├── logger.py
├── utils/
│ ├── spark_utils.py
│ ├── query_utils.py
│ ├── config_utils.py
│ ├── error_handling.py
├── log/
│ ├── execution_logs.log
├── tests/
│ ├── test_etl.py
│ ├── test_query_utils.py
│ ├── test_logger.py
│ ├── test_error_handling.py
└── README.md
ETL Workflow with Enhanced Logging and Snapshot Management
Below is a refined ETL architecture and corresponding SQL and Python updates, based on your project structure and requirements.
SQL Table Definitions
Control Table
The control table is used to manage ETL steps, track their sequence, and specify configurations such as write_mode and snapshot_mode.
CREATE TABLE IF NOT EXISTS control_table (
  program_name STRING,
  product_name STRING,
  stage_name STRING,
  step_name STRING,
  operation_type STRING, -- SQL, DataFrame
  query STRING,          -- SQL logic (STRING; Spark SQL has no TEXT type)
  custom_logic STRING,   -- For non-SQL transformations
  temp_view_name STRING,
  table_name STRING,
  write_mode STRING,     -- overwrite, append, archive, snapshot
  snapshot_mode STRING,  -- e.g., "tableName_yyyymmdd"
  stage_priority INT,
  step_priority INT,
  month_year STRING      -- e.g., "202412"
);
Log Table
The log table tracks the execution of each ETL step, including performance metrics and error details.
CREATE TABLE IF NOT EXISTS log_table (
  log_id BIGINT, -- surrogate key; Spark SQL has no AUTO_INCREMENT, so generate it per your platform (identity column, sequence, etc.)
  program_name STRING,
  product_name STRING,
  stage_name STRING,
  step_name STRING,
  status STRING,         -- SUCCESS, FAILED
  error_message STRING,
  start_time TIMESTAMP,
  end_time TIMESTAMP,
  records_processed BIGINT,
  execution_time_seconds DOUBLE,
  month_year STRING      -- e.g., "202412"
);
Key Enhancements
- Monthly Partitioning (month_year): both the control and log tables include a month_year column to support monthly ETL runs.
- Write Modes:
  - overwrite: Replace existing data.
  - append: Add new records.
  - archive: Save to a time-stamped snapshot table.
  - snapshot: Create a table with a time-stamped name and a permanent view for consistent access.
- Execution Time and Records Processed: the log table tracks execution time (execution_time_seconds) and the number of records processed (see the example query after this list).
- Support for Spark SQL and DataFrame: steps can be SQL queries or DataFrame code snippets.
Python Code Updates
Step Execution with Snapshot Handling
In execute_query.py, handle write_mode and snapshot_mode.
from datetime import datetime

def execute_step(spark, step, base_config):
    """
    Execute a single step in the ETL process and return the number of records processed.
    """
    if step["operation_type"] == "SQL":
        temp_view_name = step["temp_view_name"]
        query = step["query"]
        result_df = spark.sql(query)
        if step["write_mode"] == "temp_view":
            result_df.createOrReplaceTempView(temp_view_name)
        elif step["write_mode"] == "overwrite":
            table_name = step["table_name"]
            result_df.write.mode("overwrite").saveAsTable(table_name)
        elif step["write_mode"] == "append":
            table_name = step["table_name"]
            result_df.write.mode("append").saveAsTable(table_name)
        elif step["write_mode"] == "archive":
            table_name = f"{step['table_name']}_{datetime.now().strftime('%Y%m%d')}"
            result_df.write.mode("overwrite").saveAsTable(table_name)
        elif step["write_mode"] == "snapshot":
            table_name = f"{step['table_name']}_{datetime.now().strftime('%Y%m%d')}"
            result_df.write.mode("overwrite").saveAsTable(table_name)
            # Create or replace a permanent view that always points at the latest snapshot
            spark.sql(f"CREATE OR REPLACE VIEW {step['table_name']} AS SELECT * FROM {table_name}")
        return result_df.count()  # Number of records processed
    elif step["operation_type"] == "DataFrame":
        # Run the custom DataFrame code in its own namespace; it is expected to
        # leave the resulting DataFrame in a variable named `df`.
        namespace = {"spark": spark}
        exec(step["custom_logic"], namespace)
        df = namespace["df"]
        return df.count()
Logging Updates
Update the log_to_table function to include month_year and execution_time_seconds.
from datetime import datetime

def log_to_table(spark, program_name, product_name, stage_name, step_name, status, error_message,
                 records_processed, start_time, end_time, month_year):
    """
    Log execution details to the log_table.
    """
    execution_time_seconds = (end_time - start_time).total_seconds()
    # Escape single quotes so error messages do not break the INSERT statement
    safe_error = (error_message or "").replace("'", "''")
    spark.sql(f"""
        INSERT INTO log_table (program_name, product_name, stage_name, step_name, status, error_message,
                               start_time, end_time, records_processed, execution_time_seconds, month_year)
        VALUES ('{program_name}', '{product_name}', '{stage_name}', '{step_name}', '{status}',
                '{safe_error}', '{start_time}', '{end_time}', {records_processed},
                {execution_time_seconds}, '{month_year}')
    """)
Stage Runner with Monthly Partitioning
In stage_runner.py, pass month_year for every step.
from datetime import datetime
from etl.execute_query import execute_step
from etl.log_status import log_to_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session
from logging.logger import get_logger

logger = get_logger("./log/execution_logs.log")

def run_stage(program_name, product_name, stage_name, base_config, month_year):
    """
    Execute all steps in a stage and log results.
    """
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name, month_year)
    for step in steps.collect():
        step_name = step["step_name"]
        start_time = datetime.now()
        try:
            records_processed = execute_step(spark, step, base_config)
            end_time = datetime.now()
            log_to_table(spark, program_name, product_name, stage_name, step_name, "SUCCESS", None,
                         records_processed, start_time, end_time, month_year)
            logger.info(f"Step {step_name} completed successfully. Records processed: {records_processed}")
        except Exception as e:
            end_time = datetime.now()
            log_to_table(spark, program_name, product_name, stage_name, step_name, "FAILED", str(e),
                         0, start_time, end_time, month_year)
            logger.error(f"Error in step {step_name}: {e}")
    spark.stop()
Execution Scripts
Monthly Execution in run_etl.py
Pass month_year as an argument to orchestrate ETL for the given period.
import sys

from etl.job_orchestrator import orchestrate_etl

if __name__ == "__main__":
    month_year = sys.argv[1]  # Pass as YYYYMM
    base_config = "config/base_config.json"
    orchestrate_etl(base_config, month_year)
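orchestrate_etl is imported above but never defined in this write-up. A minimal sketch for etl/job_orchestrator.py, assuming base_config.json carries a "programs" list (an assumption; the sample config shown later does not include one) and delegating to the per-program orchestrator defined further down:

# Hypothetical sketch of orchestrate_etl; the "programs" config key is an assumption
from utils.config_utils import load_config

def orchestrate_etl(base_config_path, month_year):
    config = load_config(base_config_path)
    programs = config.get("programs", ["Program1"])  # fallback matches the sample control data
    for program_name in programs:
        orchestrate_program(program_name, month_year)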
Example Control Table Data
INSERT INTO control_table VALUES
('Program1', 'Personal Loan', 'Stage1', 'Step1', 'SQL', 'SELECT * FROM source_table', NULL,
'temp_view1', 'target_table1', 'overwrite', NULL, 1, 1, '202412'),
('Program1', 'Personal Loan', 'Stage1', 'Step2', 'DataFrame', NULL, 'df = spark.table("temp_view1"); df = df.withColumn("score", df["score"] * 1.1)',
NULL, 'target_table2', 'snapshot', 'tableName_yyyymmdd', 1, 2, '202412');
This structure and code implementation ensure that monthly ETL runs are modular, scalable, and well logged.
Separate spark-submit jobs for each stage of a program can be built dynamically, with a shell script or Python wrapper executing an individual stage:
1. Updated Table Definitions
Control Table
Add month_year as a key column to reflect the monthly processing needs and align with log_table.
CREATE TABLE IF NOT EXISTS control_table (
  program_name STRING,
  product_name STRING,
  stage_name STRING,
  step_name STRING,
  operation_type STRING, -- e.g., SQL, DataFrame
  query STRING,          -- SQL logic
  custom_logic STRING,   -- DataFrame or Python logic
  temp_view_name STRING, -- Name for temporary Spark views
  table_name STRING,     -- Target table
  write_mode STRING,     -- Write modes: write, append, snapshot, archive
  snapshot_mode STRING,  -- Snapshot identifier: YYYYMMDD format
  stage_priority INT,    -- Stage execution priority
  step_priority INT,     -- Step execution priority
  month_year STRING      -- Monthly partition in YYYYMM format
);
Log Table
Include month_year for traceability across monthly runs and execution_time_seconds to measure performance.
CREATE TABLE IF NOT EXISTS log_table (
  log_id BIGINT, -- surrogate key; generate it with whatever identity mechanism your platform offers
  program_name STRING,
  product_name STRING,
  stage_name STRING,
  step_name STRING,
  status STRING,                 -- e.g., SUCCESS, FAILED
  error_message STRING,          -- Details of failure, if any
  start_time TIMESTAMP,
  end_time TIMESTAMP,
  records_processed BIGINT,      -- Count of processed records
  execution_time_seconds DOUBLE, -- Time taken for step completion
  month_year STRING              -- Monthly partition in YYYYMM format
);
2. Execution Pipeline Workflow
Shell Script (run_etl.sh)
The script triggers ETL execution for specific stages dynamically, based on program_name and stage_name.
#!/bin/bash
PROGRAM_NAME=$1
STAGE_NAME=$2
MONTH_YEAR=$(date +"%Y%m")

echo "Executing ETL for Program: $PROGRAM_NAME, Stage: $STAGE_NAME, Month-Year: $MONTH_YEAR"

# --class only applies to JVM applications, so it is omitted for a Python entry point
spark-submit \
  --master yarn \
  --deploy-mode client \
  ./scripts/run_etl.py \
  --program_name "$PROGRAM_NAME" \
  --stage_name "$STAGE_NAME" \
  --month_year "$MONTH_YEAR"

if [ $? -eq 0 ]; then
  echo "ETL execution completed successfully for Stage: $STAGE_NAME"
else
  echo "ETL execution failed for Stage: $STAGE_NAME"
  exit 1
fi
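The per-stage submits can also be generated dynamically. A hypothetical Python wrapper (e.g. scripts/submit_stages.py, not part of the tree above) that reuses run_etl.sh once per stage might look like this; the stage list is passed on the command line here, but it could equally come from control_table or a config file.

# Hypothetical driver that launches one spark-submit per stage via run_etl.sh
import subprocess
import sys

def submit_program_stages(program_name, stage_names):
    for stage_name in stage_names:
        print(f"Submitting stage {stage_name} of program {program_name}")
        # Each call performs one spark-submit; a non-zero exit code aborts the sequence
        subprocess.run(["bash", "./scripts/run_etl.sh", program_name, stage_name], check=True)

if __name__ == "__main__":
    program = sys.argv[1]
    stages = sys.argv[2:]  # e.g. Stage1 Stage2 Stage3
    submit_program_stages(program, stages)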
Stage Runner (stage_runner.py)
The stage execution ensures priority-based processing and tracks logs dynamically.
from datetime import datetime

from etl.execute_query import execute_step
from etl.log_status import log_to_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session

def run_stage(program_name, stage_name, month_year):
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name, month_year)
    for step in steps.collect():
        step_name = step["step_name"]
        start_time = datetime.now()
        try:
            records_processed = execute_step(spark, step)
            end_time = datetime.now()
            execution_time = (end_time - start_time).total_seconds()
            log_to_table(
                spark, program_name, stage_name, step_name, "SUCCESS",
                None, start_time, end_time, records_processed, execution_time, month_year
            )
        except Exception as e:
            end_time = datetime.now()
            execution_time = (end_time - start_time).total_seconds()
            log_to_table(
                spark, program_name, stage_name, step_name, "FAILED",
                str(e), start_time, end_time, 0, execution_time, month_year
            )
            raise
Execution Query (execute_query.py)
Handles the different write_mode options and calculates processed record counts.
from datetime import datetime

def execute_step(spark, step):
    """
    Execute ETL logic for a single step.
    """
    # Rows returned by .collect() do not have a .get() method, so convert to a plain dict
    if hasattr(step, "asDict"):
        step = step.asDict()

    operation_type = step["operation_type"]
    write_mode = step["write_mode"]
    temp_view_name = step.get("temp_view_name")
    table_name = step.get("table_name")
    query = step.get("query")
    custom_logic = step.get("custom_logic")
    snapshot_mode = step.get("snapshot_mode")  # naming pattern, e.g. "tableName_yyyymmdd"

    if operation_type == "SQL":
        result_df = spark.sql(query)
    elif operation_type == "DataFrame":
        # Execute custom DataFrame logic; it is expected to leave the result in a
        # DataFrame named `result_df` (or `df`).
        namespace = {"spark": spark}
        exec(custom_logic, namespace)
        result_df = namespace.get("result_df")
        if result_df is None:
            result_df = namespace.get("df")
    else:
        raise ValueError(f"Unsupported operation_type: {operation_type}")

    if temp_view_name:
        result_df.createOrReplaceTempView(temp_view_name)

    # Handle write modes; the date suffix realises the snapshot_mode naming pattern
    date_suffix = datetime.now().strftime("%Y%m%d")
    if write_mode == "write":
        result_df.write.mode("overwrite").saveAsTable(table_name)
    elif write_mode == "append":
        result_df.write.mode("append").saveAsTable(table_name)
    elif write_mode == "snapshot":
        snapshot_table = f"{table_name}_{date_suffix}"
        result_df.write.mode("overwrite").saveAsTable(snapshot_table)
        spark.sql(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {snapshot_table}")
    elif write_mode == "archive":
        archive_table = f"{table_name}_archive_{date_suffix}"
        result_df.write.mode("append").saveAsTable(archive_table)

    return result_df.count()
3. Orchestrator (job_orchestrator.py)
Manages the entire ETL pipeline. This version hard-codes the stage sequence; the control-table-driven variant appears later under etl/.
from etl.stage_runner import run_stage

def orchestrate_program(program_name, month_year):
    stages = ["Stage1", "Stage2", "Stage3"]  # Define stage sequence
    for stage_name in stages:
        run_stage(program_name, stage_name, month_year)
4. Key Features
- Dynamic Execution: Handle stages and programs dynamically using configurations.
- Partitioning: Include month_year for monthly runs in both control_table and log_table.
- Write Modes (see the sketch after this list):
  - Write: Overwrites the table.
  - Append: Appends to the existing table.
  - Snapshot: Creates time-stamped versions and maintains a permanent view.
  - Archive: Stores older records in a separate table.
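A small illustration of the snapshot and archive patterns; the table name risk_summary is made up for the example. A snapshot run writes a dated table while consumers keep querying the stable view, whereas archive keeps appending history into dated archive tables.

from datetime import datetime

def snapshot_and_archive_example(spark, df):
    suffix = datetime.now().strftime("%Y%m%d")

    # snapshot: dated table plus a stable view that always points at the latest snapshot
    snapshot_table = f"risk_summary_{suffix}"  # e.g. risk_summary_20241215
    df.write.mode("overwrite").saveAsTable(snapshot_table)
    spark.sql(f"CREATE OR REPLACE VIEW risk_summary AS SELECT * FROM {snapshot_table}")

    # archive: accumulate history in a separate dated table
    df.write.mode("append").saveAsTable(f"risk_summary_archive_{suffix}")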
Let’s dive deeper into all the modules and provide detailed examples with explanations. This will include scripts for individual components, implementation details, and their integration into the overall workflow. I’ll structure this as follows:
1. Module Details and Scripts
1.1. config/
Stores configurations like base_config.json for reusable metadata.
base_config.json:
{
  "spark_config": {
    "app_name": "ETL Pipeline",
    "master": "yarn",
    "deploy_mode": "client"
  },
  "log_table_name": "log_table",
  "control_table_name": "control_table"
}
1.2. control/
SQL scripts for setting up control and log tables.
control_table_setup.sql:
CREATE TABLE IF NOT EXISTS control_table (
  program_name STRING,
  product_name STRING,
  stage_name STRING,
  step_name STRING,
  operation_type STRING,
  query STRING,
  custom_logic STRING,
  temp_view_name STRING,
  table_name STRING,
  write_mode STRING,
  snapshot_mode STRING,
  stage_priority INT,
  step_priority INT,
  month_year STRING
);
sample_control_table_data.sql:
INSERT INTO control_table VALUES
('Program_A', 'Personal Loan', 'Stage1', 'Step1', 'SQL',
'SELECT * FROM source_table WHERE score > 700', NULL,
'temp_stage1', 'personal_loan_stage1', 'write', NULL, 1, 1, '202412');
1.3. scripts/
Scripts for pipeline orchestration and execution.
run_etl.sh: Executes stages of a program dynamically.
#!/bin/bash
PROGRAM_NAME=$1
STAGE_NAME=$2
MONTH_YEAR=$3
echo "Running ETL: Program=$PROGRAM_NAME, Stage=$STAGE_NAME, Month-Year=$MONTH_YEAR"
spark-submit \
--master yarn \
./etl/job_orchestrator.py \
--program_name $PROGRAM_NAME \
--stage_name $STAGE_NAME \
--month_year $MONTH_YEAR
1.4. etl/
Core ETL logic including query execution and logging.
execute_query.py: Executes SQL or DataFrame-based steps.
def execute_step(spark, step):
    operation_type = step["operation_type"]
    query = step["query"]
    custom_logic = step["custom_logic"]

    if operation_type == "SQL":
        df = spark.sql(query)
    elif operation_type == "DataFrame":
        # Custom DataFrame logic is expected to leave a DataFrame named `df`
        namespace = {"spark": spark}
        exec(custom_logic, namespace)
        df = namespace["df"]
    else:
        raise ValueError("Invalid operation type.")

    # Example write logic
    write_mode = step["write_mode"]
    table_name = step["table_name"]
    if write_mode == "write":
        df.write.mode("overwrite").saveAsTable(table_name)
    return df.count()
log_status.py: Logs success or failure status.
def log_to_table(spark, program_name, stage_name, step_name, status, error_message,
                 start_time, end_time, records_processed, execution_time, month_year):
    # An explicit schema avoids type-inference failures when error_message is None
    schema = ("program_name STRING, stage_name STRING, step_name STRING, status STRING, "
              "error_message STRING, start_time TIMESTAMP, end_time TIMESTAMP, "
              "records_processed BIGINT, execution_time DOUBLE, month_year STRING")
    log_data = [(program_name, stage_name, step_name, status, error_message,
                 start_time, end_time, records_processed, execution_time, month_year)]
    log_df = spark.createDataFrame(log_data, schema=schema)
    log_df.write.mode("append").saveAsTable("log_table")
stage_runner.py: Runs all steps in a stage based on control_table.
from datetime import datetime

from execute_query import execute_step
from log_status import log_to_table

def run_stage(spark, program_name, stage_name, month_year):
    control_df = spark.sql(
        f"SELECT * FROM control_table "
        f"WHERE program_name='{program_name}' AND stage_name='{stage_name}' "
        f"AND month_year='{month_year}' ORDER BY step_priority"
    )
    for step in control_df.collect():
        start_time = datetime.now()
        try:
            records_processed = execute_step(spark, step)
            end_time = datetime.now()
            log_to_table(spark, program_name, stage_name, step["step_name"], "SUCCESS", None,
                         start_time, end_time, records_processed,
                         (end_time - start_time).total_seconds(), month_year)
        except Exception as e:
            end_time = datetime.now()
            log_to_table(spark, program_name, stage_name, step["step_name"], "FAILED", str(e),
                         start_time, end_time, 0,
                         (end_time - start_time).total_seconds(), month_year)
            raise
job_orchestrator.py: Orchestrates stages of a program.
from stage_runner import run_stage

def orchestrate_program(spark, program_name, month_year):
    # stage_priority must appear in the select list for ORDER BY to work with DISTINCT
    stages = spark.sql(
        f"SELECT DISTINCT stage_name, stage_priority FROM control_table "
        f"WHERE program_name='{program_name}' AND month_year='{month_year}' "
        f"ORDER BY stage_priority"
    ).collect()
    for stage in stages:
        run_stage(spark, program_name, stage["stage_name"], month_year)
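run_etl.sh above submits job_orchestrator.py with --program_name, --stage_name, and --month_year flags, but no entry point is shown for it. A minimal sketch, in which --stage_name is treated as optional (an assumption): when given, only that stage runs; otherwise the whole program runs.

# Hypothetical entry point for etl/job_orchestrator.py
import argparse
from pyspark.sql import SparkSession

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--program_name", required=True)
    parser.add_argument("--stage_name", default=None)
    parser.add_argument("--month_year", required=True)
    args = parser.parse_args()

    spark = SparkSession.builder.appName("ETL Pipeline").getOrCreate()
    if args.stage_name:
        run_stage(spark, args.program_name, args.stage_name, args.month_year)
    else:
        orchestrate_program(spark, args.program_name, args.month_year)
    spark.stop()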
1.5. logging/
Centralized logging utilities. (Note that a package literally named logging can shadow Python's standard logging module; a name such as app_logging is safer in practice.)
logger.py:
import logging

def get_logger(log_file):
    logger = logging.getLogger("ETL Logger")
    logger.setLevel(logging.INFO)
    # Avoid attaching duplicate handlers when get_logger is called more than once
    if not logger.handlers:
        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logger.addHandler(handler)
    return logger
1.6. utils/
Utility functions for configuration, Spark, and error handling.
config_utils.py: Reads and parses configurations.
import json

def load_config(config_path):
    with open(config_path, 'r') as f:
        return json.load(f)
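The stage runners above also import get_control_table_steps from config_utils, but it is not shown anywhere. A minimal sketch, assuming it simply filters control_table by program, stage, and month and orders the rows for execution:

def get_control_table_steps(spark, program_name, stage_name, month_year):
    # Return the control-table rows for one stage of one program, in execution order
    return spark.sql(f"""
        SELECT * FROM control_table
        WHERE program_name = '{program_name}'
          AND stage_name = '{stage_name}'
          AND month_year = '{month_year}'
        ORDER BY step_priority
    """)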
spark_utils.py: Manages Spark session creation.
from pyspark.sql import SparkSession

def get_spark_session(app_name="ETL Pipeline"):
    # A default app name lets callers such as run_stage() omit the argument
    return SparkSession.builder.appName(app_name).getOrCreate()
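base_config.json carries a spark_config block; a hedged sketch of a config-driven variant is below. Only the application name is applied here, since master and deploy mode are normally governed by spark-submit.

# Hypothetical config-driven session factory built on load_config from config_utils
from pyspark.sql import SparkSession
from config_utils import load_config

def get_spark_session_from_config(config_path="./config/base_config.json"):
    config = load_config(config_path)
    app_name = config.get("spark_config", {}).get("app_name", "ETL Pipeline")
    return SparkSession.builder.appName(app_name).getOrCreate()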
2. Integration Plan
- Setup Tables:
  - Execute control_table_setup.sql and log_table_setup.sql to set up the metadata tables.
  - Populate control_table with the configuration for each step.
- Script Execution:
  - Use run_etl.sh to run individual stages dynamically.
  - Define program_name, stage_name, and month_year in the script call.
- Monthly Runs:
  - Use month_year to filter and process the relevant data for monthly runs.
  - Use the snapshot_mode and archive options for historical data management.
Let’s add detailed examples for DataFrame queries and error-handling patterns, tailored for your ETL structure.
1. DataFrame Queries
1.1. Filter and Transform Example
Use this for filtering and adding new columns with transformations.
from pyspark.sql import functions as F

def filter_and_transform_data(spark):
    # Load data from a table or view
    df = spark.table("cibil_filtered_view")
    # Apply filters
    filtered_df = df.filter((F.col("score") > 700) & (F.col("region") == "North"))
    # Add a new column with transformations
    transformed_df = filtered_df.withColumn("risk_score", F.col("score") * 1.1)
    # Save the output
    transformed_df.write.mode("overwrite").saveAsTable("transformed_cibil_data")
    return transformed_df.count()
1.2. Aggregation Example
Perform summary statistics or group-by operations.
from pyspark.sql import functions as F

def aggregate_data(spark):
    # Load data from a table
    df = spark.table("risk_data")
    # Perform aggregations
    aggregated_df = df.groupBy("region").agg(
        F.avg("risk_score").alias("avg_risk_score"),
        F.sum("loan_amount").alias("total_loan_amount"),
    )
    # Save the aggregated data
    aggregated_df.write.mode("overwrite").saveAsTable("aggregated_risk_data")
    return aggregated_df.count()
1.3. Join Example
Combine data from multiple sources.
from pyspark.sql import functions as F

def join_data_sources(spark):
    # Load data from two sources
    cibil_df = spark.table("cibil_data")
    loan_df = spark.table("loan_data")
    # Join on the key column name so the result keeps a single, unambiguous customer_id
    joined_df = cibil_df.join(loan_df, on="customer_id", how="inner")
    # Filter and select relevant columns
    result_df = joined_df.select(
        "customer_id",
        "score",
        "loan_amount",
        F.when(F.col("score") > 750, "Low Risk").otherwise("High Risk").alias("risk_category")
    )
    # Save the result
    result_df.write.mode("overwrite").saveAsTable("joined_customer_data")
    return result_df.count()
2. Error-Handling Patterns
2.1. Try-Except Block for DataFrame Operations
Wrap critical operations in a try-except block to handle failures gracefully.
def safe_execute_query(spark, query, output_table):
    try:
        # Execute query
        df = spark.sql(query)
        # Write to output table
        df.write.mode("overwrite").saveAsTable(output_table)
        print(f"Query executed successfully, data saved to {output_table}")
        return df.count()
    except Exception as e:
        print(f"Failed to execute query: {str(e)}")
        return 0
2.2. Logging Errors with Context
Capture errors and log them with relevant context.
from datetime import datetime
from pyspark.sql.utils import AnalysisException

def execute_with_logging(spark, logger, query, output_table):
    start_time = datetime.now()
    try:
        # Execute query
        df = spark.sql(query)
        df.write.mode("overwrite").saveAsTable(output_table)
        end_time = datetime.now()
        logger.info(f"Query executed successfully: {query}")
        logger.info(f"Records processed: {df.count()}, Time taken: {end_time - start_time}")
    except AnalysisException as ae:
        logger.error(f"Analysis error in query execution: {query}, Error: {str(ae)}")
    except Exception as e:
        logger.error(f"Unhandled error in query execution: {query}, Error: {str(e)}")
2.3. Retrying Logic for Fault-Tolerance
Retry failed steps with a maximum retry limit.
import time

def retry_with_backoff(func, *args, max_retries=3, delay=5, **kwargs):
    attempts = 0
    while attempts < max_retries:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            attempts += 1
            print(f"Attempt {attempts} failed with error: {str(e)}")
            if attempts == max_retries:
                print("Max retries reached, aborting.")
                raise
            # Exponential backoff: wait longer after each failed attempt
            time.sleep(delay * (2 ** (attempts - 1)))
Usage:
retry_with_backoff(safe_execute_query, max_retries=3, delay=10, spark=spark, query="SELECT * FROM invalid_table", output_table="output_table")
3. Integrated Example
A complete step that incorporates DataFrame operations and error handling with logging and retries.
def process_stage_with_error_handling(spark, query, output_table, logger):
    try:
        # Attempt execution with retries
        retry_with_backoff(safe_execute_query, max_retries=3, delay=5,
                           spark=spark, query=query, output_table=output_table)
        logger.info(f"Successfully processed table {output_table}")
    except Exception as e:
        logger.error(f"Stage failed for table {output_table}: {str(e)}")
        raise
Full Pipeline Script Using Integrated Example:
from spark_utils import get_spark_session
from logger import get_logger
from config_utils import load_config

def main():
    # Initialize
    spark = get_spark_session("ETL Pipeline")
    logger = get_logger("./log/execution_logs.log")
    config = load_config("./config/base_config.json")

    query = "SELECT * FROM cibil_filtered_view WHERE score > 700"
    output_table = "high_score_customers"

    # Process stage
    process_stage_with_error_handling(spark, query, output_table, logger)

if __name__ == "__main__":
    main()
These examples ensure:
- Clear error handling for every step.
- Retry mechanisms for resilience.
- A consistent structure for DataFrame transformations and operations.