Project 1: PySpark ETL Pipeline (Hive and Excel in, Hive and Oracle out)

# Project Structure

# etl_project/
# ├── config
# │   ├── config.json
# │   └── logging.conf
# ├── data
# │   ├── input
# │   │   ├── customer_data.csv
# │   │   └── extra_data.xlsx
# │   └── output
# │       ├── eligible_customers.csv
# │       └── log_data.csv
# ├── etl
# │   └── etl.py
# ├── log_utils
# │   └── logger.py
# ├── utils
# │   ├── spark_utils.py
# │   └── data_utils.py
# └── main.py
#
# (The logger package is named log_utils rather than logging so it does not
#  shadow Python's standard logging module.)

# ===============
# config/config.json
# ===============
{
  "spark": {
    "master": "yarn",
    "appName": "ETL Project",
    "hiveSupport": true
  },
  "input_data": {
    "hive_tables": {
      "customer_data": "db_name.customer_table",
      "cibil_data": "db_name.cibil_table",
      "risk_data": "db_name.risk_table",
      "collection_data": "db_name.collection_table"
    },
    "excel_files": [
      "data/input/extra_data.xlsx"
    ]
  },
  "output_data": {
    "hive_table": "db_name.eligible_customers",
    "oracle_table": "eligible_customers_oracle",
    "log_data": "data/output/log_data.csv"
  },
  "oracle": {
    "url": "jdbc:oracle:thin:@host:port/sid",
    "user": "username",
    "password": "password",
    "driver": "oracle.jdbc.driver.OracleDriver"
  },
  "etl_process": {
    "products": ["Personal Loan", "Business Loan", "UPI Loan"],
    "programs": {
      "Personal Loan": ["Cibil 750 Program"],
      "Business Loan": ["Risk Score Program"],
      "UPI Loan": ["Collection History Program"]
    }
  }
}
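
# A small optional sanity check for config.json (a sketch, not one of the
# original project files): it verifies that every product listed under
# etl_process.products also has an entry under etl_process.programs before
# the pipeline starts.
def validate_config(config: dict) -> None:
    """Raise ValueError if a product has no program list in the config."""
    programs = config["etl_process"]["programs"]
    missing = [p for p in config["etl_process"]["products"] if p not in programs]
    if missing:
        raise ValueError(f"No programs configured for: {', '.join(missing)}")

# Example:
#     import json
#     with open("config/config.json") as f:
#         validate_config(json.load(f))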

# ===============
# utils/spark_utils.py
# ===============
from pyspark.sql import SparkSession

def create_spark_session(config):
    """Create and return a Spark session with Hive support if enabled."""
    builder = (
        SparkSession.builder
        .appName(config["spark"]["appName"])
        .master(config["spark"]["master"])
    )

    if config["spark"].get("hiveSupport", False):
        builder = builder.enableHiveSupport()

    return builder.getOrCreate()
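
# A quick local smoke test for create_spark_session (a sketch, not part of the
# original module): for testing outside the cluster you might swap "yarn" for
# "local[*]" and disable Hive support. The dict below only mimics the relevant
# part of config/config.json.
if __name__ == "__main__":
    test_config = {
        "spark": {"appName": "ETL Project (local)", "master": "local[*]", "hiveSupport": False}
    }
    spark = create_spark_session(test_config)
    print("Spark version:", spark.version)
    spark.stop()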

# ===============
# utils/data_utils.py
# ===============
from pyspark.sql import DataFrame

def read_hive_table(spark, table_name: str) -> DataFrame:
    """Read data from a Hive table."""
    return spark.sql(f"SELECT * FROM {table_name}")

def read_excel_file(spark, file_path: str) -> DataFrame:
    """Read data from an Excel file."""
    return (
        spark.read.format("com.crealytics.spark.excel")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(file_path)
    )

def write_hive_table(df: DataFrame, table_name: str):
    """Write data to a Hive table."""
    df.write.mode("overwrite").saveAsTable(table_name)

def write_oracle_table(df: DataFrame, config: dict):
    """Write data to an Oracle table."""
    df.write.format("jdbc") 
        .option("url", config["oracle"]["url"]) 
        .option("dbtable", config["oracle"]["table"]) 
        .option("user", config["oracle"]["user"]) 
        .option("password", config["oracle"]["password"]) 
        .option("driver", config["oracle"]["driver"]) 
        .mode("overwrite") 
        .save()
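
# Note: read_excel_file relies on the third-party com.crealytics spark-excel
# data source, which has to be supplied to the cluster (for example via the
# spark-submit --packages flag). If that package is unavailable, one hedged
# fallback is to go through pandas, roughly as sketched below (assumes pandas
# plus an Excel engine such as openpyxl are installed on the driver).
def read_excel_file_pandas(spark, file_path: str) -> DataFrame:
    """Fallback Excel reader that converts a pandas DataFrame to Spark."""
    import pandas as pd
    return spark.createDataFrame(pd.read_excel(file_path))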

# ===============
# log_utils/logger.py
# ===============
import logging

def setup_logging():
    """Set up logging for the ETL project."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("data/output/log_data.csv"),
            logging.StreamHandler()
        ]
    )
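
# The project tree also lists config/logging.conf, which setup_logging above
# does not use. One possible way to wire it in (a sketch, assuming you keep a
# standard fileConfig-style ini file there) is to prefer that file when it
# exists and fall back to basicConfig otherwise:
def setup_logging_from_conf(conf_path: str = "config/logging.conf"):
    """Load logging settings from an ini file if present, else use basicConfig."""
    import logging.config
    import os
    if os.path.exists(conf_path):
        logging.config.fileConfig(conf_path, disable_existing_loggers=False)
    else:
        setup_logging()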

# ===============
# etl/etl.py
# ===============
import logging
from utils.data_utils import read_hive_table, read_excel_file, write_hive_table, write_oracle_table

def etl_process(spark, config):
    logging.info("Starting ETL process...")

    # Load input data from Hive
    customer_data = read_hive_table(spark, config["input_data"]["hive_tables"]["customer_data"])
    cibil_data = read_hive_table(spark, config["input_data"]["hive_tables"]["cibil_data"])
    risk_data = read_hive_table(spark, config["input_data"]["hive_tables"]["risk_data"])
    collection_data = read_hive_table(spark, config["input_data"]["hive_tables"]["collection_data"])

    # Load Excel files if specified
    for excel_file in config["input_data"]["excel_files"]:
        extra_data = read_excel_file(spark, excel_file)
        # unionByName matches columns by name; the Excel sheet is expected to
        # carry the same columns as the customer table
        customer_data = customer_data.unionByName(extra_data)

    # Join the score and history tables onto the customer base so that the
    # columns used below (cibil_score, risk_score, collection_history) are
    # available for filtering. This assumes every table shares a customer_id
    # key column; adjust the join keys to your actual schema.
    customer_data = (
        customer_data
        .join(cibil_data, "customer_id", "left")
        .join(risk_data, "customer_id", "left")
        .join(collection_data, "customer_id", "left")
    )

    # Process each product and its programs, accumulating eligible customers
    # across programs so that later writes do not overwrite earlier results
    all_eligible = None
    for product in config["etl_process"]["products"]:
        for program in config["etl_process"]["programs"][product]:
            logging.info(f"Processing {program} for {product}...")

            if program == "Cibil 750 Program":
                eligible_customers = customer_data.filter("cibil_score >= 750")
            elif program == "Risk Score Program":
                eligible_customers = customer_data.filter("risk_score <= 50")
            elif program == "Collection History Program":
                eligible_customers = customer_data.filter("collection_history == 'Good'")
            else:
                logging.warning(f"Unknown program: {program}")
                continue

            all_eligible = (
                eligible_customers if all_eligible is None
                else all_eligible.unionByName(eligible_customers)
            )

    if all_eligible is not None:
        # Write the combined results to Hive
        write_hive_table(all_eligible, config["output_data"]["hive_table"])

        # Write the combined results to Oracle
        write_oracle_table(all_eligible, config)

    logging.info("ETL process completed successfully!")

# ===============
# main.py
# ===============
import json
import logging
from etl.etl import etl_process
from utils.spark_utils import create_spark_session
from log_utils.logger import setup_logging

if __name__ == "__main__":
    # Setup logging
    setup_logging()

    # Load configuration
    config_path = "config/config.json"
    with open(config_path, "r") as config_file:
        config = json.load(config_file)

    # Create Spark session
    spark = create_spark_session(config)

    # Run ETL process
    try:
        etl_process(spark, config)
    except Exception as e:
        logging.error(f"ETL process failed: {e}")
    else:
        logging.info("ETL project completed successfully!")
    finally:
        spark.stop()
