# Project Structure
# etl_project/
# ├── config
# │   ├── config.json
# │   └── logging.conf
# ├── data
# │   ├── input
# │   │   └── extra_data.xlsx
# │   └── output
# │       ├── eligible_customers.csv
# │       └── log_data.csv
# ├── etl
# │   └── etl.py
# ├── logging_utils
# │   └── logger.py
# ├── utils
# │   ├── spark_utils.py
# │   └── data_utils.py
# └── main.py
# ===============
# config/config.json
# ===============
{
  "spark": {
    "master": "yarn",
    "appName": "ETL Project",
    "hiveSupport": true
  },
  "input_data": {
    "hive_tables": {
      "customer_data": "db_name.customer_table",
      "cibil_data": "db_name.cibil_table",
      "risk_data": "db_name.risk_table",
      "collection_data": "db_name.collection_table"
    },
    "excel_files": [
      "data/input/extra_data.xlsx"
    ]
  },
  "output_data": {
    "hive_table": "db_name.eligible_customers",
    "oracle_table": "eligible_customers_oracle",
    "log_data": "data/output/log_data.csv"
  },
  "oracle": {
    "url": "jdbc:oracle:thin:@host:port/sid",
    "user": "username",
    "password": "password",
    "driver": "oracle.jdbc.driver.OracleDriver"
  },
  "etl_process": {
    "products": ["Personal Loan", "Business Loan", "UPI Loan"],
    "programs": {
      "Personal Loan": ["Cibil 750 Program"],
      "Business Loan": ["Risk Score Program"],
      "UPI Loan": ["Collection History Program"]
    }
  }
}
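# Note on credentials: keeping the Oracle user and password directly in config.json is shown
# here only to keep the example self-contained. A minimal sketch of one common alternative,
# using hypothetical ORACLE_USER / ORACLE_PASSWORD environment variables (not part of this
# project) to override the JSON placeholders at load time:
import json
import os

with open("config/config.json", "r") as config_file:
    config = json.load(config_file)

# Override the JSON placeholders from the environment when the variables are set (hypothetical names).
config["oracle"]["user"] = os.environ.get("ORACLE_USER", config["oracle"]["user"])
config["oracle"]["password"] = os.environ.get("ORACLE_PASSWORD", config["oracle"]["password"])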
# ===============
# utils/spark_utils.py
# ===============
from pyspark.sql import SparkSession

def create_spark_session(config):
    """Create and return a Spark session with Hive support if enabled."""
    builder = (
        SparkSession.builder
        .appName(config["spark"]["appName"])
        .master(config["spark"]["master"])
    )
    if config["spark"].get("hiveSupport", False):
        builder = builder.enableHiveSupport()
    return builder.getOrCreate()
# ===============
# utils/data_utils.py
# ===============
from pyspark.sql import DataFrame

def read_hive_table(spark, table_name: str) -> DataFrame:
    """Read data from a Hive table."""
    return spark.sql(f"SELECT * FROM {table_name}")

def read_excel_file(spark, file_path: str) -> DataFrame:
    """Read data from an Excel file (requires the com.crealytics spark-excel connector)."""
    return (
        spark.read.format("com.crealytics.spark.excel")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(file_path)
    )

def write_hive_table(df: DataFrame, table_name: str):
    """Write data to a Hive table."""
    df.write.mode("overwrite").saveAsTable(table_name)

def write_oracle_table(df: DataFrame, config: dict):
    """Write data to an Oracle table over JDBC."""
    (
        df.write.format("jdbc")
        .option("url", config["oracle"]["url"])
        .option("dbtable", config["output_data"]["oracle_table"])
        .option("user", config["oracle"]["user"])
        .option("password", config["oracle"]["password"])
        .option("driver", config["oracle"]["driver"])
        .mode("overwrite")
        .save()
    )
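# read_excel_file and write_oracle_table rely on two dependencies that do not ship with Spark:
# the com.crealytics spark-excel data source and the Oracle JDBC driver. A minimal sketch of
# attaching them when the session is built (the package coordinate and jar path below are
# placeholders, not values from this project):
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ETL Project")
    # Pull the spark-excel connector from Maven (example coordinate; use the build matching your Spark/Scala version).
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:3.4.1_0.20.3")
    # Point at a locally available Oracle JDBC jar (example path).
    .config("spark.jars", "/path/to/ojdbc8.jar")
    .enableHiveSupport()
    .getOrCreate()
)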
# ===============
# logging_utils/logger.py
# ===============
import logging

def setup_logging():
    """Set up logging for the ETL project.

    Kept in a package named logging_utils so it does not shadow the standard-library logging module.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("data/output/log_data.csv"),
            logging.StreamHandler()
        ]
    )
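# The project tree also lists config/logging.conf, which setup_logging above does not read.
# If logging were driven from that file instead of basicConfig, one minimal sketch (assuming
# config/logging.conf is a standard INI-style fileConfig file) would be:
import logging.config

def setup_logging_from_file(conf_path: str = "config/logging.conf"):
    """Configure logging from an INI-style logging config file."""
    logging.config.fileConfig(conf_path, disable_existing_loggers=False)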
# ===============
# etl/etl.py
# ===============
import logging

from utils.data_utils import read_hive_table, read_excel_file, write_hive_table, write_oracle_table

def etl_process(spark, config):
    logging.info("Starting ETL process...")

    # Load input data from Hive
    customer_data = read_hive_table(spark, config["input_data"]["hive_tables"]["customer_data"])
    cibil_data = read_hive_table(spark, config["input_data"]["hive_tables"]["cibil_data"])
    risk_data = read_hive_table(spark, config["input_data"]["hive_tables"]["risk_data"])
    collection_data = read_hive_table(spark, config["input_data"]["hive_tables"]["collection_data"])

    # Load Excel files if specified (their schema must match customer_data for the union to succeed)
    for excel_file in config["input_data"]["excel_files"]:
        extra_data = read_excel_file(spark, excel_file)
        customer_data = customer_data.union(extra_data)

    # Process each product and its programs.
    # Note: the filters below expect cibil_score, risk_score and collection_history to be columns
    # of customer_data; if they live in cibil_data, risk_data and collection_data, those
    # DataFrames must be joined onto customer_data first (see the join sketch after this file).
    for product in config["etl_process"]["products"]:
        for program in config["etl_process"]["programs"][product]:
            logging.info(f"Processing {program} for {product}...")
            if program == "Cibil 750 Program":
                eligible_customers = customer_data.filter("cibil_score >= 750")
            elif program == "Risk Score Program":
                eligible_customers = customer_data.filter("risk_score <= 50")
            elif program == "Collection History Program":
                eligible_customers = customer_data.filter("collection_history == 'Good'")
            else:
                logging.warning(f"Unknown program: {program}")
                continue

            # Write results to Hive (each program overwrites the same target table,
            # so only the last program's output is retained)
            write_hive_table(eligible_customers, config["output_data"]["hive_table"])

            # Write results to Oracle
            write_oracle_table(eligible_customers, config)

    logging.info("ETL process completed successfully!")
# ===============
# main.py
# ===============
import json
import logging

from etl.etl import etl_process
from utils.spark_utils import create_spark_session
from logging_utils.logger import setup_logging

if __name__ == "__main__":
    # Set up logging
    setup_logging()

    # Load configuration
    config_path = "config/config.json"
    with open(config_path, "r") as config_file:
        config = json.load(config_file)

    # Create Spark session
    spark = create_spark_session(config)

    # Run ETL process
    try:
        etl_process(spark, config)
        logging.info("ETL project completed successfully!")
    except Exception as e:
        logging.error(f"ETL process failed: {e}")
    finally:
        spark.stop()