How do you write a complete ETL job in PySpark using only the Spark SQL API, not the DataFrame-specific API?
Here’s an example of a complete ETL (Extract, Transform, Load) job using the PySpark SQL API:
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder.appName("ETL Job").getOrCreate()
# Extract (Read from source)
spark.sql("CREATE TEMPORARY VIEW sales USING org.apache.spark.sql.parquet OPTIONS (path 'sales_data.parquet')")
spark.sql("CREATE TEMPORARY VIEW customers USING org.apache.spark.sql.parquet OPTIONS (path 'customer_data.parquet')")
# Transform (Join, Filter, Aggregate)
spark.sql("""
SELECT
s.order_id,
c.customer_name,
SUM(s.order_total) AS total_sales
FROM
sales s
JOIN
customers c ON s.customer_id = c.customer_id
WHERE
s.order_date >= '2022-01-01'
GROUP BY
s.order_id,
c.customer_name
""").createOrReplaceTempView("transformed_data")
# Load (Write to target)
spark.sql("CREATE TABLE IF NOT EXISTS transformed_sales USING parquet OPTIONS (path 'transformed_sales_data.parquet')")
spark.sql("INSERT INTO TABLE transformed_sales SELECT * FROM transformed_data")
# Verify loaded data
spark.sql("SELECT * FROM transformed_sales").show()
Explanation
- Extract: Read data from source (Parquet files) into temporary views.
- Transform: Perform transformations (join, filter, aggregate) using SQL queries.
- Load: Write transformed data to target (Parquet file).
Tips and Variations
- Use CREATE TABLE instead of CREATE TEMPORARY VIEW for persistent storage.
- Specify storage formats explicitly (e.g., USING org.apache.spark.sql.json).
- Use SQL functions such as DATE_TRUNC and CONCAT.
- Use SQL operators such as IN and LIKE for filtering.
- Optimize performance with partitioning and caching, as sketched below.
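For example, the load step can be partitioned and the intermediate view cached using SQL alone. A minimal sketch, reusing the transformed_data view from above (the partition column and target table name are illustrative):
# Cache the intermediate view so repeated queries against it are served from memory
spark.sql("CACHE TABLE transformed_data")
# Write the result partitioned by customer_name (illustrative partition column)
spark.sql("""
    CREATE TABLE IF NOT EXISTS transformed_sales_partitioned
    USING parquet
    PARTITIONED BY (customer_name)
    LOCATION 'transformed_sales_partitioned'
    AS SELECT * FROM transformed_data
""")
# Release the cached view once it is no longer needed
spark.sql("UNCACHE TABLE transformed_data")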
PySpark ETL script that incorporates: control table management, job status tracking, data pre-checks, retries, dynamic broadcasting, caching, and logging.
Here’s a complete PySpark ETL script that incorporates all the features we discussed: control table management, job status tracking, data pre-checks, retries, dynamic broadcasting, caching, and logging. This is a robust framework for large-scale ETL jobs running in a production environment.
Complete PySpark ETL Code:
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import logging
# Initialize logger
logger = logging.getLogger('ETL_Logger')
logger.setLevel(logging.INFO)
# Spark session setup
spark = (SparkSession.builder
    .appName("ETL_Job")
    .enableHiveSupport()
    .getOrCreate())
# Function to update job status in the job status table
def update_job_status(job_id, status, error_message=None):
end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Update job status in the Hive table
    # (note: SQL UPDATE requires a table format with row-level update support,
    #  e.g. Delta Lake or Hive ACID tables; plain Parquet-backed tables will reject it)
spark.sql(f"""
UPDATE job_status_table
SET status = '{status}',
end_time = '{end_time}',
error_message = '{error_message}'
WHERE job_id = {job_id}
""")
logger.info(f"Job {job_id} status updated to {status}")
# Function to check data availability for all source tables
def check_data_availability(sources, month):
for source in sources:
logger.info(f"Checking data availability for source: {source}")
# Count rows in source table for the given month
count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{month}'")
count = count_df.collect()[0][0]
# If no data exists for the current month, fail the job
if count == 0:
logger.error(f"No data available for {source} for month {month}. Failing the job.")
raise Exception(f"Data not available for {source}")
# Compare with previous month count
prev_month = (datetime.now().replace(day=1) - timedelta(days=1)).strftime('%Y-%m')
prev_count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{prev_month}'")
prev_count = prev_count_df.collect()[0][0]
logger.info(f"Source {source}: Current month count = {count}, Previous month count = {prev_count}")
# Insert source count into control table
spark.sql(f"""
INSERT INTO control_table
(step_id, step_name, priority, source_table, source_count, status, run_date)
VALUES (1, 'read {source}', 1, '{source}', {count}, 'pending', current_date())
""")
# Main ETL job function
def etl_job(job_id, sources, month):
try:
logger.info(f"Starting ETL job with Job ID: {job_id}")
# Step 1: Check Data Availability
check_data_availability(sources, month)
# Step 2: Read data from sources (assuming they are in Hive tables)
dataframes = {}
for source in sources:
logger.info(f"Reading data from {source}")
df = spark.sql(f"SELECT * FROM {source} WHERE month = '{month}'")
df.createOrReplaceTempView(f"{source}_view")
dataframes[source] = df
        # Dynamic broadcasting of smaller tables
        # (rough size estimate via a full scan of each DataFrame; the broadcast
        #  threshold is a session-wide setting, so the last qualifying table
        #  determines its final value)
        for table in dataframes:
            df_size = dataframes[table].rdd.map(lambda x: len(str(x))).sum()
            if df_size < 100 * 1024 * 1024:  # Size threshold for broadcasting (100MB)
                logger.info(f"Broadcasting table: {table}")
                spark.conf.set("spark.sql.autoBroadcastJoinThreshold", df_size)
# Step 3: Perform joins and transformations
logger.info("Performing data transformations and joins")
# First join
join_df1 = spark.sql("""
SELECT a.*, b.other_column
FROM source1_view a
JOIN source2_view b ON a.id = b.id
""")
        join_df1.cache()  # Caching intermediate result
        join_df1.createOrReplaceTempView("join_df1")  # register so the SQL below can reference it
        # Second join with another table
join_df2 = spark.sql("""
SELECT a.*, c.additional_info
FROM join_df1 a
JOIN source3_view c ON a.id = c.id
""")
# Step 4: Write to target table
logger.info("Writing results to the target table")
join_df2.write.mode('overwrite').saveAsTable('bdl_final_table')
# Step 5: Clean up cache
join_df1.unpersist()
join_df2.unpersist()
# Step 6: Update job status to completed
update_job_status(job_id, 'completed')
    except Exception as e:
        logger.error(f"Job failed due to error: {str(e)}")
        update_job_status(job_id, 'failed', str(e))
        raise  # re-raise so run_etl_with_retries (below) can trigger a retry
# Retry mechanism for running ETL jobs
def run_etl_with_retries(job_id, sources, month, max_retries=3):
retry_count = 0
while retry_count <= max_retries:
try:
etl_job(job_id, sources, month)
logger.info(f"Job {job_id} completed successfully.")
break
except Exception as e:
retry_count += 1
logger.error(f"Job {job_id} failed on attempt {retry_count}: {e}")
if retry_count > max_retries:
logger.error(f"Max retries reached for job {job_id}. Job failed.")
update_job_status(job_id, 'failed', f"Max retries reached: {e}")
else:
logger.info(f"Retrying job {job_id} (attempt {retry_count + 1}/{max_retries})")
# List of sources to read from
sources = ["bdl_source_table1", "bdl_source_table2", "bdl_source_table3"]
# Running the ETL job for the current month with retries
run_etl_with_retries(1001, sources, "2024-10", max_retries=3)
Explanation and Enhancements
- Data Availability Pre-checks: The check_data_availability function verifies that data exists for every source table for the current month and logs the counts into the control table. The job fails early if required data is missing.
- Dynamic Broadcasting: Small tables (under 100 MB) are broadcast dynamically to optimize joins with larger tables (see the SQL-hint alternative sketched after this list).
- Caching and Memory Management: The result of the first join (join_df1) is cached and unpersisted after use to free memory. Caching is applied wherever intermediate data is reused across transformations.
- Job Status and Control Table: The control_table records source table counts and the progress of each ETL step, while the job_status_table records the job’s overall status, including success, failure, and retry attempts.
- Retry Mechanism: The run_etl_with_retries function wraps the main ETL job with a retry loop, retrying up to max_retries times on failure and logging each attempt.
- Logging and Error Handling: The logger captures job start/end, data availability, errors, and retry attempts. On failure, the job status and error message are written to the job_status_table.
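As an alternative to nudging spark.sql.autoBroadcastJoinThreshold per table, Spark SQL also supports an explicit BROADCAST join hint. A minimal sketch, reusing the view names from the script above:
# Ask Spark to broadcast the (presumed small) source2_view regardless of the
# autoBroadcastJoinThreshold setting, using a join hint in the SQL itself
join_df1 = spark.sql("""
    SELECT /*+ BROADCAST(b) */ a.*, b.other_column
    FROM source1_view a
    JOIN source2_view b ON a.id = b.id
""")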
Additional Enhancements
- Email/Notification Alerts: Integrate email alerts (for example via AWS SES or a plain SMTP relay) to notify the team of job failures or retries; a sketch follows this list.
- Corrupt Data Handling: If data is found to be corrupt during reading or processing, move it to a “quarantine” folder for manual inspection.
- Audit Logs: Maintain audit logs for every ETL execution, recording data transformations, count mismatches, and data quality checks for compliance reporting.
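A minimal sketch of such a notification hook, using Python’s standard smtplib (the SMTP host and addresses are placeholders; swap in AWS SES or your own mail relay):
import smtplib
from email.message import EmailMessage

def send_failure_alert(job_id, error_message):
    # Placeholder addresses and SMTP host -- replace with your own relay or an API-based service
    msg = EmailMessage()
    msg["Subject"] = f"ETL job {job_id} failed"
    msg["From"] = "etl-alerts@example.com"
    msg["To"] = "data-team@example.com"
    msg.set_content(f"Job {job_id} failed with error:\n{error_message}")
    with smtplib.SMTP("smtp.example.com") as server:
        server.send_message(msg)
Such a helper could be called from the except block of etl_job, right after update_job_status.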
The solution includes:
- Schema creation for control and job status tables.
- Pre-checks for data availability.
- Data read, join, and transformation steps.
- Caching, broadcasting, and optimizations.
- Tracking ETL job progress and status updates.
- Handling retries and error logging.
- Enhancements such as auditing, retries, dynamic broadcasting, and memory management.
1. Table Schemas (Control and Job Status Tables)
Control Table Schema
CREATE TABLE IF NOT EXISTS control_table (
step_id INT,
step_name STRING,
priority INT,
source_table STRING,
source_count BIGINT,
target_table STRING,
target_count BIGINT,
status STRING,
run_date DATE,
error_message STRING
);
Job Status Table Schema
CREATE TABLE IF NOT EXISTS job_status_table (
job_id INT,
job_name STRING,
run_date DATE,
start_time TIMESTAMP,
end_time TIMESTAMP,
retry_count INT,
status STRING,
error_message STRING
);
2. Pre-Check Logic for Data Availability
We will first verify if all the required source data is available for the specified month. If any source table lacks data, the ETL job should fail before any processing starts.
def check_data_availability(sources, month):
for source in sources:
logger.info(f"Checking data availability for source: {source}")
# Count rows in source table for the given month
count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{month}'")
count = count_df.collect()[0][0]
# If no data exists for the current month, fail the job
if count == 0:
logger.error(f"No data available for {source} for month {month}. Failing the job.")
raise Exception(f"Data not available for {source}")
# Compare with previous month count
prev_month = (datetime.now().replace(day=1) - timedelta(days=1)).strftime('%Y-%m')
prev_count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{prev_month}'")
prev_count = prev_count_df.collect()[0][0]
logger.info(f"Source {source}: Current month count = {count}, Previous month count = {prev_count}")
# Insert source count into control table
spark.sql(f"""
INSERT INTO control_table
(step_id, step_name, priority, source_table, source_count, status, run_date)
VALUES (1, 'read {source}', 1, '{source}', {count}, 'pending', current_date())
""")
3. Job Status Tracking
We’ll maintain the job status (whether it is running, completed, or failed) in the job_status_table. This allows us to track the entire job execution, including retries and errors.
def update_job_status(job_id, status, error_message=None):
end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Update job status in Hive table
spark.sql(f"""
UPDATE job_status_table
SET status = '{status}',
end_time = '{end_time}',
error_message = '{error_message}'
WHERE job_id = {job_id}
""")
logger.info(f"Job {job_id} status updated to {status}")
4. ETL Job with Data Reads, Joins, and Optimizations
This is the main body of the ETL job. We read the source tables, perform joins, apply transformations, and write the result to a target table.
Enhancements:
- Dynamic Broadcasting: If a table size is small, it should be broadcasted in joins.
- Caching: We cache intermediate results that will be reused multiple times.
- Error Handling: Logs and updates the status of the job and retries in case of failure.
- Memory Management: Cleans up after each step (cache, uncache).
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import logging
# Initialize logger
logger = logging.getLogger('ETL_Logger')
# Spark session setup
spark = (SparkSession.builder
    .appName("ETL_Job")
    .enableHiveSupport()
    .getOrCreate())
# Main ETL job function
def etl_job(job_id, sources, month):
try:
logger.info(f"Starting ETL job with Job ID: {job_id}")
# Step 1: Check Data Availability
check_data_availability(sources, month)
# Step 2: Read data from sources (assuming they are in Hive tables)
dataframes = {}
for source in sources:
logger.info(f"Reading data from {source}")
df = spark.sql(f"SELECT * FROM {source} WHERE month = '{month}'")
df.createOrReplaceTempView(f"{source}_view")
dataframes[source] = df
# Dynamic broadcasting of smaller tables
for table in dataframes:
df_size = dataframes[table].rdd.map(lambda x: len(str(x))).sum()
if df_size < 100 * 1024 * 1024: # Size threshold for broadcasting (100MB)
logger.info(f"Broadcasting table: {table}")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", df_size)
# Step 3: Perform joins and transformations
logger.info("Performing data transformations and joins")
# First join
join_df1 = spark.sql("""
SELECT a.*, b.other_column
FROM source1_view a
JOIN source2_view b ON a.id = b.id
""")
        join_df1.cache()  # Caching intermediate result
        join_df1.createOrReplaceTempView("join_df1")  # register so the SQL below can reference it
        # Second join with another table
join_df2 = spark.sql("""
SELECT a.*, c.additional_info
FROM join_df1 a
JOIN source3_view c ON a.id = c.id
""")
# Step 4: Write to target table
logger.info("Writing results to the target table")
join_df2.write.mode('overwrite').saveAsTable('bdl_final_table')
# Step 5: Clean up cache
join_df1.unpersist()
join_df2.unpersist()
# Step 6: Update job status to completed
update_job_status(job_id, 'completed')
except Exception as e:
logger.error(f"Job failed due to error: {str(e)}")
update_job_status(job_id, 'failed', str(e))
# List of sources to read from
sources = ["bdl_source_table1", "bdl_source_table2", "bdl_source_table3"]
# Running the ETL job for the current month
etl_job(1001, sources, "2024-10")
5. Retry Mechanism
In case of failure, the job should be retried a set number of times. For the retry loop to trigger, etl_job must re-raise the exception after logging it (as in the complete script above). Here’s how to add a retry mechanism:
def run_etl_with_retries(job_id, sources, month, max_retries=3):
retry_count = 0
while retry_count <= max_retries:
try:
etl_job(job_id, sources, month)
logger.info(f"Job {job_id} completed successfully.")
break
except Exception as e:
retry_count += 1
logger.error(f"Job {job_id} failed on attempt {retry_count}: {e}")
if retry_count > max_retries:
logger.error(f"Max retries reached for job {job_id}. Job failed.")
update_job_status(job_id, 'failed', f"Max retries reached: {e}")
else:
logger.info(f"Retrying job {job_id} (attempt {retry_count + 1}/{max_retries})")
6. Enhancements Added
- Dynamic Broadcasting: If a table size is less than 100 MB, it gets broadcasted to optimize joins with larger tables.
- Caching and Uncaching: Intermediate results are cached to optimize repeated access, and uncached after use to free up memory.
- Pre-check Data Availability: The job will fail at the start if data is not available for the current month.
- Job Status and Control Table: Each step and job’s status is logged and recorded, making it easier to monitor, audit, and retry failed jobs.
- Retry Mechanism: Added a retry mechanism in case of job failure, with configurable retries.
Final Thoughts
- Performance: Dynamic broadcasting and caching have been used to optimize joins and intermediate data processing.
- Error Handling: The job logs every failure, updates the job status in the database, and supports automatic retries.
- Monitoring: Both job status and step-level progress are tracked using the control and job status tables, giving visibility into the workflow’s state at any given point.
- Memory Management: Proper use of cache() and unpersist() ensures that memory is managed efficiently.
This ETL framework is designed for production use: it handles large datasets, tracks job progress robustly, and leaves room for further customization.
To successfully implement the ETL workflow described earlier, you need to have two essential tracking mechanisms in place: a control table and a job status table. These tables help to monitor the progress of the ETL job, track the job’s execution, log errors, and ensure data integrity.
Here, I’ll define the structure (schema) for both tables, provide sample data, and explain how to implement them in the PySpark SQL-based ETL job.
1. Control Table Schema:
This table tracks the details of each step of the ETL job, including its execution status, source data information, and any conditions for retry or failure.
Control Table Schema
Column Name | Data Type | Description |
---|---|---|
step_id | INT | Unique ID for each step in the ETL job. |
step_name | STRING | Name of the ETL step (e.g., ‘read source1’, ‘join tables’). |
priority | INT | Priority of the step (lower numbers mean higher priority). |
source_table | STRING | Name of the source table being read or transformed. |
source_count | BIGINT | Row count of the source data in the step. |
target_table | STRING | Name of the target table being written. |
target_count | BIGINT | Row count of the data being written. |
status | STRING | Status of the step (‘pending’, ‘in-progress’, ‘completed’, ‘failed’). |
run_date | DATE | Date the step was last run. |
error_message | STRING | Error message if the step fails. |
Sample Data for Control Table
step_id | step_name | priority | source_table | source_count | target_table | target_count | status | run_date | error_message |
---|---|---|---|---|---|---|---|---|---|
1 | read source1 | 1 | bdl_source_table1 | 1500000 | NULL | NULL | completed | 2024-10-01 | NULL |
2 | join source1_2 | 2 | bdl_source_table2 | 1700000 | NULL | NULL | completed | 2024-10-01 | NULL |
3 | write to target | 3 | NULL | NULL | bdl_target_table | 3000000 | completed | 2024-10-01 | NULL |
4 | aggregate results | 4 | bdl_target_table | 3000000 | bdl_final_table | 1000000 | failed | 2024-10-01 | Memory error |
2. Job Status Table Schema:
This table logs the overall status of the ETL job. It tracks each execution attempt, including the number of retries, job duration, and whether it succeeded or failed.
Job Status Table Schema
Column Name | Data Type | Description |
---|---|---|
job_id | INT | Unique identifier for the entire ETL job run. |
job_name | STRING | Name of the ETL job. |
run_date | DATE | Date when the job started. |
start_time | TIMESTAMP | Timestamp when the job execution started. |
end_time | TIMESTAMP | Timestamp when the job execution ended. |
retry_count | INT | Number of times the job was retried. |
status | STRING | Overall status of the job (‘pending’, ‘running’, ‘completed’, ‘failed’). |
error_message | STRING | Error message in case of job failure. |
Sample Data for Job Status Table
job_id | job_name | run_date | start_time | end_time | retry_count | status | error_message |
---|---|---|---|---|---|---|---|
1001 | BDL_ETL_Job | 2024-10-01 | 2024-10-01 08:00:00 | 2024-10-01 09:30:00 | 0 | completed | NULL |
1002 | BDL_ETL_Job | 2024-10-02 | 2024-10-02 08:00:00 | 2024-10-02 10:00:00 | 2 | failed | Memory error |
3. Building and Using These Tables in the ETL Process
Here’s how you can build these tables in your Hive Metastore or your preferred data store and integrate them into the PySpark SQL-based ETL job.
Step 1: Creating Control Table and Job Status Table
You can create these tables in Hive using SQL commands. This will allow you to track the ETL progress.
-- Creating Control Table
CREATE TABLE IF NOT EXISTS control_table (
step_id INT,
step_name STRING,
priority INT,
source_table STRING,
source_count BIGINT,
target_table STRING,
target_count BIGINT,
status STRING,
run_date DATE,
error_message STRING
);
-- Creating Job Status Table
CREATE TABLE IF NOT EXISTS job_status_table (
job_id INT,
job_name STRING,
run_date DATE,
start_time TIMESTAMP,
end_time TIMESTAMP,
retry_count INT,
status STRING,
error_message STRING
);
Step 2: Pre-Check for Data Availability (Checking Source Counts)
Before starting the ETL job, you’ll want to check that all source tables contain data for the specified month. If any source is missing data, the job should fail early.
def check_data_availability(sources, month):
for source in sources:
logger.info(f"Checking data availability for source: {source}")
# Count rows in source table for the given month
count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{month}'")
count = count_df.collect()[0][0]
# If no data exists for the current month, fail the job
if count == 0:
logger.error(f"No data available for {source} for month {month}. Failing the job.")
raise Exception(f"Data not available for {source}")
# Compare with previous month count
prev_month = (datetime.now().replace(day=1) - timedelta(days=1)).strftime('%Y-%m')
prev_count_df = spark.sql(f"SELECT COUNT(1) FROM {source} WHERE month = '{prev_month}'")
prev_count = prev_count_df.collect()[0][0]
logger.info(f"Source {source}: Current month count = {count}, Previous month count = {prev_count}")
# Add source count to control table
spark.sql(f"""
INSERT INTO control_table
(step_id, step_name, priority, source_table, source_count, status, run_date)
VALUES (1, 'read {source}', 1, '{source}', {count}, 'pending', current_date())
""")
Step 3: Tracking Job Status
You can track the status of each ETL job run using the job_status_table. Each time the job runs, it updates the job’s start time, end time, retry attempts, and whether the job succeeded or failed.
def update_job_status(job_id, status, error_message=None):
end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Update job status in Hive table
spark.sql(f"""
UPDATE job_status_table
SET status = '{status}',
end_time = '{end_time}',
error_message = '{error_message}'
WHERE job_id = {job_id}
""")
logger.info(f"Job {job_id} status updated to {status}")
Step 4: Performing the ETL Job
Below is an overview of the steps performed in the ETL job, including reading from source tables, checking data availability, performing joins, and updating the status.
def etl_job(job_id, sources, month):
try:
logger.info(f"Starting ETL job with Job ID: {job_id}")
# Step 1: Check Data Availability
check_data_availability(sources, month)
# Step 2: Read data from sources (assuming they are in Hive tables)
for source in sources:
logger.info(f"Reading data from {source}")
df = spark.sql(f"SELECT * FROM {source} WHERE month = '{month}'")
df.createOrReplaceTempView(f"{source}_view")
# Step 3: Perform joins and transformations
logger.info("Performing data transformations and joins")
transformed_df = spark.sql("""
SELECT a.*, b.other_column
FROM source1_view a
JOIN source2_view b ON a.id = b.id
""")
        # Cache the result to avoid recomputation and register it as a view
        transformed_df.cache()
        transformed_df.createOrReplaceTempView("transformed_df")  # so the SQL below can reference it
        # Perform additional transformations
final_df = spark.sql("""
SELECT a.*, c.additional_info
FROM transformed_df a
JOIN source3_view c ON a.id = c.id
""")
# Write to target
final_df.write.mode('overwrite').saveAsTable('bdl_final_table')
# Step 4: Update job status
update_job_status(job_id, 'completed')
except Exception as e:
logger.error(f"Job failed due to error: {str(e)}")
update_job_status(job_id, 'failed', str(e))
Final Thoughts:
- Control and Job Status Tables: Provide a robust mechanism to track and manage the execution of ETL jobs.
- Pre-check Logic: Ensures data availability before starting the job to prevent unnecessary processing.
- Job Status Updates: Help to track the job’s execution and easily identify issues.
- Optimization: The job includes caching to optimize the execution time of chained transformations.
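One piece the walkthrough leaves open is populating the target_table and target_count columns of the control table after the write, which is what enables the count-mismatch audits mentioned earlier. A minimal sketch, assuming the bdl_final_table target used above:
def record_target_count(step_id, target_table):
    # Count the rows just written and record them against the step in the control table
    # (UPDATE again assumes an ACID-capable table format such as Delta Lake or Hive ACID)
    target_count = spark.sql(f"SELECT COUNT(1) FROM {target_table}").collect()[0][0]
    spark.sql(f"""
        UPDATE control_table
        SET target_table = '{target_table}',
            target_count = {target_count},
            status = 'completed'
        WHERE step_id = {step_id}
    """)
    logger.info(f"Recorded {target_count} rows written to {target_table}")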
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import logging
import os
# Initialize Spark session
spark = (SparkSession.builder
    .appName("BDL Processing Job")
    .enableHiveSupport()
    .getOrCreate())
# Initialize logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("BDL Processing Job")
# Configurations
input_table = "bdl.input_table"
output_table = "bdl.output_table"
control_table = "bdl.control_table" # To track the state of file processing
max_retries = 3
# State management: Fetch unprocessed or failed files from the control table
def fetch_unprocessed_files():
return spark.sql(f"SELECT * FROM {control_table} WHERE status = 'unprocessed' OR status = 'failed'")
# Retry mechanism: Retry failed files up to max_retries
def retry_failed_files(file_name, retries):
return retries < max_retries
# Main processing function: file_name identifies the batch/file tracked in the control table
def process_file(file_name):
    try:
        # Read the records for this file from the input BDL table
        # (assumes input_table carries a file_name column identifying the batch)
        file_data = spark.sql(f"SELECT * FROM {input_table} WHERE file_name = '{file_name}'")
        # Processing logic here (for example, transformations)
        processed_data = file_data.withColumn("processed", F.lit(True))
        # Append the processed data to the output BDL table
        # (append rather than overwrite, so earlier files are not clobbered)
        processed_data.write.mode("append").insertInto(output_table)
        # Log success and update the control table
        logger.info(f"Processing completed for file: {file_name}")
        spark.sql(f"UPDATE {control_table} SET status = 'processed' WHERE file_name = '{file_name}'")
    except Exception as e:
        logger.error(f"Error processing file {file_name}: {str(e)}")
        spark.sql(f"UPDATE {control_table} SET status = 'failed' WHERE file_name = '{file_name}'")
        # Retry logic
        retries = spark.sql(f"SELECT retries FROM {control_table} WHERE file_name = '{file_name}'").collect()[0][0]
        if retry_failed_files(file_name, retries):
            logger.info(f"Retrying file {file_name} (retry count: {retries + 1})")
            spark.sql(f"UPDATE {control_table} SET retries = retries + 1 WHERE file_name = '{file_name}'")
            process_file(file_name)  # Retry
        else:
            logger.error(f"File {file_name} failed after {max_retries} retries")
            move_to_corrupt_folder(file_name)
# Function to move corrupt files to a folder for manual review
# (os.rename only works for local paths; use the HDFS/S3 filesystem API for object stores)
def move_to_corrupt_folder(file_name):
    corrupt_folder = "/path/to/corrupt_folder"
    os.rename(file_name, os.path.join(corrupt_folder, os.path.basename(file_name)))
    logger.info(f"Moved file {file_name} to corrupt folder")
# Function to send notifications in case of significant issues
def send_notification(message):
# This can be integrated with email alerts or messaging systems (e.g., Slack)
logger.info(f"Notification sent: {message}")
# Main Job Logic
if __name__ == "__main__":
logger.info("Starting BDL Processing Job")
# Fetch unprocessed or failed files from control table
unprocessed_files = fetch_unprocessed_files()
    for row in unprocessed_files.collect():
        file_name = row["file_name"]
        logger.info(f"Processing file: {file_name}")
        process_file(file_name)
logger.info("BDL Processing Job completed")
# Alternative sketch: the same processing expressed with the SQL API
# Read data from the BDL input table
input_data = spark.sql(f"SELECT * FROM {input_table}")
# Example processing in SQL
input_data.createOrReplaceTempView("input_data_view")
processed_data = spark.sql("""
SELECT *,
TRUE AS processed
FROM input_data_view
""")
# Write processed data to output BDL table
processed_data.write.mode("overwrite").insertInto(output_table)
# Update the control table to mark the file as processed
spark.sql(f"UPDATE {control_table} SET status = 'processed' WHERE file_name = 'example_file'")
A PySpark script incorporating optimizations for joining large tables, performing groupBy and transpose, and writing the output
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
# Create SparkSession with optimizations
spark = SparkSession.builder.appName("Optimized Join and Transform")
.config("spark.sql.shuffle.partitions", 200)
.config("spark.driver.memory", "8g")
.config("spark.executor.memory", "16g")
.config("spark.executor.cores", 4)
.getOrCreate()
# Load 30GB table (e.g., sales data)
sales_df = spark.read.parquet("sales_data.parquet")
# Load 200MB look-up tables (e.g., customer, product)
customer_df = spark.read.parquet("customer_data.parquet").cache()
product_df = spark.read.parquet("product_data.parquet").cache()
# Broadcast smaller DataFrames for efficient joins
customer_bcast = F.broadcast(customer_df)
product_bcast = F.broadcast(product_df)
# Join sales data with look-up tables
joined_df = (sales_df
    .join(customer_bcast, "customer_id")
    .join(product_bcast, "product_id"))
# Perform groupBy and aggregation
grouped_df = (joined_df
    .groupBy("date", "region")
    .agg(F.sum("sales").alias("total_sales")))
# Transpose data using pivot
transposed_df = (grouped_df
    .groupBy("date")
    .pivot("region")
    .sum("total_sales"))
# Write output to Parquet file
transposed_df.write.parquet("output_data.parquet", compression="snappy")
# Estimate DataFrame sizes
def estimate_size(df):
    # Rough byte estimate from the string form of each row; this triggers a full
    # scan, so use it sparingly on large DataFrames
    return df.rdd.map(lambda row: len(str(row))).sum()
print("Sales DataFrame size:", estimate_size(sales_df))
print("Customer DataFrame size:", estimate_size(customer_df))
print("Product DataFrame size:", estimate_size(product_df))
print("Joined DataFrame size:", estimate_size(joined_df))
print("Transposed DataFrame size:", estimate_size(transposed_df))
# Monitor Spark UI for execution plans and performance metrics
print("Spark UI:", spark.sparkContext.uiWebUrl)
Optimizations Used
- Broadcasting: Smaller DataFrames (customer_df, product_df) are broadcast for efficient joins.
- Caching: Frequently reused DataFrames (customer_df, product_df) are cached.
- Partitioning: spark.sql.shuffle.partitions is set to 200 for efficient shuffling.
- Memory Configuration: Driver and executor memory are sized for the workload.
- Executor Cores: spark.executor.cores is set to 4 for parallel processing.
- Parquet Compression: Output is written with Snappy compression.
Estimating DataFrame Sizes
The estimate_size function gives a rough byte estimate by summing the length of each row’s string representation; it requires a full scan, so use it sparingly on large DataFrames.
Monitoring Performance
- Spark UI: spark.sparkContext.uiWebUrl gives the link to the UI, where execution plans and performance metrics can be inspected.
- Spark History Server: analyze metrics for completed applications (this requires event logging, as sketched below).
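For the History Server to have completed applications to analyze, event logging must be enabled when the session is created. A minimal sketch (the log directory is a placeholder; point it at a shared location such as HDFS or S3):
# Enable Spark event logging so completed applications appear in the History Server
spark = (SparkSession.builder
    .appName("Optimized Join and Transform")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "hdfs:///tmp/spark-events")  # placeholder path
    .getOrCreate())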