Here is a detailed approach for dividing a monthly PySpark script into multiple code steps. Each step will be saved in the code column of a control DataFrame and executed sequentially. The script will include error handling and pre-checks to ensure source tables are present or Oracle tables have data before starting each step. The control DataFrame and status DataFrame for each month will be appended to a Hive or Oracle table to maintain historical data.

Step 1: Prepare the Control DataFrame with Code Steps

Sample Monthly PySpark Script

Below is a sample PySpark script that reads data from Hive tables, performs transformations, and writes the output to Hive tables.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from datetime import datetime

spark = SparkSession.builder \
    .appName("Monthly PySpark Script") \
    .enableHiveSupport() \
    .getOrCreate()

# Define the current month in MMyyyy format
current_month = datetime.now().strftime("%m%Y")

# Sample code steps to be executed sequentially
code_steps = [
    # Step 1: Check if source Hive table exists and has data
    f"""
    source_table = "source_db.source_table"
    if not spark.catalog.tableExists(source_table):
        raise ValueError("Source table does not exist.")
    df = spark.table(source_table)
    if df.count() == 0:
        raise ValueError("Source table is empty.")
    """,

    # Step 2: Read data from an Oracle table (Assuming JDBC connection properties are set)
    f"""
    # Service names use the //host:port/service_name form (host:port:SID is the SID form)
    oracle_url = "jdbc:oracle:thin:@//hostname:port/service_name"
    oracle_properties = {{
        "user": "username",
        "password": "password"
    }}
    oracle_df = spark.read.jdbc(oracle_url, "oracle_schema.oracle_table", properties=oracle_properties)
    if oracle_df.count() == 0:
        raise ValueError("Oracle table is empty.")
    print("Data read from Oracle table:", oracle_df.count(), "rows")
    """,

    # Step 3: Perform a transformation
    f"""
    transformed_df = df.withColumn("new_col", col("existing_col") * 2)
    print("Transformation applied:", transformed_df.count(), "rows")
    """,

    # Step 4: Join with Oracle data
    f"""
    final_df = transformed_df.join(oracle_df, "join_key")
    print("Joined data:", final_df.count(), "rows")
    """,

    # Step 5: Write data to the target Hive table
    f"""
    target_table = "target_db.target_table"
    final_df.write.mode("overwrite").saveAsTable(target_table)
    print("Data written to target table")
    """
]

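# Create a Control DataFrame
# Dedent each step first: exec() raises IndentationError on uniformly indented code
import textwrap
from pyspark.sql import Row
control_data = [Row(serial_no=i+1, code=textwrap.dedent(step).strip(), month_period=current_month) for i, step in enumerate(code_steps)]
control_df = spark.createDataFrame(control_data)
control_df.show(truncate=False)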
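
Step strings written as indented triple-quoted literals carry their leading whitespace into exec, which rejects them. The following is a minimal sketch in plain Python (no Spark needed) of that pitfall and of removing the common indentation with textwrap.dedent. The names raw_step and run_step are illustrative and not part of the original script.

```python
import textwrap

# A step written the same way as the code_steps entries: indented inside the literal
raw_step = """
    total = 1 + 2
    if total == 3:
        label = "ok"
"""

def run_step(code, namespace):
    """Strip the common leading whitespace, then execute the step in `namespace`."""
    exec(textwrap.dedent(code), namespace)
    return namespace

# Executing the raw string fails because every line starts with an unexpected indent
try:
    exec(raw_step, {})
    raw_ok = True
except IndentationError:
    raw_ok = False

# The dedented version runs and leaves its variables in the namespace
ns = run_step(raw_step, {})
```

Dedenting once, when the control rows are built, keeps the stored code column directly executable.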

Step 2: Create a Status Table

Create a status table to store the execution status of each code step. This table can be created in Hive for simplicity.

# Create the status table in Hive if it does not already exist
status_schema = "serial_no INT, status STRING, message STRING, month_period STRING, row_count INT"
spark.sql(f"CREATE TABLE IF NOT EXISTS control_db.status_table ({status_schema})")

Step 3: Execute Each Code Step with Error Handling and Status Tracking

Execute each code step sequentially, logging its status, any error message, and the row count of the output table. Execution stops at the first failed step.

import textwrap

# Function to log status
def log_status(serial_no, status, message, row_count=0):
    spark.sql(f"""
        INSERT INTO control_db.status_table
        VALUES ({serial_no}, '{status}', '{message}', '{current_month}', {row_count})
    """)

# Execute each code snippet in serial_no order
code_snippets = control_df.select("serial_no", "code").orderBy("serial_no").collect()

for row in code_snippets:
    serial_no = row["serial_no"]
    code_snippet = row["code"]

    try:
        print(f"Executing step {serial_no}...")
        # Dedent so that steps stored with leading indentation still execute
        exec(textwrap.dedent(code_snippet))

        # Fetch the row count of the output table if this step writes to a table
        if "write.mode" in code_snippet:
            # Takes the first double-quoted string in the snippet as the table name,
            # so the table name must be the first quoted value in a write step
            output_table = code_snippet.split('"')[1]
            row_count = spark.table(output_table).count()
        else:
            row_count = 0

        log_status(serial_no, "SUCCESS", "Executed successfully", row_count)
    except Exception as e:
        # Single quotes are removed so the message cannot break the INSERT statement
        error_message = str(e).replace("'", "")
        log_status(serial_no, "FAILED", error_message)
        print(f"Error executing step {serial_no}: {error_message}")
        break

# Example to display the status table
spark.sql("SELECT * FROM control_db.status_table").show()
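
Later steps rely on variables created by earlier ones (df, oracle_df, transformed_df), so every step has to run in the same namespace. The sketch below shows that pattern in plain Python with an explicit dictionary as the namespace and a stop on the first failure. run_steps, demo_steps, and the status tuples are hypothetical stand-ins for the Spark loop and the status table, not part of the original script.

```python
import textwrap

def run_steps(steps, namespace=None):
    """Run code strings in order in one shared namespace; stop at the first failure."""
    namespace = {} if namespace is None else namespace
    statuses = []
    for serial_no, code in enumerate(steps, start=1):
        try:
            exec(textwrap.dedent(code), namespace)
            statuses.append((serial_no, "SUCCESS", "Executed successfully"))
        except Exception as exc:
            statuses.append((serial_no, "FAILED", str(exc)))
            break  # later steps depend on this one, so do not continue
    return statuses, namespace

# Plain-Python stand-ins for the Spark steps
demo_steps = [
    "rows = [1, 2, 3]",
    "doubled = [r * 2 for r in rows]",          # uses a variable from step 1
    "raise ValueError('Source table is empty.')",
    "never_run = True",                         # skipped because step 3 failed
]

statuses, ns = run_steps(demo_steps)
```

Passing a dictionary instead of relying on the module globals makes it explicit which names the steps can see, for example {"spark": spark, "col": col} in the Spark version.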

Full Example Script

Here’s the complete script:

import textwrap
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import col  # used by the executed snippets
from pyspark.sql import Row

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Execute Code Snippets from Control DataFrame") \
    .enableHiveSupport() \
    .getOrCreate()

# Define the current month in MMyyyy format
current_month = datetime.now().strftime("%m%Y")

# Define code snippets
code_steps = [
    # Step 1: Check if source Hive table exists and has data
    f"""
    source_table = "source_db.source_table"
    if not spark.catalog.tableExists(source_table):
        raise ValueError("Source table does not exist.")
    df = spark.table(source_table)
    if df.count() == 0:
        raise ValueError("Source table is empty.")
    """,

    # Step 2: Read data from an Oracle table
    f"""
    # Service names use the //host:port/service_name form (host:port:SID is the SID form)
    oracle_url = "jdbc:oracle:thin:@//hostname:port/service_name"
    oracle_properties = {{
        "user": "username",
        "password": "password"
    }}
    oracle_df = spark.read.jdbc(oracle_url, "oracle_schema.oracle_table", properties=oracle_properties)
    if oracle_df.count() == 0:
        raise ValueError("Oracle table is empty.")
    print("Data read from Oracle table:", oracle_df.count(), "rows")
    """,

    # Step 3: Perform a transformation
    f"""
    transformed_df = df.withColumn("new_col", col("existing_col") * 2)
    print("Transformation applied:", transformed_df.count(), "rows")
    """,

    # Step 4: Join with Oracle data
    f"""
    final_df = transformed_df.join(oracle_df, "join_key")
    print("Joined data:", final_df.count(), "rows")
    """,

    # Step 5: Write data to the target Hive table
    f"""
    target_table = "target_db.target_table"
    final_df.write.mode("overwrite").saveAsTable(target_table)
    print("Data written to target table")
    """
]

# Create a Control DataFrame
# Dedent each step first: exec() raises IndentationError on uniformly indented code
control_data = [Row(serial_no=i+1, code=textwrap.dedent(step).strip(), month_period=current_month) for i, step in enumerate(code_steps)]
control_df = spark.createDataFrame(control_data)
control_df.show(truncate=False)

# Create a status table
status_schema = "serial_no INT, status STRING, message STRING, month_period STRING, row_count INT"
spark.sql(f"CREATE TABLE IF NOT EXISTS control_db.status_table ({status_schema})")

# Function to log status
def log_status(serial_no, status, message, row_count=0):
    spark.sql(f"""
        INSERT INTO control_db.status_table
        VALUES ({serial_no}, '{status}', '{message}', '{current_month}', {row_count})
    """)

# Execute each code snippet in serial_no order
code_snippets = control_df.select("serial_no", "code").orderBy("serial_no").collect()

for row in code_snippets:
    serial_no = row["serial_no"]
    code_snippet = row["code"]

    try:
        print(f"Executing step {serial_no}...")
        exec(code_snippet)

        # Fetch the row count of the output table if this step writes to a table
        if "write.mode" in code_snippet:
            # Takes the first double-quoted string in the snippet as the table name
            output_table = code_snippet.split('"')[1]
            row_count = spark.table(output_table).count()
        else:
            row_count = 0

        log_status(serial_no, "SUCCESS", "Executed successfully", row_count)
    except Exception as e:
        # Single quotes are removed so the message cannot break the INSERT statement
        error_message = str(e).replace("'", "")
        log_status(serial_no, "FAILED", error_message)
        print(f"Error executing step {serial_no}: {error_message}")
        break

# Example to display the status table
spark.sql("SELECT * FROM control_db.status_table").show()
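
The loop above infers the output table by taking the first double-quoted string in the snippet, which only works when the table name happens to be the first quoted value. One possible alternative, sketched here as a heuristic rather than a complete parser, is to look for the saveAsTable call and resolve its argument. find_output_table and the sample snippets are hypothetical, and the sketch assumes the table is either a string literal or a variable assigned a string literal in the same snippet.

```python
import re

# Matches the argument passed to .saveAsTable(...)
_SAVE_CALL = re.compile(r"\.saveAsTable\(\s*([^)\s]+)\s*\)")
# Matches simple assignments of a string literal, e.g. target_table = "db.tbl"
_STR_ASSIGN = re.compile(r"""^\s*(\w+)\s*=\s*["']([\w.]+)["']\s*$""", re.MULTILINE)

def find_output_table(code):
    """Return the table name written by saveAsTable, or None if it cannot be found."""
    match = _SAVE_CALL.search(code)
    if match is None:
        return None
    arg = match.group(1)
    if arg[0] in "\"'":
        # The table name was passed directly as a string literal
        return arg.strip("\"'")
    # Otherwise treat the argument as a variable and look up its assignment
    return dict(_STR_ASSIGN.findall(code)).get(arg)

write_step = '''
target_table = "target_db.target_table"
final_df.write.mode("overwrite").saveAsTable(target_table)
'''
noisy_step = '''
print("Writing output")
final_df.write.mode("overwrite").saveAsTable("target_db.target_table")
'''
read_step = 'df = spark.table("source_db.source_table")'

results = [find_output_table(s) for s in (write_step, noisy_step, read_step)]
```

For noisy_step, splitting on double quotes would have returned "Writing output" as the table name. Storing the output table in its own column of the control DataFrame would avoid parsing the code at all.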

To execute dynamic code snippets stored in a DataFrame (or any other source) in PySpark, you can use the exec function. This can be particularly useful when you have a control table that stores PySpark code snippets to be executed sequentially. Below is a complete example of how to achieve this, including error handling, logging, and updating status.

Example: Dynamic Code Execution in PySpark

1. Initialize Spark Session

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime

# Initialize Spark session
spark = SparkSession.builder \
    .appName("PySpark Dynamic Code Execution Example") \
    .enableHiveSupport() \
    .getOrCreate()

2. Create Sample Control DataFrame

# Sample control DataFrame
control_data = [
    (1, "df1 = spark.range(0, 10)"),
    (2, "df2 = df1.withColumn('square', F.col('id') ** 2)"),
    (3, "df2.show()")
]
control_columns = ["serial_no", "code_snippet"]

control_df = spark.createDataFrame(control_data, control_columns)
control_df.show(truncate=False)

3. Create Log and Status DataFrames

# Define schema for log and status tables
log_schema = "timestamp STRING, message STRING, level STRING"
status_schema = "step INT, status STRING, row_count INT, timestamp STRING"

# Initialize empty log and status DataFrames
log_df = spark.createDataFrame([], schema=log_schema)
status_df = spark.createDataFrame([], schema=status_schema)

4. Define Functions for Error Handling and Logging

# Function to log messages
def log(message, level="INFO"):
    global log_df
    log_entry = [(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), message, level)]
    log_df = log_df.union(spark.createDataFrame(log_entry, schema=log_schema))

# Function to update status
def update_status(step, status, row_count=0):
    global status_df
    status_entry = [(step, status, row_count, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))]
    status_df = status_df.union(spark.createDataFrame(status_entry, schema=status_schema))

5. Execute Code Snippets Sequentially

# Function to execute code snippets
def execute_code(control_df):
    for row in control_df.collect():
        step = row["serial_no"]
        code = row["code_snippet"]
        try:
            # Execute the code snippet
            exec(code, globals())
            log(f"Step {step} executed successfully", level="INFO")
            # Optionally, you can get the row count of a DataFrame if the code produces one
            row_count = eval(code.split('=')[0].strip()).count() if '=' in code else 0
            update_status(step, "SUCCESS", row_count)
        except Exception as e:
            log(f"Step {step} failed: {e}", level="ERROR")
            update_status(step, "FAILED")
            break

# Execute the control DataFrame
execute_code(control_df)

# Show log and status tables
log_df.show(truncate=False)
status_df.show(truncate=False)

Explanation

  1. Initialization: A Spark session is initialized, and a sample control DataFrame is created with code snippets.
  2. Log and Status DataFrames: Empty DataFrames for logs and status are initialized with appropriate schemas.
  3. Logging Functions: Functions for logging messages and updating the status are defined.
  4. Execute Code Snippets: The execute_code function iterates over the control DataFrame, executing each code snippet using exec. It logs the execution status and updates the status table.
  5. Execution and Display: The control DataFrame is executed, and the log and status tables are displayed.
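
The row-count lookup in execute_code takes whatever precedes the first "=" in the snippet as a variable name. That misfires on snippets such as df3.show(truncate=False), where the "=" belongs to a keyword argument. A minimal sketch of a stricter check using Python's ast module follows. assigned_name is a hypothetical helper, and it only handles a single plain-name assignment in the last statement.

```python
import ast

def assigned_name(code):
    """Return the variable assigned by the last statement of `code`, or None."""
    tree = ast.parse(code)
    if not tree.body:
        return None
    last = tree.body[-1]
    # Only accept a simple `name = expression` assignment
    if isinstance(last, ast.Assign) and len(last.targets) == 1:
        target = last.targets[0]
        if isinstance(target, ast.Name):
            return target.id
    return None

snippets = [
    "df1 = spark.range(0, 10)",
    "df2 = df1.withColumn('square', F.col('id') ** 2)",
    "df2.show()",
    "df3.show(truncate=False)",  # contains '=' but assigns nothing
]
names = [assigned_name(s) for s in snippets]
```

ast.parse only checks syntax, so the names in the snippet (spark, F, df3) do not need to exist when the helper runs. In execute_code, the row count would then be taken from globals()[name].count() only when a name is returned.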

Scheduling the Script

To schedule the script to run automatically, you can use scheduling tools like Apache Airflow, cron jobs, or any other job scheduler that fits your environment. Here’s an example of how you might schedule it using a simple cron job in Linux:

  1. Create a Python script (e.g., pyspark_script.py) with the above code.
  2. Add a cron job to execute the script at the desired interval.

# Open the cron table for editing
crontab -e

# Add a line to run the script at the desired schedule (e.g., every day at midnight)
0 0 * * * /path/to/spark/bin/spark-submit /path/to/pyspark_script.py

This setup will run your PySpark script automatically according to the schedule specified in the cron job.

