I have divided a big PySpark script into many steps, using step1 = '''some code''' through step7. I want to execute all these steps one after another, with the option to skip some steps when needed. If any step fails, the next step should be executed only if it is marked to run even when the previous step has failed.

To execute multiple steps (defined as strings of code) sequentially in PySpark, while controlling which steps should execute next based on success or failure, you can implement a mechanism for error handling and conditional execution.

Here’s a sample approach using Python’s exec() function to run the steps dynamically and adding a control structure to handle step execution and failures.

Step-by-Step Approach:

  1. Define the steps as strings (already done).
  2. Create a control table or list to specify:
    • Whether to run the step.
    • Whether the next step should run even if the current step fails.
  3. Use try and except blocks to handle execution and log errors.
  4. Conditional execution of the next steps based on success/failure.

Example:

# Defining your steps as code blocks
step1 = '''
print("Executing Step 1")
df = spark.read.csv("data1.csv")
'''

step2 = '''
print("Executing Step 2")
df = df.filter(df['age'] > 30)
'''

step3 = '''
print("Executing Step 3")
df = df.groupBy("country").count()
'''

step4 = '''
print("Executing Step 4")
# Assume this step fails
raise ValueError("Simulated failure in Step 4")
'''

# Control table: run_step=True means execute the step; run_on_failure=True means carry on with later steps if this step fails
control_table = [
    {"step": step1, "run_step": True, "run_on_failure": True},   # Step 1
    {"step": step2, "run_step": True, "run_on_failure": False},  # Step 2
    {"step": step3, "run_step": True, "run_on_failure": True},   # Step 3
    {"step": step4, "run_step": True, "run_on_failure": False},  # Step 4
]

# Function to execute the steps
def execute_steps(control_table):
    for index, step_info in enumerate(control_table):
        step_code = step_info["step"]
        run_step = step_info["run_step"]
        run_on_failure = step_info["run_on_failure"]

        # If the step is marked to run
        if run_step:
            try:
                print(f"\nRunning Step {index + 1}...")
                # Run in the module namespace so variables such as df persist between steps
                exec(step_code, globals())
                print(f"Step {index + 1} executed successfully.")
            except Exception as e:
                print(f"Step {index + 1} failed with error: {e}")

                # Stop the run if this step is not allowed to fail
                if not run_on_failure:
                    print(f"Skipping remaining steps due to failure in Step {index + 1}.")
                    break
        else:
            print(f"Skipping Step {index + 1}.")

# Execute all steps based on the control table
execute_steps(control_table)

Key Points:

  1. Control Table: The control_table holds information on whether each step should run and whether execution should continue if that step fails (run_on_failure).
    • run_step=True: Indicates the step should be executed.
    • run_on_failure=True: If this step fails, continue with the following steps; if False, a failure stops the run.
  2. Error Handling: The try and except block ensures that if a step fails, you can control whether to continue with the next steps.
  3. Execution Logic: Steps are executed with exec(step_code) dynamically. Each step’s execution is logged with success or failure information.

This allows you to flexibly handle step execution and error management in your PySpark scripts.
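The loop above stops the whole run when a step with run_on_failure=False fails. The question can also be read the other way round: a step should run after a failure only if that step itself is marked to do so. A minimal sketch of that reading follows, using plain Python callables in place of the code strings so it runs without Spark. The names run_pipeline, ok, boom, and the status labels are hypothetical and chosen for illustration only.

```python
def run_pipeline(steps):
    """Run steps in order and return a list of (name, status) pairs.

    Each step is a dict with:
      name           - label used in the log
      func           - callable holding the step's logic
      run_step       - False to skip the step unconditionally
      run_on_failure - True to run even if the previously executed step failed
    """
    results = []
    previous_failed = False
    for step in steps:
        name = step["name"]
        if not step["run_step"]:
            results.append((name, "SKIPPED"))
            continue
        if previous_failed and not step["run_on_failure"]:
            # A skipped step does not reset the failure flag
            results.append((name, "SKIPPED_AFTER_FAILURE"))
            continue
        try:
            step["func"]()
            results.append((name, "SUCCESS"))
            previous_failed = False
        except Exception as error:
            print(f"{name} failed with error: {error}")
            results.append((name, "FAILURE"))
            previous_failed = True
    return results


def ok():
    pass


def boom():
    raise ValueError("simulated failure")


demo_steps = [
    {"name": "step1", "func": ok, "run_step": True, "run_on_failure": False},
    {"name": "step2", "func": boom, "run_step": True, "run_on_failure": False},
    {"name": "step3", "func": ok, "run_step": True, "run_on_failure": False},
    {"name": "step4", "func": ok, "run_step": True, "run_on_failure": True},
    {"name": "step5", "func": ok, "run_step": False, "run_on_failure": True},
]
demo_results = run_pipeline(demo_steps)
for name, status in demo_results:
    print(name, status)
```

Returning the results as a list keeps the runner independent of where the log ends up (console, Oracle, Hive, or Excel).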

You can extend this to maintain a log of successes and failures, or to add a retry mechanism for certain steps. You can also store execution results in a control table in Oracle, as in the following example.

from time import sleep
import cx_Oracle  # You'll need cx_Oracle library to connect to Oracle

# Example Steps
step1 = '''
print("Executing Step 1")
df = spark.read.csv("data1.csv")
'''

step2 = '''
print("Executing Step 2")
df = df.filter(df['age'] > 30)
'''

step3 = '''
print("Executing Step 3")
df = df.groupBy("country").count()
'''

step4 = '''
print("Executing Step 4")
# Simulated failure
raise ValueError("Simulated failure in Step 4")
'''

# Control table: run_step=True means execute the step; run_on_failure=True means carry on with later steps
# if this step still fails after its retries; retry_count is the maximum number of retries
control_table = [
    {"step": step1, "run_step": True, "run_on_failure": True, "retry_count": 2},  # Step 1
    {"step": step2, "run_step": True, "run_on_failure": False, "retry_count": 1}, # Step 2
    {"step": step3, "run_step": True, "run_on_failure": True, "retry_count": 2},  # Step 3
    {"step": step4, "run_step": True, "run_on_failure": False, "retry_count": 1}, # Step 4
]

# Function to connect to Oracle and insert logs
def insert_into_control_table(step_number, status, retry_count):
    connection = cx_Oracle.connect('user/password@hostname:port/service_name')
    cursor = connection.cursor()

    # Insert execution details into control table
    insert_query = '''
        INSERT INTO control_table (step_number, status, retry_count, executed_at)
        VALUES (:1, :2, :3, SYSTIMESTAMP)
    '''
    cursor.execute(insert_query, (step_number, status, retry_count))
    connection.commit()
    cursor.close()
    connection.close()

# Function to execute the steps with logging and retry mechanism
def execute_steps_with_retry(control_table):
    for index, step_info in enumerate(control_table):
        step_code = step_info["step"]
        run_step = step_info["run_step"]
        run_on_failure = step_info["run_on_failure"]
        max_retries = step_info["retry_count"]
        step_number = index + 1
        retries = 0
        success = False

        if run_step:
            while retries <= max_retries:
                try:
                    print(f"\nRunning Step {step_number} (Attempt {retries + 1})...")
                    # Run in the module namespace so variables such as df persist between steps
                    exec(step_code, globals())
                    success = True
                    print(f"Step {step_number} executed successfully.")
                    insert_into_control_table(step_number, "SUCCESS", retries)  # Log success
                    break  # Exit loop if successful
                except Exception as e:
                    retries += 1
                    print(f"Step {step_number} failed with error: {e}")
                    if retries > max_retries:
                        insert_into_control_table(step_number, "FAILURE", retries)  # Log failure
                        print(f"Max retries exceeded for Step {step_number}.")
                        if not run_on_failure:
                            print(f"Skipping remaining steps due to failure in Step {step_number}.")
                            return  # Exit if we shouldn't continue after failure
                    else:
                        print(f"Retrying Step {step_number} in 5 seconds...")
                        sleep(5)  # Wait before retrying
        else:
            print(f"Skipping Step {step_number}.")

# Execute steps with logging and retry
execute_steps_with_retry(control_table)
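The retry loop above mixes retrying, logging, and flow control in one function. As a rough sketch, the retry part can be factored into a small helper that only reports what happened, leaving the caller to decide what to log. The helper name run_with_retry and its sleep parameter are assumptions made for this example; the sleep function is injectable so the helper can be exercised without waiting.

```python
import time


def run_with_retry(func, max_retries, delay_seconds=5, sleep=time.sleep):
    """Call func until it succeeds or max_retries retries are used up.

    Returns a tuple (succeeded, retries_used, last_error).
    """
    retries = 0
    while True:
        try:
            func()
            return True, retries, None
        except Exception as error:
            if retries >= max_retries:
                return False, retries, error
            retries += 1
            sleep(delay_seconds)


# Demo: a step that fails twice and then succeeds.
attempts = {"count": 0}


def flaky_step():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient error")


def always_fails():
    raise ValueError("permanent error")


# Pass a no-op sleep so the demo does not actually wait.
flaky_outcome = run_with_retry(flaky_step, max_retries=2, sleep=lambda seconds: None)
failed_outcome = run_with_retry(always_fails, max_retries=1, sleep=lambda seconds: None)
print(flaky_outcome[:2], failed_outcome[:2])
```

Unlike the loop above, retries_used here counts only the retries actually performed, so a step that never succeeds reports exactly max_retries.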

The control table can also be kept as a DataFrame, which can then be saved to Hive or exported to Excel:

from time import sleep
import pandas as pd

# Sample PySpark DataFrame for control table
from pyspark.sql import Row
from pyspark.sql import SparkSession

# Initialize Spark session
spark = (SparkSession.builder
    .appName("Step Execution Logging")
    .enableHiveSupport()
    .getOrCreate())

# Control table as a list of dictionaries
control_data = [
    {"step_number": 1, "step_name": "Step 1", "run_step": True, "run_on_failure": True, "retry_count": 2},
    {"step_number": 2, "step_name": "Step 2", "run_step": True, "run_on_failure": False, "retry_count": 1},
    {"step_number": 3, "step_name": "Step 3", "run_step": True, "run_on_failure": True, "retry_count": 2},
    {"step_number": 4, "step_name": "Step 4", "run_step": True, "run_on_failure": False, "retry_count": 1},
]

# Convert control data to a DataFrame
control_table_df = spark.createDataFrame(Row(**x) for x in control_data)

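# Function to update the control table DataFrame after each step
from pyspark.sql import functions as F

def update_control_table(step_number, status, retries):
    global control_table_df
    # Add the status column on first use; keep earlier values afterwards
    previous_status = (
        F.col("status") if "status" in control_table_df.columns
        else F.lit(None).cast("string"))
    is_current_step = F.col("step_number") == step_number
    control_table_df = control_table_df.withColumn(
        "status", F.when(is_current_step, F.lit(status)).otherwise(previous_status))
    # Overwrite retry_count with the number of retries recorded for this step
    control_table_df = control_table_df.withColumn(
        "retry_count", F.when(is_current_step, F.lit(retries)).otherwise(F.col("retry_count")))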

# Steps and retry logic remains the same
def execute_steps_with_retry(control_table_df):
    for row in control_table_df.collect():
        step_number = row['step_number']
        run_step = row['run_step']
        run_on_failure = row['run_on_failure']
        retries = 0
        max_retries = row['retry_count']
        step_code = f"Step_{step_number}"

        if run_step:
            while retries <= max_retries:
                try:
                    print(f"\nRunning Step {step_number} (Attempt {retries + 1})...")
                    globals()[step_code]()  # Look up the step function (e.g. Step_1) by name and call it
                    update_control_table(step_number, "SUCCESS", retries)  # Log success
                    break
                except Exception as e:
                    retries += 1
                    print(f"Step {step_number} failed with error: {e}")
                    if retries > max_retries:
                        update_control_table(step_number, "FAILURE", retries)  # Log failure
                        print(f"Max retries exceeded for Step {step_number}.")
                        if not run_on_failure:
                            print(f"Skipping remaining steps due to failure in Step {step_number}.")
                            return
                    else:
                        print(f"Retrying Step {step_number} in 5 seconds...")
                        sleep(5)
        else:
            print(f"Skipping Step {step_number}.")

# Execute steps
execute_steps_with_retry(control_table_df)

# 1. Save to Hive
control_table_df.write.mode("overwrite").saveAsTable("your_hive_database.control_table")

# 2. Export to Excel (using pandas)
# Convert to Pandas DataFrame
control_table_pandas = control_table_df.toPandas()

# Export to Excel
control_table_pandas.to_excel("control_table_log.xlsx", index=False)

print("Execution results saved to Hive and exported as Excel.")

The same script with str.format() in place of f-strings, for Python versions older than 3.6:

from time import sleep
import pandas as pd

# Sample PySpark DataFrame for control table
from pyspark.sql import Row
from pyspark.sql import SparkSession

# Initialize Spark session
spark = (SparkSession.builder
    .appName("Step Execution Logging")
    .enableHiveSupport()
    .getOrCreate())

# Control table as a list of dictionaries
control_data = [
    {"step_number": 1, "step_name": "Step 1", "run_step": True, "run_on_failure": True, "retry_count": 2},
    {"step_number": 2, "step_name": "Step 2", "run_step": True, "run_on_failure": False, "retry_count": 1},
    {"step_number": 3, "step_name": "Step 3", "run_step": True, "run_on_failure": True, "retry_count": 2},
    {"step_number": 4, "step_name": "Step 4", "run_step": True, "run_on_failure": False, "retry_count": 1},
]

# Convert control data to a DataFrame
control_table_df = spark.createDataFrame(Row(**x) for x in control_data)

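# Function to update the control table DataFrame after each step
from pyspark.sql import functions as F

def update_control_table(step_number, status, retries):
    global control_table_df
    # Add the status column on first use; keep earlier values afterwards
    previous_status = (
        F.col("status") if "status" in control_table_df.columns
        else F.lit(None).cast("string"))
    is_current_step = F.col("step_number") == step_number
    control_table_df = control_table_df.withColumn(
        "status", F.when(is_current_step, F.lit(status)).otherwise(previous_status))
    # Overwrite retry_count with the number of retries recorded for this step
    control_table_df = control_table_df.withColumn(
        "retry_count", F.when(is_current_step, F.lit(retries)).otherwise(F.col("retry_count")))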

# Steps and retry logic remains the same
def execute_steps_with_retry(control_table_df):
    for row in control_table_df.collect():
        step_number = row['step_number']
        run_step = row['run_step']
        run_on_failure = row['run_on_failure']
        retries = 0
        max_retries = row['retry_count']
        step_code = "Step_{}".format(step_number)  # Replaced f-string with .format()

        if run_step:
            while retries <= max_retries:
                try:
                    print("\nRunning Step {} (Attempt {})...".format(step_number, retries + 1))  # Replaced f-string
                    globals()[step_code]()  # Look up the step function by name and call it
                    update_control_table(step_number, "SUCCESS", retries)  # Log success
                    break
                except Exception as e:
                    retries += 1
                    print("Step {} failed with error: {}".format(step_number, e))  # Replaced f-string
                    if retries > max_retries:
                        update_control_table(step_number, "FAILURE", retries)  # Log failure
                        print("Max retries exceeded for Step {}.".format(step_number))  # Replaced f-string
                        if not run_on_failure:
                            print("Skipping remaining steps due to failure in Step {}.".format(step_number))  # Replaced f-string
                            return
                    else:
                        print("Retrying Step {} in 5 seconds...".format(step_number))  # Replaced f-string
                        sleep(5)
        else:
            print("Skipping Step {}.".format(step_number))  # Replaced f-string

# Execute steps
execute_steps_with_retry(control_table_df)

# 1. Save to Hive
control_table_df.write.mode("overwrite").saveAsTable("your_hive_database.control_table")

# 2. Export to Excel (using pandas)
# Convert to Pandas DataFrame
control_table_pandas = control_table_df.toPandas()

# Export to Excel
control_table_pandas.to_excel("control_table_log.xlsx", index=False)

print("Execution results saved to Hive and exported as Excel.")

To run a step by its function name, look the function up in globals() and call it:

# Define your step functions
def Step_1():
    print("Executing Step 1")

def Step_2():
    print("Executing Step 2")

# Your step_code contains the name of the function to execute
step_code = "Step_1"

# Fetch the function from globals and call it (no exec needed)
globals()[step_code]()
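Looking names up in globals() works, but any global with a matching name can be called, and a typo only surfaces as a bare KeyError. One alternative, sketched here under the assumption that steps are ordinary functions, is an explicit registry filled by a decorator. STEP_REGISTRY, register_step, and run_step_by_name are hypothetical names introduced for this example.

```python
STEP_REGISTRY = {}


def register_step(func):
    """Decorator that records a step function under its own name."""
    STEP_REGISTRY[func.__name__] = func
    return func


@register_step
def Step_1():
    return "Step 1 done"


@register_step
def Step_2():
    return "Step 2 done"


def run_step_by_name(step_name):
    """Look up a registered step and call it; unknown names raise a clear error."""
    if step_name not in STEP_REGISTRY:
        raise KeyError(f"No step registered under the name {step_name!r}")
    return STEP_REGISTRY[step_name]()


# Build the name the same way the control-table loop does
step_result = run_step_by_name("Step_{}".format(1))
print(step_result)
```

Only functions that were deliberately registered can be run this way, which also makes it easy to list the available steps with sorted(STEP_REGISTRY).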

The full version below also logs table actions per step and writes both logs to Hive and Excel:

from time import sleep
import pandas as pd
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = (SparkSession.builder
    .appName("Step Execution Logging")
    .enableHiveSupport()
    .getOrCreate())

# Define schema and control data directly
schema = ["step_number", "step_name", "run_step", "run_on_failure", "retry_count"]
control_data = [
    (1, "Step 1", True, True, 2),
    (2, "Step 2", True, False, 1),
    (3, "Step 3", True, True, 2),
    (4, "Step 4", True, False, 1)
]
control_table_df = spark.createDataFrame(control_data, schema=schema)

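# Initialize an empty DataFrame to log job statuses.
# An explicit schema is needed because Spark cannot infer column types from an empty list.
from pyspark.sql.types import StructType, StructField, LongType, StringType
job_status_schema = StructType([
    StructField("step_number", LongType()),
    StructField("table_name", StringType()),
    StructField("action", StringType()),
    StructField("count", LongType()),
])
job_status_df = spark.createDataFrame([], schema=job_status_schema)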

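from pyspark.sql import functions as F

def update_control_table(step_number, status, retries):
    global control_table_df
    # Add the status column on first use; keep earlier values afterwards
    previous_status = (
        F.col("status") if "status" in control_table_df.columns
        else F.lit(None).cast("string"))
    is_current_step = F.col("step_number") == step_number
    control_table_df = control_table_df.withColumn(
        "status", F.when(is_current_step, F.lit(status)).otherwise(previous_status))
    # Overwrite retry_count with the number of retries recorded for this step
    control_table_df = control_table_df.withColumn(
        "retry_count", F.when(is_current_step, F.lit(retries)).otherwise(F.col("retry_count")))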

def log_job_status(step_number, table_name, action, count):
    global job_status_df
    new_status = [(step_number, table_name, action, count)]
    new_status_df = spark.createDataFrame(new_status, schema=["step_number", "table_name", "action", "count"])
    job_status_df = job_status_df.union(new_status_df)

def execute_steps_with_retry(control_table_df):
    for row in control_table_df.collect():
        step_number = row['step_number']
        run_step = row['run_step']
        run_on_failure = row['run_on_failure']
        retries = 0
        max_retries = row['retry_count']
        step_code = "Step_{}".format(step_number)  # Using .format()

        if run_step:
            while retries <= max_retries:
                try:
                    print("\nRunning Step {} (Attempt {})...".format(step_number, retries + 1))

                    # step_code holds the function name; fetch the function
                    # from globals and call it directly (no exec needed)
                    globals()[step_code]()
                    
                    # Log table actions and counts here (dummy values used)
                    log_job_status(step_number, "some_table", "save", 1000)
                    
                    update_control_table(step_number, "SUCCESS", retries)  # Log success
                    break
                except Exception as e:
                    retries += 1
                    print("Step {} failed with error: {}".format(step_number, e))
                    if retries > max_retries:
                        update_control_table(step_number, "FAILURE", retries)  # Log failure
                        print("Max retries exceeded for Step {}.".format(step_number))
                        if not run_on_failure:
                            print("Skipping remaining steps due to failure in Step {}.".format(step_number))
                            return
                    else:
                        print("Retrying Step {} in 5 seconds...".format(step_number))
                        sleep(5)
        else:
            print("Skipping Step {}.".format(step_number))

        # Release any DataFrames cached during this step
        spark.catalog.clearCache()

# Define step functions for demonstration
def Step_1():
    # Your Step_1 logic here
    print("Executing Step 1")

def Step_2():
    # Your Step_2 logic here
    print("Executing Step 2")

# Execute steps
execute_steps_with_retry(control_table_df)

# 1. Save control table and job status to Hive
control_table_df.write.mode("overwrite").saveAsTable("your_hive_database.control_table")
job_status_df.write.mode("overwrite").saveAsTable("your_hive_database.job_status")

# 2. Export to Excel (using pandas)
# Convert to Pandas DataFrames
control_table_pandas = control_table_df.toPandas()
job_status_pandas = job_status_df.toPandas()

# Export to Excel
with pd.ExcelWriter("execution_results.xlsx") as writer:
    control_table_pandas.to_excel(writer, sheet_name='Control Table', index=False)
    job_status_pandas.to_excel(writer, sheet_name='Job Status', index=False)

print("Execution results saved to Hive and exported as Excel.")
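Each call to update_control_table or log_job_status above builds a new DataFrame on top of the previous one, so the query plan grows with every step. A lighter alternative, sketched here as an assumption rather than as part of the original script, is to collect log rows as plain Python tuples and create the DataFrame once at the end. The names STATUS_COLUMNS, record_status, and records_to_dataframe are hypothetical; records_to_dataframe expects an existing SparkSession and is not called in the demo, so the snippet runs without Spark.

```python
from datetime import datetime

STATUS_COLUMNS = ["step_number", "status", "retries", "executed_at"]
status_records = []


def record_status(step_number, status, retries):
    """Append one log row as a plain tuple; no Spark job is triggered."""
    executed_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    status_records.append((step_number, status, retries, executed_at))


def records_to_dataframe(spark, records):
    """Build the log DataFrame once, after all steps have finished.

    `spark` is an existing SparkSession. Call this only when at least one
    row was recorded, since Spark cannot infer column types from an empty list.
    """
    return spark.createDataFrame(records, schema=STATUS_COLUMNS)


# Demo: record two outcomes the way the retry loop would
record_status(1, "SUCCESS", 0)
record_status(2, "FAILURE", 2)
print(status_records)
```

After the run, records_to_dataframe(spark, status_records) gives a DataFrame that can be written to Hive or converted with toPandas() in the same way as the control table.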

To generate yyyyMM values for the last 12 and next 12 months with Spark functions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import add_months, date_format, lit
from datetime import datetime

# Initialize Spark session
spark = (SparkSession.builder
    .appName("Generate YearMonth List")
    .getOrCreate())

# Get current date
current_date = datetime.now().strftime("%Y-%m-%d")

# Create a DataFrame with current date
df = spark.createDataFrame([(current_date,)], ["current_date"])

# Generate a sequence of -12 to +12 months (last and next 12 months)
months_range = list(range(-12, 13))

# Use add_months for exact calendar-month offsets (stepping by 30 days can skip or repeat a month)
months_df = df.select([date_format(add_months(lit(current_date).cast("date"), i), 'yyyyMM').alias(f'month_{i}') for i in months_range])

# Collect values into a list (or you can keep them as separate variables)
months_list = months_df.collect()[0]

# Print the list of last and next 12 months
for i, month in enumerate(months_list):
    print(f'Month_{i-12}: {month}')

# Example of accessing each month by name: keys run from 'month_-12' to 'month_12'
month_vars = months_list.asDict()

# Now you can access individual months
next_month = month_vars['month_1']  # Next month
last_month = month_vars['month_-1']  # Previous month

The same list can be built in plain Python and converted to a DataFrame afterwards:

from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Initialize Spark session
spark = (SparkSession.builder
    .appName("Generate YearMonth List")
    .getOrCreate())

# Get the current date
current_date = datetime.now()

# Function to generate year-month (yyyyMM) for the month `offset` months away from `date`.
# Exact month arithmetic avoids the skipped or repeated months that 30-day steps can produce.
def generate_year_month(date, offset=0):
    month_index = date.year * 12 + (date.month - 1) + offset  # months counted from year 0
    return "{:04d}{:02d}".format(month_index // 12, month_index % 12 + 1)

# Create a list to store the year-month values
months_list = []

# Current month followed by the next 12 months (indexes 0 to 12)
for i in range(0, 13):
    months_list.append(generate_year_month(current_date, i))

# Last 12 months, most recent first (indexes 13 to 24)
for i in range(1, 13):
    months_list.append(generate_year_month(current_date, -i))

# Convert the list into a DataFrame for further operations in PySpark
months_rdd = spark.sparkContext.parallelize(months_list)
months_df = spark.createDataFrame(months_rdd.map(lambda x: Row(month=x)))

# Show the DataFrame
months_df.show()

# Example of accessing specific months in the list
print("Next month: ", months_list[1])
print("Previous month: ", months_list[13])  # Index 13 is the first of the "last 12 months" entries
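Positional indexes into months_list are easy to get wrong, because future and past months share one list. A small sketch of an alternative keeps the values in a dict keyed by the month offset, so -1 is always the previous month and 1 the next. The function names year_month and month_window are hypothetical, and the fixed reference date is an assumption made to keep the demo reproducible.

```python
from datetime import date


def year_month(reference, offset):
    """Return yyyyMM for the month `offset` months away from `reference`."""
    month_index = reference.year * 12 + (reference.month - 1) + offset
    return "{:04d}{:02d}".format(month_index // 12, month_index % 12 + 1)


def month_window(reference, months_back=12, months_ahead=12):
    """Map each offset in [-months_back, months_ahead] to its yyyyMM string."""
    return {offset: year_month(reference, offset)
            for offset in range(-months_back, months_ahead + 1)}


# A fixed reference date keeps the demo reproducible; use date.today() in practice.
window = month_window(date(2024, 1, 31))
print("Previous month:", window[-1])
print("Current month:", window[0])
print("Next month:", window[1])
```

Because the arithmetic works on whole months, a reference date at the end of January still yields February as the next month and December of the prior year as the previous one.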
