How do you code a complete ETL job in PySpark using only the Spark SQL API, not the DataFrame-specific API?

Here’s an example of a complete ETL (Extract, Transform, Load) job using PySpark SQL API:

from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName("ETL Job").getOrCreate()

# Extract (Read from source)
spark.sql("CREATE TEMPORARY VIEW sales USING org.apache.spark.sql.parquet OPTIONS (path 'sales_data.parquet')")
spark.sql("CREATE TEMPORARY VIEW customers USING org.apache.spark.sql.parquet OPTIONS (path 'customer_data.parquet')")

# Transform (Join, Filter, Aggregate)
# The view is created in SQL itself, so no DataFrame method is needed
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW transformed_data AS
    SELECT 
        s.order_id, 
        c.customer_name, 
        SUM(s.order_total) AS total_sales
    FROM 
        sales s
    JOIN 
        customers c ON s.customer_id = c.customer_id
    WHERE 
        s.order_date >= '2022-01-01'
    GROUP BY 
        s.order_id, 
        c.customer_name
""")

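# Load (Write to target)
# Without a schema, CREATE TABLE cannot infer columns from a path that does not exist yet,
# so the empty table is created from the view's schema (WHERE 1 = 0 selects no rows)
spark.sql("CREATE TABLE IF NOT EXISTS transformed_sales USING parquet OPTIONS (path 'transformed_sales_data.parquet') AS SELECT * FROM transformed_data WHERE 1 = 0")
spark.sql("INSERT INTO TABLE transformed_sales SELECT * FROM transformed_data")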

# Verify loaded data
spark.sql("SELECT * FROM transformed_sales").show()

Explanation

  1. Extract: Read data from source (Parquet files) into temporary views.
  2. Transform: Perform transformations (join, filter, aggregate) using SQL queries.
  3. Load: Write transformed data to target (Parquet file).
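
The three stages above map naturally onto an ordered list of SQL statements. The sketch below is one possible way to organize them, not part of the original job: `run_sql_steps` and its `execute` parameter are hypothetical names (in a real job `execute` would be `spark.sql`), and the statements are abbreviated stand-ins for the ones in the example.

```python
from typing import Callable, List, Tuple

# Each step is a (stage, statement) pair; the statements are shortened
# stand-ins for the ones used in the example above.
ETL_STEPS: List[Tuple[str, str]] = [
    ("extract", "CREATE TEMPORARY VIEW sales USING parquet OPTIONS (path 'sales_data.parquet')"),
    ("transform", "CREATE OR REPLACE TEMPORARY VIEW transformed_data AS SELECT * FROM sales"),
    ("load", "INSERT INTO TABLE transformed_sales SELECT * FROM transformed_data"),
]


def run_sql_steps(execute: Callable[[str], object], steps: List[Tuple[str, str]]) -> List[str]:
    """Run each statement in order and return the stages that completed."""
    completed = []
    for stage, statement in steps:
        execute(statement)  # in a real job: spark.sql(statement)
        completed.append(stage)
    return completed


if __name__ == "__main__":
    # A stand-in executor that only prints, so the sketch runs without Spark
    print(run_sql_steps(print, ETL_STEPS))
```

Because a failing statement raises before its stage is recorded, the returned list shows how far the job got.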

Tips and Variations

  1. Use CREATE TABLE instead of CREATE TEMPORARY VIEW for persistent storage.
  2. Specify storage formats (e.g., USING org.apache.spark.sql.json).
  3. Utilize SQL functions (e.g., DATE_TRUNC, CONCAT).
  4. Use Spark SQL’s built-in operators and predicates (e.g., IN, LIKE) for filtering.
  5. Optimize performance with partitioning and caching.
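
As a rough illustration of tips 1 and 5, the sketch below builds the SQL text for caching a view, writing it to a persistent partitioned table, and releasing the cache. The helper name `persistent_table_statements` and the table, view, and column names are assumptions made for the example; the statements would be passed to `spark.sql` one by one.

```python
from typing import List


def persistent_table_statements(table: str, source_view: str, partition_col: str) -> List[str]:
    """Build SQL that caches a view, writes it to a partitioned table, then uncaches it."""
    return [
        # Tip 5: keep the source in memory while it is being read
        f"CACHE TABLE {source_view}",
        # Tips 1 and 5: a persistent table, partitioned on one of its columns
        f"CREATE TABLE IF NOT EXISTS {table} USING parquet "
        f"PARTITIONED BY ({partition_col}) AS SELECT * FROM {source_view}",
        # Release the cached data once the table has been written
        f"UNCACHE TABLE {source_view}",
    ]


if __name__ == "__main__":
    # order_month is an assumed column of the hypothetical sales_enriched view
    for statement in persistent_table_statements("sales_by_month", "sales_enriched", "order_month"):
        print(statement)  # in a real job: spark.sql(statement)
```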

PySpark ETL script with control table management, job status tracking, data pre-checks, retries, dynamic broadcasting, caching, and logging

Here’s a complete PySpark ETL script that incorporates all of these features. It is meant as a starting framework for large-scale ETL jobs running in a production environment.

Complete PySpark ETL Code:

from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import logging

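# Initialize logger (basicConfig attaches a handler so INFO messages are actually emitted)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('ETL_Logger')
logger.setLevel(logging.INFO)

# Spark session setup (the parentheses let the builder chain span several lines)
spark = (
    SparkSession.builder
    .appName("ETL_Job")
    .enableHiveSupport()
    .getOrCreate()
)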

# Function to update job status in the job status table
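def update_job_status(job_id, status, error_message=None):
    end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Store SQL NULL when there is no error; otherwise escape backslashes and quotes
    if error_message is None:
        error_sql = "NULL"
    else:
        escaped = str(error_message).replace("\\", "\\\\").replace("'", "\\'")
        error_sql = f"'{escaped}'"

    # Update job status. Spark SQL only accepts UPDATE on table formats with
    # row-level operations (e.g. Delta Lake or Iceberg), not on plain Hive tables.
    spark.sql(f"""
        UPDATE job_status_table
        SET status = '{status}', 
            end_time = '{end_time}', 
            error_message = {error_sql} 
        WHERE job_id = {job_id}
    """)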
    
    logger.info(f"Job {job_id} status updated to {status}")

# Function to check data availability for all source tables
def check_data_availability(sources, month):
    for source in sources:
        logger.info(f"Checking data availability for source: {source}")
        
        # Count rows in source table for the given month
        count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{month}'")
        count = count_df.collect()[0][0]

        # If no data exists for the current month, fail the job
        if count == 0:
            logger.error(f"No data available for {source} for month {month}. Failing the job.")
            raise Exception(f"Data not available for {source}")

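        # Compare with the month before the one being processed (month is 'YYYY-MM'),
        # not the month before today, so reruns for older months stay correct
        prev_month = (datetime.strptime(month, '%Y-%m') - timedelta(days=1)).strftime('%Y-%m')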
        prev_count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{prev_month}'")
        prev_count = prev_count_df.collect()[0][0]

        logger.info(f"Source {source}: Current month count = {count}, Previous month count = {prev_count}")

        # Insert source count into control table
        spark.sql(f"""
            INSERT INTO control_table 
            (step_id, step_name, priority, source_table, source_count, status, run_date) 
            VALUES (1, 'read {source}', 1, '{source}', {count}, 'pending', current_date())
        """)

# Main ETL job function
def etl_job(job_id, sources, month):
    try:
        logger.info(f"Starting ETL job with Job ID: {job_id}")

        # Step 1: Check Data Availability
        check_data_availability(sources, month)

        # Step 2: Read data from sources (assuming they are in Hive tables)
        dataframes = {}
        for source in sources:
            logger.info(f"Reading data from {source}")
            df = spark.sql(f"SELECT * FROM {source} WHERE month = '{month}'")
            df.createOrReplaceTempView(f"{source}_view")
            dataframes[source] = df
        
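        # Dynamic broadcasting of smaller tables
        broadcast_limit = 100 * 1024 * 1024  # Size threshold for broadcasting (100MB)
        for table in dataframes:
            # Rough estimate from string lengths; note that this scans the whole table
            df_size = dataframes[table].rdd.map(lambda x: len(str(x))).sum()
            if df_size < broadcast_limit:
                logger.info(f"Table {table} (~{df_size} bytes) is eligible for broadcasting")
        # Let the optimizer broadcast any join side it estimates to be under the limit
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(broadcast_limit))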

        # Step 3: Perform joins and transformations
        logger.info("Performing data transformations and joins")

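        # First join (view names follow the f"{source}_view" pattern used above)
        join_df1 = spark.sql("""
            SELECT a.*, b.other_column
            FROM bdl_source_table1_view a
            JOIN bdl_source_table2_view b ON a.id = b.id
        """)
        join_df1.cache()  # Caching intermediate result
        join_df1.createOrReplaceTempView("join_df1")  # Make it queryable from SQL

        # Second join with another table
        join_df2 = spark.sql("""
            SELECT a.*, c.additional_info
            FROM join_df1 a
            JOIN bdl_source_table3_view c ON a.id = c.id
        """)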

        # Step 4: Write to target table
        logger.info("Writing results to the target table")
        join_df2.write.mode('overwrite').saveAsTable('bdl_final_table')

        # Step 5: Clean up cache (only join_df1 was cached)
        join_df1.unpersist()

        # Step 6: Update job status to completed
        update_job_status(job_id, 'completed')
        
    except Exception as e:
        logger.error(f"Job failed due to error: {str(e)}")
        update_job_status(job_id, 'failed', str(e))
        raise  # Re-raise so the retry wrapper can see the failure

# Retry mechanism for running ETL jobs
def run_etl_with_retries(job_id, sources, month, max_retries=3):
    retry_count = 0
    while retry_count <= max_retries:
        try:
            etl_job(job_id, sources, month)
            logger.info(f"Job {job_id} completed successfully.")
            break
        except Exception as e:
            retry_count += 1
            logger.error(f"Job {job_id} failed on attempt {retry_count}: {e}")
            if retry_count > max_retries:
                logger.error(f"Max retries reached for job {job_id}. Job failed.")
                update_job_status(job_id, 'failed', f"Max retries reached: {e}")
            else:
                logger.info(f"Retrying job {job_id} (attempt {retry_count + 1}/{max_retries + 1})")

# List of sources to read from
sources = ["bdl_source_table1", "bdl_source_table2", "bdl_source_table3"]

# Running the ETL job for the current month with retries
run_etl_with_retries(1001, sources, "2024-10", max_retries=3)

Explanation and Enhancements

  1. Data Availability Pre-checks:
    • The function check_data_availability checks for data availability for all source tables for the current month and logs counts into the control table.
    • The job fails early if required data is missing.
  2. Dynamic Broadcasting:
    • We dynamically broadcast small tables (less than 100MB) to optimize joins with larger tables.
  3. Caching and Memory Management:
    • The results of the first join (join_df1) are cached, and unpersisted after usage to free up memory.
    • Caching is applied where intermediate data needs to be reused across transformations.
  4. Job Status and Control Table:
    • The control_table logs information about the source table counts and progress of the ETL job.
    • The job_status_table logs the job’s overall status, including success, failure, and retry attempts.
  5. Retry Mechanism:
    • The run_etl_with_retries function wraps the main ETL job execution with a retry mechanism.
    • It retries the job up to max_retries times in case of failures, logging each attempt.
  6. Logging and Error Handling:
    • The logger captures key information such as job start/end, data availability, errors, and retry attempts.
    • In case of failure, the job status and error message are updated in the job_status_table.
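
One detail of the pre-check in item 1 is worth sketching on its own: deriving the comparison month from the `month` argument rather than from the system clock, so that a rerun for an older month compares against the right period. `previous_month` is a hypothetical helper and assumes the `YYYY-MM` format used in the script.

```python
from datetime import datetime, timedelta


def previous_month(month: str) -> str:
    """Return the month before `month`, both formatted as 'YYYY-MM'."""
    # strptime with '%Y-%m' yields the first day of that month
    first_day = datetime.strptime(month, "%Y-%m")
    # One day earlier always falls in the previous month
    return (first_day - timedelta(days=1)).strftime("%Y-%m")


if __name__ == "__main__":
    print(previous_month("2024-10"))
    print(previous_month("2024-01"))
```

Stepping back one day from the first of the month avoids any special handling for January or for months of different lengths.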

Additional Enhancements

  • Email/Notification Alerts:
    • You can integrate email alerts (using a service like AWS SES, or the alerting features of your scheduler or monitoring system) to notify the team in case of job failures or retries.
  • Corrupt Data Handling:
    • If any data is found to be corrupt during reading or processing, you can implement a mechanism to move corrupt data into a “quarantine” folder for manual inspection.
  • Audit Logs:
    • Audit logs can be maintained for every ETL execution, logging details such as data transformations, count mismatches, and data quality checks for compliance reporting.
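
The quarantine idea can be sketched with the standard library alone. This is a minimal illustration that assumes the corrupt file lives on a local filesystem; `quarantine_file` and the directory names are hypothetical, and files on HDFS or object storage would need the corresponding client instead of `shutil`.

```python
import shutil
from pathlib import Path


def quarantine_file(file_path: str, quarantine_dir: str) -> str:
    """Move a file into the quarantine directory and return its new path."""
    target_dir = Path(quarantine_dir)
    # Create the quarantine directory on first use
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / Path(file_path).name
    shutil.move(file_path, str(target))
    return str(target)
```

Returning the new path makes it easy to record where the file ended up, for example in an audit log entry.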

The solution includes:

  1. Schema creation for control and job status tables.
  2. Pre-checks for data availability.
  3. Data read, join, and transformation steps.
  4. Caching, broadcasting, and optimizations.
  5. Tracking ETL job progress and status updates.
  6. Handling retries and error logging.
  7. Enhancements such as auditing, retries, dynamic broadcasting, and memory management.

1. Table Schemas (Control and Job Status Tables)

Control Table Schema

CREATE TABLE IF NOT EXISTS control_table (
    step_id INT,
    step_name STRING,
    priority INT,
    source_table STRING,
    source_count BIGINT,
    target_table STRING,
    target_count BIGINT,
    status STRING,
    run_date DATE,
    error_message STRING
);

Job Status Table Schema

CREATE TABLE IF NOT EXISTS job_status_table (
    job_id INT,
    job_name STRING,
    run_date DATE,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    retry_count INT,
    status STRING,
    error_message STRING
);

2. Pre-Check Logic for Data Availability

We will first verify if all the required source data is available for the specified month. If any source table lacks data, the ETL job should fail before any processing starts.

def check_data_availability(sources, month):
    for source in sources:
        logger.info(f"Checking data availability for source: {source}")
        
        # Count rows in source table for the given month
        count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{month}'")
        count = count_df.collect()[0][0]

        # If no data exists for the current month, fail the job
        if count == 0:
            logger.error(f"No data available for {source} for month {month}. Failing the job.")
            raise Exception(f"Data not available for {source}")

        # Compare with previous month count
        # Derived from the month being processed ('YYYY-MM'), not from today's date
        prev_month = (datetime.strptime(month, '%Y-%m') - timedelta(days=1)).strftime('%Y-%m')
        prev_count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{prev_month}'")
        prev_count = prev_count_df.collect()[0][0]

        logger.info(f"Source {source}: Current month count = {count}, Previous month count = {prev_count}")

        # Insert source count into control table
        spark.sql(f"""
            INSERT INTO control_table 
            (step_id, step_name, priority, source_table, source_count, status, run_date) 
            VALUES (1, 'read {source}', 1, '{source}', {count}, 'pending', current_date())
        """)

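The function above logs the current and previous month counts but leaves the comparison to the reader. One way to act on it, sketched here under the assumption that a drop of more than half is worth flagging, is a small pure-Python check; `count_drop_ratio`, `is_suspicious_drop`, and the 0.5 default are illustrative choices rather than part of the original job.

```python
def count_drop_ratio(current_count: int, previous_count: int) -> float:
    """Fraction by which the row count fell versus the previous month (0.0 if it did not fall)."""
    if previous_count <= 0:
        # Nothing to compare against, so report no drop
        return 0.0
    return max(0.0, (previous_count - current_count) / previous_count)


def is_suspicious_drop(current_count: int, previous_count: int, max_drop: float = 0.5) -> bool:
    """True when the count fell by more than max_drop relative to the previous month."""
    return count_drop_ratio(current_count, previous_count) > max_drop


if __name__ == "__main__":
    print(is_suspicious_drop(400, 1000))
    print(is_suspicious_drop(900, 1000))
```

A job could log a warning or fail when `is_suspicious_drop` returns True, depending on how strict the pre-check should be.
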
3. Job Status Tracking

We’ll maintain the job status (whether it is running, completed, or failed) in the job_status_table. This allows us to track the entire job execution, including retries and errors.

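def update_job_status(job_id, status, error_message=None):
    end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Store SQL NULL when there is no error; otherwise escape backslashes and quotes
    if error_message is None:
        error_sql = "NULL"
    else:
        escaped = str(error_message).replace("\\", "\\\\").replace("'", "\\'")
        error_sql = f"'{escaped}'"

    # Update job status. Spark SQL only accepts UPDATE on table formats with
    # row-level operations (e.g. Delta Lake or Iceberg), not on plain Hive tables.
    spark.sql(f"""
        UPDATE job_status_table
        SET status = '{status}', 
            end_time = '{end_time}', 
            error_message = {error_sql} 
        WHERE job_id = {job_id}
    """)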
    
    logger.info(f"Job {job_id} status updated to {status}")
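
Where the status table is stored in a format that does not accept UPDATE, an append-only log is a common alternative: every status change becomes a new row, and the latest row per job is the current status. The sketch below only builds the INSERT text; `job_status_log`, its four-column layout (job_id, status, event_time, error_message), and the helper names are assumptions made for illustration.

```python
from datetime import datetime
from typing import Optional


def sql_literal(value: Optional[str]) -> str:
    """Render a Python string as a Spark SQL string literal, or NULL for None."""
    if value is None:
        return "NULL"
    # Escape backslashes first, then single quotes
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def status_insert_statement(job_id: int, status: str,
                            error_message: Optional[str] = None,
                            now: Optional[datetime] = None) -> str:
    """Build an INSERT that appends one status event for a job."""
    ts = (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
    return (
        "INSERT INTO job_status_log "
        f"VALUES ({int(job_id)}, {sql_literal(status)}, TIMESTAMP '{ts}', {sql_literal(error_message)})"
    )


if __name__ == "__main__":
    # In a real job the statement would be passed to spark.sql
    print(status_insert_statement(1001, "failed", "can't read source"))
```

The escaping matters because error messages frequently contain quotes, which would otherwise break the statement.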

4. ETL Job with Data Reads, Joins, and Optimizations

This is the main body of the ETL job. We read the source tables, perform joins, apply transformations, and write the result to a target table.

Enhancements:

  1. Dynamic Broadcasting: If a table size is small, it should be broadcasted in joins.
  2. Caching: We cache intermediate results that will be reused multiple times.
  3. Error Handling: Logs and updates the status of the job and retries in case of failure.
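  4. Memory Management: Cleans up after each step (cache, uncache).

from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import logging

# Initialize logger (basicConfig attaches a handler so INFO messages are actually emitted)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('ETL_Logger')

# Spark session setup (the parentheses let the builder chain span several lines)
spark = (
    SparkSession.builder
    .appName("ETL_Job")
    .enableHiveSupport()
    .getOrCreate()
)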

# Main ETL job function
def etl_job(job_id, sources, month):
    try:
        logger.info(f"Starting ETL job with Job ID: {job_id}")

        # Step 1: Check Data Availability
        check_data_availability(sources, month)

        # Step 2: Read data from sources (assuming they are in Hive tables)
        dataframes = {}
        for source in sources:
            logger.info(f"Reading data from {source}")
            df = spark.sql(f"SELECT * FROM {source} WHERE month = '{month}'")
            df.createOrReplaceTempView(f"{source}_view")
            dataframes[source] = df
        
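        # Dynamic broadcasting of smaller tables
        broadcast_limit = 100 * 1024 * 1024  # Size threshold for broadcasting (100MB)
        for table in dataframes:
            # Rough estimate from string lengths; note that this scans the whole table
            df_size = dataframes[table].rdd.map(lambda x: len(str(x))).sum()
            if df_size < broadcast_limit:
                logger.info(f"Table {table} (~{df_size} bytes) is eligible for broadcasting")
        # Let the optimizer broadcast any join side it estimates to be under the limit
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(broadcast_limit))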

        # Step 3: Perform joins and transformations
        logger.info("Performing data transformations and joins")

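        # First join (view names follow the f"{source}_view" pattern used above)
        join_df1 = spark.sql("""
            SELECT a.*, b.other_column
            FROM bdl_source_table1_view a
            JOIN bdl_source_table2_view b ON a.id = b.id
        """)
        join_df1.cache()  # Caching intermediate result
        join_df1.createOrReplaceTempView("join_df1")  # Make it queryable from SQL

        # Second join with another table
        join_df2 = spark.sql("""
            SELECT a.*, c.additional_info
            FROM join_df1 a
            JOIN bdl_source_table3_view c ON a.id = c.id
        """)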

        # Step 4: Write to target table
        logger.info("Writing results to the target table")
        join_df2.write.mode('overwrite').saveAsTable('bdl_final_table')

        # Step 5: Clean up cache (only join_df1 was cached)
        join_df1.unpersist()

        # Step 6: Update job status to completed
        update_job_status(job_id, 'completed')
        
    except Exception as e:
        logger.error(f"Job failed due to error: {str(e)}")
        update_job_status(job_id, 'failed', str(e))
        raise  # Re-raise so callers, such as a retry wrapper, can see the failure

# List of sources to read from
sources = ["bdl_source_table1", "bdl_source_table2", "bdl_source_table3"]

# Running the ETL job for the current month
etl_job(1001, sources, "2024-10")
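
Since the joins are written in SQL, broadcasting can also be requested per query with a `BROADCAST` join hint instead of a session-wide setting. The sketch below builds the query text and adds the hint only when the smaller side is under a size limit; `join_with_broadcast_hint`, its parameters, and the example sizes are hypothetical, and the size estimate is assumed to come from elsewhere.

```python
def join_with_broadcast_hint(big_view: str, small_view: str, key: str,
                             small_size_bytes: int,
                             limit_bytes: int = 100 * 1024 * 1024) -> str:
    """Build a join query, hinting Spark to broadcast the small side when it is under the limit."""
    # The hint refers to the alias of the smaller table (b)
    hint = "/*+ BROADCAST(b) */ " if small_size_bytes < limit_bytes else ""
    return (
        f"SELECT {hint}a.*, b.other_column "
        f"FROM {big_view} a JOIN {small_view} b ON a.{key} = b.{key}"
    )


if __name__ == "__main__":
    # A 5 MB lookup table gets the hint; the query would be passed to spark.sql
    print(join_with_broadcast_hint("bdl_source_table1_view", "bdl_source_table2_view",
                                   "id", 5 * 1024 * 1024))
```

A hint affects only the query that carries it, which keeps one small table from changing the behaviour of unrelated joins in the same session.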

5. Retry Mechanism

In case of failure, the job should be retried a set number of times. Here’s how to add a retry mechanism:

def run_etl_with_retries(job_id, sources, month, max_retries=3):
    retry_count = 0
    while retry_count <= max_retries:
        try:
            etl_job(job_id, sources, month)
            logger.info(f"Job {job_id} completed successfully.")
            break
        except Exception as e:
            retry_count += 1
            logger.error(f"Job {job_id} failed on attempt {retry_count}: {e}")
            if retry_count > max_retries:
                logger.error(f"Max retries reached for job {job_id}. Job failed.")
                update_job_status(job_id, 'failed', f"Max retries reached: {e}")
            else:
                logger.info(f"Retrying job {job_id} (attempt {retry_count + 1}/{max_retries + 1})")
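
The wrapper above retries immediately. When failures are caused by something transient, such as a busy cluster or a source that is still loading, a pause that grows between attempts is often kinder to the system. The following is a generic sketch of retrying with exponential backoff, not a drop-in replacement for the function above; `run_with_backoff`, `base_delay`, and the injectable `sleep` parameter are names chosen for the example.

```python
import time
from typing import Callable


def run_with_backoff(task: Callable[[], object], max_retries: int = 3,
                     base_delay: float = 1.0,
                     sleep: Callable[[float], None] = time.sleep) -> object:
    """Call task(); on failure wait base_delay * 2**attempt seconds and retry, up to max_retries times."""
    attempt = 0
    while True:
        try:
            return task()
        except Exception:
            if attempt >= max_retries:
                # Out of retries: let the last error propagate to the caller
                raise
            sleep(base_delay * (2 ** attempt))
            attempt += 1
```

A call such as `run_with_backoff(lambda: etl_job(1001, sources, "2024-10"))` would only retry if `etl_job` lets its exception propagate. Passing `sleep` as a parameter keeps the helper easy to test without real waiting.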

6. Enhancements Added

  1. Dynamic Broadcasting: If a table size is less than 100 MB, it gets broadcasted to optimize joins with larger tables.
  2. Caching and Uncaching: Intermediate results are cached to optimize repeated access, and uncached after use to free up memory.
  3. Pre-check Data Availability: The job will fail at the start if data is not available for the current month.
  4. Job Status and Control Table: Each step and job’s status is logged and recorded, making it easier to monitor, audit, and retry failed jobs.
  5. Retry Mechanism: Added a retry mechanism in case of job failure, with configurable retries.

Final Thoughts

  • Performance: Dynamic broadcasting and caching have been used to optimize joins and intermediate data processing.
  • Error Handling: The job logs every failure, updates the job status in the database, and supports automatic retries.
  • Monitoring: Both job status and step-level progress are tracked using the control and job status tables, giving visibility into the workflow’s state at any given point.
  • Memory Management: Proper use of cache() and unpersist() ensures that memory is efficiently managed.

This ETL framework is designed to handle large datasets, fail early when source data is missing, and track job progress, while leaving room for further customization.

To successfully implement the ETL workflow described earlier, you need to have two essential tracking mechanisms in place: a control table and a job status table. These tables help to monitor the progress of the ETL job, track the job’s execution, log errors, and ensure data integrity.

Here, I’ll define the structure (schema) for both tables, provide sample data, and explain how to implement them in the PySpark SQL-based ETL job.

1. Control Table Schema:

This table tracks the details of each step of the ETL job, including its execution status, source data information, and any conditions for retry or failure.

Control Table Schema

Column Name | Data Type | Description
----------- | --------- | -----------
step_id | INT | Unique ID for each step in the ETL job.
step_name | STRING | Name of the ETL step (e.g., ‘read source1’, ‘join tables’).
priority | INT | Priority of the step (lower numbers mean higher priority).
source_table | STRING | Name of the source table being read or transformed.
source_count | BIGINT | Row count of the source data in the step.
target_table | STRING | Name of the target table being written.
target_count | BIGINT | Row count of the data being written.
status | STRING | Status of the step (‘pending’, ‘in-progress’, ‘completed’, ‘failed’).
run_date | DATE | Date the step was last run.
error_message | STRING | Error message if the step fails.

Sample Data for Control Table

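step_id | step_name | priority | source_table | source_count | target_table | target_count | status | run_date | error_message
------- | --------- | -------- | ------------ | ------------ | ------------ | ------------ | ------ | -------- | -------------
1 | read source1 | 1 | bdl_source_table1 | 1500000 | NULL | NULL | completed | 2024-10-01 | NULL
2 | join source1_2 | 2 | bdl_source_table2 | 1700000 | NULL | NULL | completed | 2024-10-01 | NULL
3 | write to target | 3 | NULL | NULL | bdl_target_table | 3000000 | completed | 2024-10-01 | NULL
4 | aggregate results | 4 | bdl_target_table | 3000000 | bdl_final_table | 1000000 | failed | 2024-10-01 | Memory error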

2. Job Status Table Schema:

This table logs the overall status of the ETL job. It tracks each execution attempt, including the number of retries, job duration, and whether it succeeded or failed.

Job Status Table Schema

Column Name | Data Type | Description
----------- | --------- | -----------
job_id | INT | Unique identifier for the entire ETL job run.
job_name | STRING | Name of the ETL job.
run_date | DATE | Date when the job started.
start_time | TIMESTAMP | Timestamp when the job execution started.
end_time | TIMESTAMP | Timestamp when the job execution ended.
retry_count | INT | Number of times the job was retried.
status | STRING | Overall status of the job (‘pending’, ‘running’, ‘completed’, ‘failed’).
error_message | STRING | Error message in case of job failure.

Sample Data for Job Status Table

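job_id | job_name | run_date | start_time | end_time | retry_count | status | error_message
------ | -------- | -------- | ---------- | -------- | ----------- | ------ | -------------
1001 | BDL_ETL_Job | 2024-10-01 | 2024-10-01 08:00:00 | 2024-10-01 09:30:00 | 0 | completed | NULL
1002 | BDL_ETL_Job | 2024-10-02 | 2024-10-02 08:00:00 | 2024-10-02 10:00:00 | 2 | failed | Memory error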

3. Building and Using These Tables in the ETL Process

Here’s how you can build these tables in your Hive Metastore or your preferred data store and integrate them into the PySpark SQL-based ETL job.

Step 1: Creating Control Table and Job Status Table

You can create these tables in Hive using SQL commands. This will allow you to track the ETL progress.

-- Creating Control Table
CREATE TABLE IF NOT EXISTS control_table (
    step_id INT,
    step_name STRING,
    priority INT,
    source_table STRING,
    source_count BIGINT,
    target_table STRING,
    target_count BIGINT,
    status STRING,
    run_date DATE,
    error_message STRING
);

-- Creating Job Status Table
CREATE TABLE IF NOT EXISTS job_status_table (
    job_id INT,
    job_name STRING,
    run_date DATE,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    retry_count INT,
    status STRING,
    error_message STRING
);

Step 2: Pre-Check for Data Availability (Checking Source Counts)

Before starting the ETL job, you’ll want to check that all source tables contain data for the specified month. If any source is missing data, the job should fail early.

def check_data_availability(sources, month):
    for source in sources:
        logger.info(f"Checking data availability for source: {source}")
        
        # Count rows in source table for the given month
        count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{month}'")
        count = count_df.collect()[0][0]

        # If no data exists for the current month, fail the job
        if count == 0:
            logger.error(f"No data available for {source} for month {month}. Failing the job.")
            raise Exception(f"Data not available for {source}")

        # Compare with previous month count
        # Derived from the month being processed ('YYYY-MM'), not from today's date
        prev_month = (datetime.strptime(month, '%Y-%m') - timedelta(days=1)).strftime('%Y-%m')
        prev_count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{prev_month}'")
        prev_count = prev_count_df.collect()[0][0]

        logger.info(f"Source {source}: Current month count = {count}, Previous month count = {prev_count}")

        # Add source count to control table
        spark.sql(f"""
            INSERT INTO control_table 
            (step_id, step_name, priority, source_table, source_count, status, run_date) 
            VALUES (1, 'read {source}', 1, '{source}', {count}, 'pending', current_date())
        """)

Step 3: Tracking Job Status

You can track the status of each ETL job run using the job_status_table. The update_job_status function records the job’s end time, its final status, and any error message; the start time and retry count would have to be written when the job starts or is retried.

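def update_job_status(job_id, status, error_message=None):
    end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Store SQL NULL when there is no error; otherwise escape backslashes and quotes
    if error_message is None:
        error_sql = "NULL"
    else:
        escaped = str(error_message).replace("\\", "\\\\").replace("'", "\\'")
        error_sql = f"'{escaped}'"

    # Update job status. Spark SQL only accepts UPDATE on table formats with
    # row-level operations (e.g. Delta Lake or Iceberg), not on plain Hive tables.
    spark.sql(f"""
        UPDATE job_status_table
        SET status = '{status}', 
            end_time = '{end_time}', 
            error_message = {error_sql} 
        WHERE job_id = {job_id}
    """)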
    
    logger.info(f"Job {job_id} status updated to {status}")

Step 4: Performing the ETL Job

Below is an overview of the steps performed in the ETL job, including reading from source tables, checking data availability, performing joins, and updating the status.

def etl_job(job_id, sources, month):
    try:
        logger.info(f"Starting ETL job with Job ID: {job_id}")

        # Step 1: Check Data Availability
        check_data_availability(sources, month)

        # Step 2: Read data from sources (assuming they are in Hive tables)
        for source in sources:
            logger.info(f"Reading data from {source}")
            df = spark.sql(f"SELECT * FROM {source} WHERE month = '{month}'")
            df.createOrReplaceTempView(f"{source}_view")
        
        # Step 3: Perform joins and transformations
        # (view names follow the f"{source}_view" pattern for the sources listed earlier)
        logger.info("Performing data transformations and joins")
        transformed_df = spark.sql("""
            SELECT a.*, b.other_column
            FROM bdl_source_table1_view a
            JOIN bdl_source_table2_view b ON a.id = b.id
        """)
        
        # Cache the result to avoid recomputation and expose it to SQL
        transformed_df.cache()
        transformed_df.createOrReplaceTempView("transformed_df")
        
        # Perform additional transformations
        final_df = spark.sql("""
            SELECT a.*, c.additional_info
            FROM transformed_df a
            JOIN bdl_source_table3_view c ON a.id = c.id
        """)
        
        # Write to target, then release the cached intermediate result
        final_df.write.mode('overwrite').saveAsTable('bdl_final_table')
        transformed_df.unpersist()

        # Step 4: Update job status
        update_job_status(job_id, 'completed')
        
    except Exception as e:
        logger.error(f"Job failed due to error: {str(e)}")
        update_job_status(job_id, 'failed', str(e))
        raise  # Re-raise so the caller can see the failure

Final Thoughts:

  • Control and Job Status Tables: Provide a robust mechanism to track and manage the execution of ETL jobs.
  • Pre-check Logic: Ensures data availability before starting the job to prevent unnecessary processing.
  • Job Status Updates: Help to track the job’s execution and easily identify issues.
  • Optimization: The job includes caching to optimize the execution time of chained transformations.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import logging
import os

# Initialize Spark session (the parentheses let the builder chain span several lines)
spark = (
    SparkSession.builder
    .appName("BDL Processing Job")
    .enableHiveSupport()
    .getOrCreate()
)

# Initialize logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("BDL Processing Job")

# Configurations
input_table = "bdl.input_table"
output_table = "bdl.output_table"
control_table = "bdl.control_table"  # To track the state of file processing
max_retries = 3

# State management: Fetch unprocessed or failed files from the control table
def fetch_unprocessed_files():
    return spark.sql(f"SELECT * FROM {control_table} WHERE status = 'unprocessed' OR status = 'failed'")

# Retry mechanism: Retry failed files up to max_retries
def retry_failed_files(file_name, retries):
    return retries < max_retries

# Main processing function; returns True on success and False on failure
def process_file(file_name):
    try:
        # Assumption: input_table has a file_name column identifying each file's rows
        file_data = spark.sql(f"SELECT * FROM {input_table} WHERE file_name = '{file_name}'")

        # Processing logic here (for example, transformations)
        processed_data = file_data.withColumn("processed", F.lit(True))
        
        # Append to the BDL table; overwrite mode would discard previously processed files
        processed_data.write.mode("append").insertInto(output_table)
        
        # Log success and update the control table
        # (UPDATE needs a table format with row-level operations, e.g. Delta Lake or Iceberg)
        logger.info(f"Processing completed for file: {file_name}")
        spark.sql(f"UPDATE {control_table} SET status = 'processed' WHERE file_name = '{file_name}'")
        return True
        
    except Exception as e:
        logger.error(f"Error processing file {file_name}: {str(e)}")
        spark.sql(f"UPDATE {control_table} SET status = 'failed' WHERE file_name = '{file_name}'")
        return False

# Process one file, retrying in a loop (rather than recursively) up to max_retries
def process_with_retries(file_name, retries):
    while not process_file(file_name):
        if not retry_failed_files(file_name, retries):
            logger.error(f"File {file_name} failed after {max_retries} retries")
            move_to_corrupt_folder(file_name)
            send_notification(f"File {file_name} moved to corrupt folder")
            return
        retries += 1
        logger.info(f"Retrying file {file_name} (Retry count: {retries})")
        spark.sql(f"UPDATE {control_table} SET retries = {retries} WHERE file_name = '{file_name}'")

# Function to move corrupt files to a folder for manual review
# (os.rename assumes file_name is a path on the local filesystem)
def move_to_corrupt_folder(file_name):
    corrupt_folder = "/path/to/corrupt_folder"
    os.rename(file_name, os.path.join(corrupt_folder, os.path.basename(file_name)))
    logger.info(f"Moved file {file_name} to corrupt folder")

# Function to send notifications in case of significant issues
def send_notification(message):
    # This can be integrated with email alerts or messaging systems (e.g., Slack)
    logger.info(f"Notification sent: {message}")

# Main Job Logic
if __name__ == "__main__":
    logger.info("Starting BDL Processing Job")
    
    # Fetch unprocessed or failed files from control table
    unprocessed_files = fetch_unprocessed_files()
    
    # Each collected Row carries the file_name and retries columns of the control table
    for row in unprocessed_files.collect():
        logger.info(f"Processing file: {row['file_name']}")
        process_with_retries(row["file_name"], row["retries"] or 0)
    
    logger.info("BDL Processing Job completed")

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import logging
import os

# Initialize Spark session
spark = SparkSession.builder 
    .appName("BDL Processing Job") 
    .enableHiveSupport() 
    .getOrCreate()

# Initialize logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("BDL Processing Job")

# Configurations
input_table = "bdl.input_table"
output_table = "bdl.output_table"
control_table = "bdl.control_table"  # To track the state of file processing
max_retries = 3

# State management: Fetch unprocessed or failed files from the control table
def fetch_unprocessed_files():
    return spark.sql(f"SELECT * FROM {control_table} WHERE status = 'unprocessed' OR status = 'failed'")

# Retry mechanism: Retry failed files up to max_retries
def retry_failed_files(file_name, retries):
    return retries < max_retries

# Main processing function
def process_file(file_name):
    try:
        # Processing logic here (for example, transformations).
        # Assumption: the rows to process are read from input_table.
        file_data = spark.table(input_table)
        processed_data = file_data.withColumn("processed", F.lit(True))

        # Write the processed data back to BDL table
        processed_data.write.mode("overwrite").insertInto(output_table)

        # Log success and update the control table.
        # UPDATE requires a table format with row-level update support (e.g., Delta Lake).
        logger.info(f"Processing completed for file: {file_name}")
        spark.sql(f"UPDATE {control_table} SET status = 'processed' WHERE file_name = '{file_name}'")

    except Exception as e:
        logger.error(f"Error processing file {file_name}: {str(e)}")
        spark.sql(f"UPDATE {control_table} SET status = 'failed' WHERE file_name = '{file_name}'")

        # Retry logic: read the current retry count from the control table
        retries = spark.sql(f"SELECT retries FROM {control_table} WHERE file_name = '{file_name}'").collect()[0][0]
        if retry_failed_files(file_name, retries):
            logger.info(f"Retrying file {file_name} (Retry count: {retries+1})")
            spark.sql(f"UPDATE {control_table} SET retries = retries + 1 WHERE file_name = '{file_name}'")
            process_file(file_name)  # Retry
        else:
            logger.error(f"File {file_name} failed after {max_retries} retries")
            move_to_corrupt_folder(file_name)
            send_notification(f"File {file_name} failed after {max_retries} retries")

# Function to move corrupt files to a folder for manual review
def move_to_corrupt_folder(file_name):
    corrupt_folder = "/path/to/corrupt_folder"
    os.rename(file_name, os.path.join(corrupt_folder, os.path.basename(file_name)))
    logger.info(f"Moved file {file_name} to corrupt folder")

# Function to send notifications in case of significant issues
def send_notification(message):
    # This can be integrated with email alerts or messaging systems (e.g., Slack)
    logger.info(f"Notification sent: {message}")

# Main Job Logic
if __name__ == "__main__":
    logger.info("Starting BDL Processing Job")

    # Fetch unprocessed or failed files from control table
    unprocessed_files = fetch_unprocessed_files()

    for row in unprocessed_files.collect():
        # Each collected item is a Row of the control table; pass only the file name on
        file_name = row["file_name"]
        logger.info(f"Processing file: {file_name}")
        process_file(file_name)

    logger.info("BDL Processing Job completed")
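
The retry logic above recurses into process_file and keeps its counter in the control table. As a minimal sketch of the same bounded-retry idea without Spark, the loop below tracks state in a plain dictionary. The names run_with_retries, control, and flaky_task are hypothetical stand-ins for the control table and the processing step; they are not part of the original job.

```python
MAX_RETRIES = 3  # mirrors max_retries in the job configuration


def run_with_retries(file_name, task, control, max_retries=MAX_RETRIES):
    """Run task(file_name), retrying on failure up to max_retries times.

    `control` is a dict standing in for the control table (hypothetical):
    file_name -> {"status": ..., "retries": ...}.
    """
    entry = control.setdefault(file_name, {"status": "unprocessed", "retries": 0})
    while True:
        try:
            task(file_name)
            entry["status"] = "processed"
            return True
        except Exception:
            entry["status"] = "failed"
            if entry["retries"] >= max_retries:
                # Retries exhausted: leave the file for manual review
                return False
            entry["retries"] += 1


# Usage: a task that fails twice, then succeeds on the third attempt
attempts = {"count": 0}


def flaky_task(file_name):
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError(f"transient failure on {file_name}")


control = {}
ok = run_with_retries("example_file", flaky_task, control)
```

A loop avoids growing the call stack on each retry and keeps the stop condition (retries reaching max_retries) in one place. In the real job, the dictionary updates would be the UPDATE statements against the control table.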

# Read data from BDL table
input_data = spark.sql(f"SELECT * FROM {input_table}")

# Example processing in SQL
input_data.createOrReplaceTempView("input_data_view")
processed_data = spark.sql("""
    SELECT *, 
           TRUE AS processed 
    FROM input_data_view
""")

# Write processed data to output BDL table
processed_data.write.mode("overwrite").insertInto(output_table)

# Update the control table to mark the file as processed
# (UPDATE works only on table formats with row-level update support, such as Delta Lake)
spark.sql(f"UPDATE {control_table} SET status = 'processed' WHERE file_name = 'example_file'")

PySpark script incorporating optimizations for joining large tables, performing groupBy, transpose and writing output

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

# Create SparkSession with optimizations
# (the chained calls are wrapped in parentheses so they parse as one expression)
spark = (
    SparkSession.builder.appName("Optimized Join and Transform")
    .config("spark.sql.shuffle.partitions", 200)
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "16g")
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

# Load 30GB table (e.g., sales data)
sales_df = spark.read.parquet("sales_data.parquet")

# Load 200MB look-up tables (e.g., customer, product)
customer_df = spark.read.parquet("customer_data.parquet").cache()
product_df = spark.read.parquet("product_data.parquet").cache()

# Broadcast smaller DataFrames for efficient joins
# (DataFrames have no .broadcast() method; the hint comes from pyspark.sql.functions)
customer_bcast = F.broadcast(customer_df)
product_bcast = F.broadcast(product_df)

# Join sales data with look-up tables
joined_df = (
    sales_df
    .join(customer_bcast, "customer_id")
    .join(product_bcast, "product_id")
)

# Perform groupBy and aggregation
grouped_df = (
    joined_df
    .groupBy("date", "region")
    .agg(F.sum("sales").alias("total_sales"))
)

# Transpose data using pivot: one row per date, one column per region
transposed_df = (
    grouped_df
    .groupBy("date")
    .pivot("region")
    .sum("total_sales")
)

# Write output to Parquet file
transposed_df.write.parquet("output_data.parquet", compression="snappy")

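# Estimate DataFrame sizes (in bytes) from the optimizer's plan statistics.
# Note: _jdf is a private PySpark handle, and the result is Catalyst's estimate,
# not a measured size.
def estimate_size(df):
    size_in_bytes = df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
    return int(str(size_in_bytes))  # convert the JVM BigInt to a Python int

print("Sales DataFrame size (bytes):", estimate_size(sales_df))
print("Customer DataFrame size (bytes):", estimate_size(customer_df))
print("Product DataFrame size (bytes):", estimate_size(product_df))
print("Joined DataFrame size (bytes):", estimate_size(joined_df))
print("Transposed DataFrame size (bytes):", estimate_size(transposed_df))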

# Print the Spark UI URL; the UI shows execution plans and performance metrics
print("Spark UI:", spark.sparkContext.uiWebUrl)

Optimizations Used

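  1. Broadcasting: The smaller DataFrames (customer_df, product_df) are broadcast to every executor, so the 30GB sales table is joined without being shuffled. At about 200MB each, they exceed the default spark.sql.autoBroadcastJoinThreshold of 10MB, so Spark would not broadcast them without an explicit hint.
  2. Caching: The look-up DataFrames (customer_df, product_df) are cached, which pays off when they are reused across several actions.
  3. Partitioning: spark.sql.shuffle.partitions is set to 200, which is also Spark's default; tune it to the data volume and the number of available cores.
  4. Memory Configuration: Driver memory is set to 8g and executor memory to 16g; suitable values depend on the cluster and the data.
  5. Executor Cores: spark.executor.cores is set to 4, so each executor runs up to four tasks in parallel.
  6. Parquet Compression: Output is written with Snappy compression, which is Spark's default codec for Parquet.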
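
The shuffle partition count and the broadcast decision both follow from table sizes. As a rough sketch, assuming a target of about 128 MB per shuffle partition (a common rule of thumb, not a figure from this post), the hypothetical helpers below turn the 30GB and 200MB sizes quoted above into a partition count and a broadcast check. The 10 MB value is Spark's default for spark.sql.autoBroadcastJoinThreshold.

```python
import math

MB = 1024 * 1024
GB = 1024 * MB


def suggest_shuffle_partitions(data_bytes, target_partition_bytes=128 * MB):
    """Hypothetical helper: partitions needed so each holds about the target size."""
    return max(1, math.ceil(data_bytes / target_partition_bytes))


def fits_auto_broadcast(table_bytes, threshold_bytes=10 * MB):
    """Hypothetical helper: would Spark broadcast this table without a hint?"""
    return table_bytes <= threshold_bytes


# Sizes taken from the example: a 30GB sales table and 200MB look-up tables
sales_partitions = suggest_shuffle_partitions(30 * GB)
lookup_auto_broadcast = fits_auto_broadcast(200 * MB)

print("Suggested shuffle partitions:", sales_partitions)
print("Look-up table broadcast automatically:", lookup_auto_broadcast)
```

Under these assumptions the sales table suggests 240 partitions rather than 200, and the look-up tables fall outside the automatic broadcast threshold, which is why the explicit broadcast hint matters in this job.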

Estimating DataFrame Sizes

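PySpark DataFrames expose no estimatedRowCount attribute. A size estimate can instead be taken from the optimizer's plan statistics (sizeInBytes), which is an approximation rather than a measurement.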

Monitoring Performance

  1. Spark UI: spark.sparkContext.uiWebUrl returns the URL of the Spark UI, where execution plans and performance metrics can be inspected.
  2. Spark History Server: Analyze completed application metrics.
