1. Project Structure
etl_project/
├── config/
│   ├── base_config.json
│   └── product_configs/
│       └── pl_cibil750.json
├── control/
│   ├── control_table_setup.sql
│   └── sample_control_table_data.sql
├── scripts/
│   ├── run_etl.sh
│   └── run_etl.py
├── etl/
│   ├── execute_query.py
│   ├── log_status.py
│   ├── stage_runner.py
│   └── job_orchestrator.py
├── logging_utils/
│   └── logger.py
├── utils/
│   ├── spark_utils.py
│   ├── query_utils.py
│   ├── data_utils.py
│   ├── config_utils.py
│   └── error_handling.py
└── README.md
(The logger package is named logging_utils rather than logging so it does not shadow Python's built-in logging module.)
2. Configuration Files
a. base_config.json
{
  "hive_metastore": "thrift://localhost:9083",
  "spark_master": "local[*]",
  "log_level": "INFO",
  "oracle_host": "oracle-host",
  "oracle_port": "1521",
  "oracle_service_name": "orcl",
  "oracle_user": "oracle_user",
  "oracle_password": "oracle_password"
}
b. pl_cibil750.json
{
  "program_name": "CIBIL 750 Program",
  "stages": [
    {
      "stage_name": "CIBIL Filter",
      "steps": [
        {
          "step_name": "Read CIBIL Data",
          "operation_type": "SQL",
          "query": "SELECT * FROM cibil_data WHERE score > 750",
          "temp_view_name": "cibil_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Risk Filter",
      "steps": [
        {
          "step_name": "Apply Risk Filter",
          "operation_type": "SQL",
          "query": "SELECT * FROM risk_data WHERE risk_level < 3",
          "temp_view_name": "risk_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Final Write",
      "steps": [
        {
          "step_name": "Write to Hive",
          "operation_type": "SQL",
          "query": "INSERT INTO final_table SELECT * FROM temp_table"
        }
      ]
    }
  ]
}
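For orientation, here is a short sketch that loads this product config and lists its stages and steps. Note that the orchestrator shown later drives execution from the control table; the product config mirrors the same stages as the program-level definition.
import json

# Load the product config just to inspect the defined stages and steps.
with open("config/product_configs/pl_cibil750.json") as f:
    product_config = json.load(f)

for stage in product_config["stages"]:
    step_names = [step["step_name"] for step in stage["steps"]]
    print(stage["stage_name"], "->", step_names)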
3. Control Table
a. control_table_setup.sql
CREATE TABLE IF NOT EXISTS control_table (
  program_name STRING,
  stage_name STRING,
  step_name STRING,
  operation_type STRING,
  query STRING,
  temp_view_name STRING,
  table_name STRING,
  write_mode STRING,
  status STRING,
  error_message STRING,
  start_time TIMESTAMP,
  end_time TIMESTAMP
);
b. sample_control_table_data.sql
INSERT INTO control_table VALUES
('CIBIL 750 Program', 'CIBIL Filter', 'Read CIBIL Data', 'SQL', 'SELECT * FROM cibil_data WHERE score > 750', 'cibil_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Risk Filter', 'Apply Risk Filter', 'SQL', 'SELECT * FROM risk_data WHERE risk_level < 3', 'risk_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Final Write', 'Write to Hive', 'SQL', 'INSERT INTO final_table SELECT * FROM temp_table', NULL, 'final_table', 'snapshot', 'PENDING', NULL, NULL, NULL);
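To apply these scripts programmatically rather than through beeline or spark-sql, a minimal sketch is shown below; run_sql_file is a hypothetical helper (not part of the project), and splitting on ';' assumes no semicolons inside individual statements.
from utils.spark_utils import get_spark_session

def run_sql_file(spark, path):
    """Execute each ';'-terminated statement in a SQL file via spark.sql."""
    with open(path, "r") as f:
        statements = [s.strip() for s in f.read().split(";") if s.strip()]
    for statement in statements:
        spark.sql(statement)

spark = get_spark_session()
run_sql_file(spark, "control/control_table_setup.sql")
run_sql_file(spark, "control/sample_control_table_data.sql")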
4. Scripts
a. run_etl.sh
#!/bin/bash
CONFIG_PATH="config/base_config.json"
PROGRAM_NAME="CIBIL 750 Program"
# Quote the arguments so the multi-word program name is passed as a single value.
spark-submit --master local[*] --deploy-mode client scripts/run_etl.py "$PROGRAM_NAME" "$CONFIG_PATH"
b. run_etl.py
import sys

from etl.job_orchestrator import run_program

if __name__ == "__main__":
    program_name = sys.argv[1]
    config_path = sys.argv[2]
    run_program(program_name, config_path)
c. etl/execute_query.py
from datetime import datetime

from utils.query_utils import execute_sql_query

def execute_step(spark, step, config):
    operation_type = step["operation_type"]
    query = step["query"]
    temp_view_name = step["temp_view_name"]
    table_name = step["table_name"]
    write_mode = step["write_mode"]
    start_time = datetime.now()
    try:
        if operation_type == "SQL":
            execute_sql_query(spark, query, temp_view_name, table_name, write_mode)
        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        end_time = datetime.now()
        return "FAILED", str(e), start_time, end_time
d. etl/log_status.py
def log_status_to_control_table(spark, program_name, stage_name, step_name, status, error_message, start_time, end_time):
    # Values are interpolated directly into the SQL string; a missing error message is written as NULL.
    error_value = f"'{error_message}'" if error_message else "NULL"
    spark.sql(f"""
        INSERT INTO control_table (program_name, stage_name, step_name, status, error_message, start_time, end_time)
        VALUES ('{program_name}', '{stage_name}', '{step_name}', '{status}', {error_value}, '{start_time}', '{end_time}')
    """)
e. etl/stage_runner.py
from etl.execute_query import execute_step
from etl.log_status import log_status_to_control_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session

def run_stage(program_name, stage_name, base_config):
    # get_spark_session() uses getOrCreate(), so this reuses the orchestrator's session;
    # the session lifecycle is managed by the orchestrator, not stopped here.
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name)
    for step in steps.collect():
        step_name = step["step_name"]
        status, error_message, start_time, end_time = execute_step(spark, step, base_config)
        log_status_to_control_table(spark, program_name, stage_name, step_name, status, error_message, start_time, end_time)
f. etl/job_orchestrator.py
from etl.stage_runner import run_stage
from utils.config_utils import load_json_config
from utils.spark_utils import get_spark_session

def run_program(program_name, config_path):
    config = load_json_config(config_path)
    spark = get_spark_session()
    # Note: DISTINCT does not guarantee stage order; add a sequence column to the
    # control table and ORDER BY it if stages must run in a fixed order.
    stages = spark.sql(f"SELECT DISTINCT stage_name FROM control_table WHERE program_name='{program_name}'")
    for stage in stages.collect():
        run_stage(program_name, stage["stage_name"], config)
    spark.stop()
5. Utilities
a. utils/spark_utils.py
from pyspark.sql import SparkSession

def get_spark_session(app_name="ETL", master="local[*]"):
    return SparkSession.builder \
        .appName(app_name) \
        .master(master) \
        .enableHiveSupport() \
        .getOrCreate()
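A short usage sketch showing how the values in base_config.json can feed get_spark_session (the app name here is just an example):
from utils.config_utils import load_json_config
from utils.spark_utils import get_spark_session

config = load_json_config("config/base_config.json")
# Note: hive_metastore from the config is not wired in by get_spark_session as written;
# if needed it can be added via .config("hive.metastore.uris", config["hive_metastore"]).
spark = get_spark_session(app_name="CIBIL 750 Program", master=config["spark_master"])
print(spark.version)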
b. utils/config_utils.py
import json

def load_json_config(config_path):
    """Load configuration from a JSON file."""
    with open(config_path, 'r') as file:
        return json.load(file)

def get_control_table_steps(spark, program_name, stage_name):
    """Fetch steps for a specific stage and program from the control table."""
    query = f"SELECT * FROM control_table WHERE program_name='{program_name}' AND stage_name='{stage_name}'"
    return spark.sql(query)
Missing Utility Files
a. data_utils.py
def custom_transform_function(df):
    """
    Example custom transformation function for DataFrame operations.
    Modify this as per the specific business logic.
    """
    return df.filter("score > 700").withColumnRenamed("old_column_name", "new_column_name")
b. error_handling.py
def log_error(error_message, logger):
    """
    Log error messages to a logging service or file.
    """
    logger.error(f"Error: {error_message}")
    return {"status": "FAILED", "message": error_message}
2. Updated Control Table Data
Control Table SQL
INSERT INTO control_table VALUES
('CIBIL 750 Program', 'CIBIL Filter', 'Read CIBIL Data', 'SQL', 'SELECT * FROM cibil_data WHERE score > 750', 'cibil_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'CIBIL Filter', 'Apply Risk Filter', 'SQL', 'SELECT * FROM risk_data WHERE risk_level < 3', 'risk_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame', 'custom_transform_function()', 'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Oracle Read', 'Read Oracle Data', 'Oracle', 'SELECT * FROM oracle_table WHERE condition = value', 'oracle_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Final Write', 'Write to Hive', 'SQL', 'INSERT INTO final_table SELECT * FROM temp_table', NULL, 'final_table', 'snapshot', 'PENDING', NULL, NULL, NULL);
3. Enhanced execute_query.py
This handles all types of operations dynamically, including SQL, DataFrame operations, and Oracle reads.
from utils.query_utils import execute_sql_query
from utils.data_utils import custom_transform_function

def execute_step(spark, step, config):
    """
    Execute a single step in the ETL process.
    Supports multiple operation types: SQL, DataFrame, Oracle.
    """
    operation_type = step["operation_type"]
    query = step["query"]
    temp_view_name = step.get("temp_view_name")
    table_name = step.get("table_name")
    write_mode = step.get("write_mode")

    if operation_type == "SQL":
        # Execute a SQL query
        execute_sql_query(spark, query, temp_view_name, table_name, write_mode)
    elif operation_type == "DataFrame":
        # Execute a DataFrame operation
        if query == "custom_transform_function()":
            df = spark.table(temp_view_name)  # Assume input is already registered as a view
            transformed_df = custom_transform_function(df)
            transformed_df.createOrReplaceTempView(temp_view_name)
    elif operation_type == "Oracle":
        # Example: reading from Oracle over JDBC (service-name URL syntax)
        oracle_jdbc_url = f"jdbc:oracle:thin:@//{config['oracle_host']}:{config['oracle_port']}/{config['oracle_service_name']}"
        df = spark.read \
            .format("jdbc") \
            .option("url", oracle_jdbc_url) \
            .option("dbtable", f"({query})") \
            .option("user", config["oracle_user"]) \
            .option("password", config["oracle_password"]) \
            .load()
        df.createOrReplaceTempView(temp_view_name)
    else:
        raise ValueError(f"Unsupported operation type: {operation_type}")
4. Updated query_utils.py
def execute_sql_query(spark, query, temp_view_name=None, table_name=None, write_mode=None):
    """
    Execute SQL query and optionally register as a temporary view or write to a table.
    """
    df = spark.sql(query)
    if temp_view_name:
        df.createOrReplaceTempView(temp_view_name)
    if table_name:
        if write_mode == "overwrite":
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == "append":
            df.write.mode("append").saveAsTable(table_name)
        else:
            raise ValueError(f"Unsupported write mode: {write_mode}")
5. Logging Enhancements
Logging Script (logging_utils/logger.py)
import logging

def get_logger(name):
    """
    Set up a logger with a specific name and INFO level logging.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # Avoid attaching duplicate handlers when get_logger is called more than once.
    if not logger.handlers:
        console_handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
    return logger
6. Enhanced ETL Process
Stage Runner
from datetime import datetime

from etl.execute_query import execute_step
from etl.log_status import log_status_to_control_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session
from logging_utils.logger import get_logger

logger = get_logger("ETL")

def run_stage(program_name, stage_name, base_config):
    """
    Execute all steps in a stage, logging the results to the control table.
    """
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name)
    for row in steps.collect():
        # Convert the Row to a dict so execute_step can use .get() for optional fields.
        step = row.asDict()
        step_name = step["step_name"]
        start_time = datetime.now()
        try:
            execute_step(spark, step, base_config)
            end_time = datetime.now()
            log_status_to_control_table(spark, program_name, stage_name, step_name, "SUCCESS", None, start_time, end_time)
            logger.info(f"Step {step_name} completed successfully.")
        except Exception as e:
            end_time = datetime.now()
            log_status_to_control_table(spark, program_name, stage_name, step_name, "FAILED", str(e), start_time, end_time)
            logger.error(f"Error in step {step_name}: {e}")
    # The shared SparkSession is stopped by the orchestrator, not here.
This setup dynamically supports various operations on data, whether they are SQL queries, DataFrame transformations, or Oracle reads, and logs all actions and errors effectively. Let me know if further adjustments are needed!
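For completeness, a minimal end-to-end invocation sketch, equivalent to what scripts/run_etl.sh submits (it assumes the control table and source tables already exist):
from etl.job_orchestrator import run_program

run_program("CIBIL 750 Program", "config/base_config.json")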
Here’s how the updated setup can handle the addition of the snapshot mode and make it configurable directly through the control table.
1. Updated execute_sql_query to Support snapshot Mode
This updated function dynamically handles temp_view, table, and snapshot modes based on the control table configuration.
from datetime import datetime

def execute_sql_query(spark, query, temp_view_name=None, table_name=None, write_mode=None):
    """
    Execute SQL query and handle different write modes: temp_view, table, snapshot, append.
    """
    df = spark.sql(query)
    if write_mode == "temp_view":
        df.createOrReplaceTempView(temp_view_name)
    elif write_mode == "table":
        df.write.mode("overwrite").saveAsTable(table_name)
    elif write_mode == "snapshot":
        # Write a timestamped copy of the table and expose the latest data as a view.
        current_time = datetime.now().strftime("%Y%m%d%H%M")
        snapshot_table_name = f"{table_name}_{current_time}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        df.createOrReplaceTempView(f"{table_name}_view")
    elif write_mode == "append":
        df.write.mode("append").saveAsTable(table_name)
    else:
        raise ValueError(f"Unsupported write mode: {write_mode}")
2. Updated Control Table SQL
The control table includes the write_mode column to specify the write behavior (temp_view, table, append, snapshot). This ensures flexibility in defining the desired operation.
INSERT INTO control_table VALUES
('CIBIL 750 Program', 'CIBIL Filter', 'Read CIBIL Data', 'SQL', 'SELECT * FROM cibil_data WHERE score > 750', 'cibil_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'CIBIL Filter', 'Apply Risk Filter', 'SQL', 'SELECT * FROM risk_data WHERE risk_level < 3', 'risk_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame', 'custom_transform_function()', 'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Oracle Read', 'Read Oracle Data', 'Oracle', 'SELECT * FROM oracle_table WHERE condition = value', 'oracle_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL),
('CIBIL 750 Program', 'Final Write', 'Write to Hive', 'SQL', 'INSERT INTO final_table SELECT * FROM temp_table', NULL, 'final_table', 'snapshot', 'PENDING', NULL, NULL, NULL);
3. Updated execute_step Function
This integrates the write_mode logic directly from the control table.
from utils.query_utils import execute_sql_query
from utils.data_utils import custom_transform_function

def execute_step(spark, step, config):
    """
    Execute a single step in the ETL process, supporting multiple write modes.
    """
    operation_type = step["operation_type"]
    query = step["query"]
    temp_view_name = step.get("temp_view_name")
    table_name = step.get("table_name")
    write_mode = step.get("write_mode")

    if operation_type == "SQL":
        # Execute a SQL query; execute_sql_query dispatches on write_mode.
        execute_sql_query(spark, query, temp_view_name, table_name, write_mode)
    elif operation_type == "DataFrame":
        # Example: execute a DataFrame operation
        if query == "custom_transform_function()":
            df = spark.table(temp_view_name)  # Assume input is already registered as a view
            transformed_df = custom_transform_function(df)
            transformed_df.createOrReplaceTempView(temp_view_name)
    elif operation_type == "Oracle":
        # Example: reading from Oracle over JDBC (service-name URL syntax)
        oracle_jdbc_url = f"jdbc:oracle:thin:@//{config['oracle_host']}:{config['oracle_port']}/{config['oracle_service_name']}"
        df = spark.read \
            .format("jdbc") \
            .option("url", oracle_jdbc_url) \
            .option("dbtable", f"({query})") \
            .option("user", config["oracle_user"]) \
            .option("password", config["oracle_password"]) \
            .load()
        df.createOrReplaceTempView(temp_view_name)
    else:
        raise ValueError(f"Unsupported operation type: {operation_type}")
4. Updated Control Table Schema
The control table schema includes the write_mode column to handle different write behaviors, making it clear and configurable:
Program Name | Stage Name | Step Name | Operation Type | Query | Temp View Name | Table Name | Write Mode
--- | --- | --- | --- | --- | --- | --- | ---
CIBIL 750 Program | CIBIL Filter | Read CIBIL Data | SQL | SELECT * FROM cibil_data WHERE score > 750 | cibil_filtered_view | NULL | temp_view
CIBIL 750 Program | CIBIL Filter | Apply Risk Filter | SQL | SELECT * FROM risk_data WHERE risk_level < 3 | risk_filtered_view | NULL | temp_view
CIBIL 750 Program | Dataframe Operation | Custom Transform | DataFrame | custom_transform_function() | custom_transformed_view | NULL | temp_view
CIBIL 750 Program | Oracle Read | Read Oracle Data | Oracle | SELECT * FROM oracle_table WHERE condition = value | oracle_view | NULL | temp_view
CIBIL 750 Program | Final Write | Write to Hive | SQL | INSERT INTO final_table SELECT * FROM temp_table | NULL | final_table | snapshot
This approach ensures maximum flexibility and allows different behaviors such as appending data, overwriting tables, or creating snapshot versions directly through the control table configuration. Let me know if you need additional adjustments!
To accommodate the execution of DataFrame operations where the exact code snippet is passed in the control table, we need to execute the snippet dynamically using exec. Here’s how the solution can handle this:
Updated execute_step Function
This version supports executing code snippets for DataFrame operations.
from datetime import datetime

from utils.query_utils import execute_sql_query

def execute_step(spark, step, config):
    """
    Execute a single step in the ETL process, supporting SQL, Oracle, and DataFrame operations.
    """
    operation_type = step["operation_type"]
    query = step["query"]
    temp_view_name = step.get("temp_view_name")
    table_name = step.get("table_name")
    write_mode = step.get("write_mode")

    if operation_type == "SQL":
        # Execute a SQL query
        execute_sql_query(spark, query, temp_view_name, table_name, write_mode)
    elif operation_type == "DataFrame":
        # Execute a custom DataFrame operation (dynamic snippet execution).
        # 'query' is expected to contain the Python code snippet as a string.
        exec_locals = {"spark": spark}
        exec(query, globals(), exec_locals)
        # Check whether the snippet produced a DataFrame named 'df'
        if "df" in exec_locals:
            df = exec_locals["df"]
            if write_mode == "temp_view":
                df.createOrReplaceTempView(temp_view_name)
            elif write_mode == "table":
                df.write.mode("overwrite").saveAsTable(table_name)
            elif write_mode == "append":
                df.write.mode("append").saveAsTable(table_name)
            elif write_mode == "snapshot":
                current_time = datetime.now().strftime("%Y%m%d%H%M")
                snapshot_table_name = f"{table_name}_{current_time}"
                df.write.mode("overwrite").saveAsTable(snapshot_table_name)
                df.createOrReplaceTempView(f"{table_name}_view")
        else:
            raise ValueError("DataFrame operation did not create a DataFrame named 'df'.")
    elif operation_type == "Oracle":
        # Example: reading from Oracle over JDBC (service-name URL syntax)
        oracle_jdbc_url = f"jdbc:oracle:thin:@//{config['oracle_host']}:{config['oracle_port']}/{config['oracle_service_name']}"
        df = spark.read \
            .format("jdbc") \
            .option("url", oracle_jdbc_url) \
            .option("dbtable", f"({query})") \
            .option("user", config["oracle_user"]) \
            .option("password", config["oracle_password"]) \
            .load()
        df.createOrReplaceTempView(temp_view_name)
    else:
        raise ValueError(f"Unsupported operation type: {operation_type}")
Example Control Table Entry for DataFrame Operation
Here’s how a DataFrame operation can be represented in the control table. The query column contains the Python code snippet.
INSERT INTO control_table VALUES
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame',
'
# Sample DataFrame operation code
df = spark.table("cibil_filtered_view")
df = df.withColumn("risk_score", df["score"] * 1.1)
',
'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, NULL);
Code Snippet in the query Column
The query column for DataFrame operations must be a valid Python code snippet. For example:
# Example: Filtering and transforming a DataFrame
df = spark.table("risk_filtered_view")
df = df.filter("risk_level < 2")
df = df.withColumn("adjusted_score", df["score"] * 0.9)
How It Works
- Dynamic Execution:
  - The query column for a DataFrame operation is treated as Python code.
  - The code is executed using exec within a controlled environment (a minimal standalone sketch of this mechanism follows below).
- DataFrame Handling:
  - The snippet must create or modify a DataFrame named df.
  - The function writes this DataFrame according to the write_mode (e.g., temp_view, table, append, or snapshot).
- Flexibility:
  - You can define any DataFrame transformation in the control table without hardcoding it into the application.
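As referenced above, here is a minimal standalone sketch of the exec mechanism; the snippet string and view name are illustrative and the sample data is made up.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("exec-demo").getOrCreate()
spark.createDataFrame([(1, 760), (2, 801)], ["id", "score"]).createOrReplaceTempView("cibil_filtered_view")

snippet = """
df = spark.table("cibil_filtered_view")
df = df.withColumn("risk_score", df["score"] * 1.1)
"""

# Mirror execute_step: expose only 'spark' to the snippet and collect its locals.
exec_locals = {"spark": spark}
exec(snippet, globals(), exec_locals)
exec_locals["df"].show()
Since exec runs arbitrary code, write access to the control table should be limited to trusted users.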
Control Table Schema with Example
Program Name | Stage Name | Step Name | Operation Type | Query | Temp View Name | Table Name | Write Mode
--- | --- | --- | --- | --- | --- | --- | ---
CIBIL 750 Program | Dataframe Operation | Custom Transform | DataFrame | df = spark.table("cibil_filtered_view"); df = df.withColumn("risk_score", df["score"] * 1.1) | custom_transformed_view | NULL | temp_view
This approach provides flexibility while maintaining a structured ETL pipeline. Let me know if you have more specific requirements!