etl_project/
├── config/
│   ├── base_config.json
│   └── product_configs/
│       └── pl_cibil750.json
├── control/
│   ├── control_table_setup.sql
│   └── sample_control_table_data.sql
├── scripts/
│   ├── run_etl.sh
│   └── run_etl.py
├── etl/
│   ├── execute_query.py
│   ├── log_status.py
│   ├── stage_runner.py
│   └── job_orchestrator.py
├── app_logging/
│   └── logger.py
├── utils/
│   ├── spark_utils.py
│   ├── query_utils.py
│   ├── data_utils.py
│   ├── config_utils.py
│   └── error_handling.py
└── README.md

Note: the logging package is named app_logging/ rather than logging/ so it does not shadow Python's standard-library logging module when the project root is on PYTHONPATH.
2. Configuration Files
a. base_config.json
{
  "hive_metastore": "thrift://localhost:9083",
  "spark_master": "local[*]",
  "log_level": "INFO",
  "oracle_host": "oracle-host",
  "oracle_port": "1521",
  "oracle_service_name": "orcl",
  "oracle_user": "oracle_user",
  "oracle_password": "oracle_password"
}
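The base config is loaded with the load_json_config helper from utils/config_utils.py (shown later). A minimal sanity check, assuming the file lives at config/base_config.json relative to the project root:

from utils.config_utils import load_json_config

# Load the shared connection and Spark settings once at startup.
config = load_json_config("config/base_config.json")

# Fail fast if a required key is missing (keys as defined in base_config.json above).
required_keys = ["hive_metastore", "spark_master", "oracle_host", "oracle_port",
                 "oracle_service_name", "oracle_user", "oracle_password"]
missing = [key for key in required_keys if key not in config]
if missing:
    raise KeyError(f"base_config.json is missing keys: {missing}")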
b. pl_cibil750.json
{
  "program_name": "CIBIL 750 Program",
  "stages": [
    {
      "stage_name": "CIBIL Filter",
      "steps": [
        {
          "step_name": "Read CIBIL Data",
          "operation_type": "SQL",
          "query": "SELECT * FROM cibil_data WHERE score > 750",
          "temp_view_name": "cibil_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Risk Filter",
      "steps": [
        {
          "step_name": "Apply Risk Filter",
          "operation_type": "SQL",
          "query": "SELECT * FROM risk_data WHERE risk_level < 3",
          "temp_view_name": "risk_filtered_view"
        }
      ]
    },
    {
      "stage_name": "Final Write",
      "steps": [
        {
          "step_name": "Write to Hive",
          "operation_type": "SQL",
          "query": "INSERT INTO final_table SELECT * FROM temp_table"
        }
      ]
    }
  ]
}
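The product config mirrors the control table layout: the same stage/step fields appear in both. A small sketch (assuming the path config/product_configs/pl_cibil750.json) of how the JSON can be walked with the load_json_config helper:

from utils.config_utils import load_json_config

program = load_json_config("config/product_configs/pl_cibil750.json")
print(f"Program: {program['program_name']}")

# Walk the stage/step hierarchy exactly as it is laid out in the JSON above.
for stage in program["stages"]:
    for step in stage["steps"]:
        print(f"  {stage['stage_name']} -> {step['step_name']} ({step['operation_type']})")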
3. Control Table
a. control_table_setup.sql
The control table includes a write_mode column to specify how each step's output is persisted (temp_view, table, append, or snapshot), which keeps the desired write behaviour configurable per step. A sketch of the setup DDL is shown below; the sample rows that follow would live in sample_control_table_data.sql.
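This is a minimal sketch of the DDL that control_table_setup.sql could contain, issued through spark.sql for consistency with the rest of the pipeline. The column names, types, and order are assumptions inferred from the sample rows below and from etl/log_status.py (which writes an error_message column); with error_message included, the 12-value sample INSERTs below would need an explicit column list or an extra NULL.

from utils.spark_utils import get_spark_session

spark = get_spark_session()

# Assumed layout: columns inferred from the sample rows and log_status.py; adjust to the real schema.
spark.sql("""
    CREATE TABLE IF NOT EXISTS control_table (
        program_name    STRING,
        stage_name      STRING,
        step_name       STRING,
        operation_type  STRING,     -- SQL, DataFrame, Oracle
        query           STRING,     -- SQL text or a Python snippet
        temp_view_name  STRING,
        table_name      STRING,
        target_type     STRING,     -- how the step output is exposed
        status          STRING,     -- PENDING, SUCCESS, FAILED
        error_message   STRING,
        start_time      TIMESTAMP,
        end_time        TIMESTAMP,
        write_mode      STRING      -- temp_view, table, append, snapshot
    )
""")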
INSERT INTO control_table VALUES
('CIBIL 750 Program', 'CIBIL Filter', 'Read CIBIL Data', 'SQL', 'SELECT * FROM cibil_data WHERE score > 750', 'cibil_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, 'temp_view'),
('CIBIL 750 Program', 'CIBIL Filter', 'Apply Risk Filter', 'SQL', 'SELECT * FROM risk_data WHERE risk_level < 3', 'risk_filtered_view', NULL, 'temp_view', 'PENDING', NULL, NULL, 'temp_view'),
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame', 'custom_transform_function()', 'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, 'temp_view'),
('CIBIL 750 Program', 'Oracle Read', 'Read Oracle Data', 'Oracle', 'SELECT * FROM oracle_table WHERE condition = value', 'oracle_view', NULL, 'temp_view', 'PENDING', NULL, NULL, 'temp_view'),
('CIBIL 750 Program', 'Final Write', 'Write to Hive', 'SQL', 'INSERT INTO final_table SELECT * FROM temp_table', 'final_table', NULL, 'snapshot', 'PENDING', NULL, NULL, 'snapshot');
INSERT INTO control_table VALUES
('CIBIL 750 Program', 'Dataframe Operation', 'Custom Transform', 'DataFrame',
'
# Sample DataFrame operation code
df = spark.table("cibil_filtered_view")
df = df.withColumn("risk_score", df["score"] * 1.1)
',
'custom_transformed_view', NULL, 'temp_view', 'PENDING', NULL, NULL, 'temp_view');
| Program Name | Stage Name | Step Name | Operation Type | Query | Temp View Name | Table Name | Write Mode |
|---|---|---|---|---|---|---|---|
| CIBIL 750 Program | CIBIL Filter | Read CIBIL Data | SQL | SELECT * FROM cibil_data WHERE score > 750 | cibil_filtered_view | NULL | temp_view |
| CIBIL 750 Program | CIBIL Filter | Apply Risk Filter | SQL | SELECT * FROM risk_data WHERE risk_level < 3 | risk_filtered_view | NULL | temp_view |
| CIBIL 750 Program | Dataframe Op | Custom Transform | DataFrame | custom_transform_function() | custom_transformed_view | NULL | temp_view |
| CIBIL 750 Program | Oracle Read | Read Oracle Data | Oracle | SELECT * FROM oracle_table WHERE condition = value | oracle_view | NULL | temp_view |
| CIBIL 750 Program | Final Write | Write to Hive | SQL | INSERT INTO final_table SELECT * FROM temp_table | NULL | final_table | snapshot |

| Program Name | Stage Name | Step Name | Operation Type | Query | Temp View Name | Table Name | Write Mode |
|---|---|---|---|---|---|---|---|
| CIBIL 750 Program | Dataframe Operation | Custom Transform | DataFrame | df = spark.table("cibil_filtered_view"); df = df.withColumn("risk_score", df["score"] * 1.1) | custom_transformed_view | NULL | temp_view |
4. Scripts
a. run_etl.sh
#!/bin/bash
# Run from the project root so the relative paths below resolve and the etl/, utils/,
# and app_logging/ packages are importable by the driver.
export PYTHONPATH="$(pwd):${PYTHONPATH}"

CONFIG_PATH="config/base_config.json"
PROGRAM_NAME="CIBIL 750 Program"
spark-submit --master local[*] --deploy-mode client scripts/run_etl.py "$PROGRAM_NAME" "$CONFIG_PATH"
b. run_etl.py
import sys

from etl.job_orchestrator import run_program

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: run_etl.py <program_name> <config_path>")
        sys.exit(1)
    program_name = sys.argv[1]
    config_path = sys.argv[2]
    run_program(program_name, config_path)
c. etl/execute_query.py
from datetime import datetime

from utils.query_utils import execute_sql_query
from utils.data_utils import custom_transform_function

def execute_step(spark, step, config):
    """
    Execute a single step in the ETL process, supporting multiple write modes.
    """
    operation_type = step["operation_type"]
    query = step["query"]
    temp_view_name = step.get("temp_view_name")
    table_name = step.get("table_name")
    write_mode = step.get("write_mode")

    if operation_type == "SQL":
        # Execute a SQL query and persist the result according to write_mode
        execute_sql_query(spark, query, temp_view_name, table_name, write_mode)

    elif operation_type == "DataFrame":
        if query == "custom_transform_function()":
            # Named transformation: input is assumed to be registered as a temp view
            df = custom_transform_function(spark.table(temp_view_name))
        else:
            # Dynamic snippet execution: 'query' holds Python code that must create 'df'
            exec_locals = {"spark": spark}
            exec(query, globals(), exec_locals)
            if "df" not in exec_locals:
                raise ValueError("DataFrame operation did not create a DataFrame named 'df'.")
            df = exec_locals["df"]

        # Persist the resulting DataFrame according to write_mode
        if write_mode == "temp_view":
            df.createOrReplaceTempView(temp_view_name)
        elif write_mode == "table":
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == "append":
            df.write.mode("append").saveAsTable(table_name)
        elif write_mode == "snapshot":
            # Write a timestamped snapshot table and expose it via a stable view name
            current_time = datetime.now().strftime("%Y%m%d%H%M")
            snapshot_table_name = f"{table_name}_{current_time}"
            df.write.mode("overwrite").saveAsTable(snapshot_table_name)
            df.createOrReplaceTempView(f"{table_name}_view")
        else:
            raise ValueError(f"Unsupported write mode: {write_mode}")

    elif operation_type == "Oracle":
        # Read from Oracle over JDBC and register the result as a temp view
        oracle_jdbc_url = f"jdbc:oracle:thin:@//{config['oracle_host']}:{config['oracle_port']}/{config['oracle_service_name']}"
        df = (
            spark.read.format("jdbc")
            .option("url", oracle_jdbc_url)
            .option("dbtable", f"({query})")
            .option("user", config["oracle_user"])
            .option("password", config["oracle_password"])
            .load()
        )
        df.createOrReplaceTempView(temp_view_name)

    else:
        raise ValueError(f"Unsupported operation type: {operation_type}")
d. etl/log_status.py
def log_status_to_control_table(spark, program_name, stage_name, step_name, status, error_message, start_time, end_time):
    # Escape single quotes and map None to SQL NULL so the generated INSERT stays valid.
    safe_error = "NULL" if error_message is None else "'" + str(error_message).replace("'", "''") + "'"
    spark.sql(f"""
        INSERT INTO control_table (program_name, stage_name, step_name, status, error_message, start_time, end_time)
        VALUES ('{program_name}', '{stage_name}', '{step_name}', '{status}', {safe_error}, '{start_time}', '{end_time}')
    """)
## Logging Enhancements
Logging script (app_logging/logger.py):
import logging

def get_logger(name):
    """
    Set up a logger with a specific name and INFO level logging.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # Only attach a handler once; repeated get_logger calls would otherwise duplicate log lines.
    if not logger.handlers:
        console_handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
    return logger
e. etl/stage_runner.py
from datetime import datetime

from etl.execute_query import execute_step
from etl.log_status import log_status_to_control_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session

def run_stage(program_name, stage_name, base_config):
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name)
    for row in steps.collect():
        step = row.asDict()
        step_name = step["step_name"]
        start_time = datetime.now()
        execute_step(spark, step, base_config)
        end_time = datetime.now()
        log_status_to_control_table(spark, program_name, stage_name, step_name,
                                    "SUCCESS", None, start_time, end_time)
    # The shared SparkSession (and the temp views registered in it) is managed by the
    # job orchestrator, so it is not stopped per stage.
Updated stage_runner.py with logging and error handling:
from datetime import datetime

from etl.execute_query import execute_step
from etl.log_status import log_status_to_control_table
from utils.config_utils import get_control_table_steps
from utils.spark_utils import get_spark_session
from app_logging.logger import get_logger

logger = get_logger("ETL")

def run_stage(program_name, stage_name, base_config):
    """
    Execute all steps in a stage, logging the results to the control table.
    """
    spark = get_spark_session()
    steps = get_control_table_steps(spark, program_name, stage_name)
    for row in steps.collect():
        step = row.asDict()
        step_name = step["step_name"]
        start_time = datetime.now()
        try:
            execute_step(spark, step, base_config)
            end_time = datetime.now()
            log_status_to_control_table(spark, program_name, stage_name, step_name,
                                        "SUCCESS", None, start_time, end_time)
            logger.info(f"Step {step_name} completed successfully.")
        except Exception as e:
            end_time = datetime.now()
            log_status_to_control_table(spark, program_name, stage_name, step_name,
                                        "FAILED", str(e), start_time, end_time)
            logger.error(f"Error in step {step_name}: {e}")
    # The shared SparkSession is stopped by the job orchestrator, not per stage.
f. etl/job_orchestrator.py
from etl.stage_runner import run_stage
from utils.config_utils import load_json_config
from utils.spark_utils import get_spark_session

def run_program(program_name, config_path):
    config = load_json_config(config_path)
    spark = get_spark_session()
    stages = spark.sql(f"SELECT DISTINCT stage_name FROM control_table WHERE program_name='{program_name}'")
    for stage in stages.collect():
        run_stage(program_name, stage["stage_name"], config)
    spark.stop()
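Note that SELECT DISTINCT gives no ordering guarantee, so stages may not run in the intended sequence. If stage ordering matters, one option, sketched below under the assumption that a hypothetical stage_order column is added to the control table, is to fetch an explicitly ordered stage list and iterate over it in run_program:

def get_ordered_stages(spark, program_name):
    # Assumes a hypothetical stage_order column in control_table; without it,
    # DISTINCT stage_name gives no guaranteed execution order.
    query = f"""
        SELECT stage_name, MIN(stage_order) AS stage_order
        FROM control_table
        WHERE program_name = '{program_name}'
        GROUP BY stage_name
        ORDER BY stage_order
    """
    return [row["stage_name"] for row in spark.sql(query).collect()]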
5. Utilities
a. utils/spark_utils.py
from pyspark.sql import SparkSession

def get_spark_session(app_name="ETL", master="local[*]"):
    return (
        SparkSession.builder
        .appName(app_name)
        .master(master)
        .enableHiveSupport()
        .getOrCreate()
    )
b. utils/config_utils.py
import json

def load_json_config(config_path):
    """Load configuration from a JSON file."""
    with open(config_path, 'r') as file:
        return json.load(file)

def get_control_table_steps(spark, program_name, stage_name):
    """Fetch the steps for a given program and stage from the control table."""
    query = f"SELECT * FROM control_table WHERE program_name='{program_name}' AND stage_name='{stage_name}'"
    return spark.sql(query)
c. utils/data_utils.py
def custom_transform_function(df):
    """
    Example custom transformation function for DataFrame operations.
    Modify this as per the specific business logic.
    """
    return df.filter("score > 700").withColumnRenamed("old_column_name", "new_column_name")
d. utils/error_handling.py
def log_error(error_message, logger):
    """
    Log an error message to the supplied logger and return a failure payload.
    """
    logger.error(f"Error: {error_message}")
    return {"status": "FAILED", "message": error_message}
e. utils/query_utils.py
from datetime import datetime

def execute_dataframe_operation(spark, operation, temp_view_name, table_name, write_mode):
    """
    Execute a Python snippet that must create a DataFrame named 'df',
    then persist it according to write_mode.
    """
    start_time = datetime.now()
    try:
        exec_locals = {"spark": spark}
        exec(operation, globals(), exec_locals)
        if "df" not in exec_locals:
            raise ValueError("DataFrame operation did not create a DataFrame named 'df'.")
        df = exec_locals["df"]
        if write_mode == 'temp_view':
            df.createOrReplaceTempView(temp_view_name)
        elif write_mode == 'table':
            df.write.mode("overwrite").saveAsTable(table_name)
        elif write_mode == 'append':
            df.write.mode("append").saveAsTable(table_name)
        elif write_mode == 'snapshot':
            current_time = datetime.now().strftime("%Y%m%d%H%M")
            snapshot_table_name = f"{table_name}_{current_time}"
            df.write.mode("overwrite").saveAsTable(snapshot_table_name)
            df.createOrReplaceTempView(f"{table_name}_view")
        else:
            raise ValueError(f"Unsupported write mode: {write_mode}")
        end_time = datetime.now()
        return "SUCCESS", None, start_time, end_time
    except Exception as e:
        end_time = datetime.now()
        return "FAILED", str(e), start_time, end_time

def execute_sql_query(spark, query, temp_view_name=None, table_name=None, write_mode=None):
    """
    Execute a SQL query and handle the different write modes: temp_view, table, append, snapshot.
    """
    df = spark.sql(query)
    if write_mode == "temp_view":
        df.createOrReplaceTempView(temp_view_name)
    elif write_mode == "table":
        df.write.mode("overwrite").saveAsTable(table_name)
    elif write_mode == "snapshot":
        current_time = datetime.now().strftime("%Y%m%d%H%M")
        snapshot_table_name = f"{table_name}_{current_time}"
        df.write.mode("overwrite").saveAsTable(snapshot_table_name)
        df.createOrReplaceTempView(f"{table_name}_view")
    elif write_mode == "append":
        df.write.mode("append").saveAsTable(table_name)
    else:
        raise ValueError(f"Unsupported write mode: {write_mode}")
We will save each file in its respective location:

- base_config.json in config/ and pl_cibil750.json in config/product_configs/
- control_table_setup.sql and sample_control_table_data.sql in control/
- run_etl.sh and run_etl.py in scripts/
- execute_query.py, log_status.py, stage_runner.py, and job_orchestrator.py in etl/
- logger.py in app_logging/
- spark_utils.py, query_utils.py, data_utils.py, config_utils.py, and error_handling.py in utils/
To execute the ETL pipeline, run the run_etl.sh script from the project root (its paths are relative to etl_project/):
Bash
cd etl_project
chmod +x scripts/run_etl.sh
./scripts/run_etl.sh