PySpark supports various control statements to manage the flow of your Spark applications. You can use Python’s if-elif-else statements in PySpark code, but with limitations.

Supported Usage

  1. Conditional statements in driver-side PySpark code.
  2. Controlling which Spark operations run.
  3. Dynamically building filters or data manipulations before they are sent to the cluster.

Unsupported Usage

  • Directly within Spark SQL query strings (use CASE WHEN instead).
  • Directly on DataFrame columns in operations such as withColumn, select, or filter (a Column has no boolean value, so Python cannot branch on it; see the sketch after this list).
  • Row-level branching in distributed transformations, except through when/otherwise, expr(), or a UDF.
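
To see why branching on a column does not work: a comparison such as df.age > 30 returns a Column expression rather than a Python boolean, so Python has nothing to branch on. A minimal sketch, assuming a DataFrame df with an age column:

from pyspark.sql import functions as F

# Branching on a Column raises an error, because the comparison is only an
# unevaluated expression (roughly: "ValueError: Cannot convert column into bool"):
# if df.age > 30:
#     ...

# The condition has to stay inside a column expression instead, for example:
df.withColumn("age_group", F.when(F.col("age") > 30, "adult").otherwise("minor"))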

Conditional Logic in PySpark: Using if, elif, else

1. Python’s if-elif-else

  1. if: Execute a block of code if a condition is true.
  2. elif: Execute a block of code if the initial condition is false and another condition is true.
  3. else: Execute a block of code if all conditions are false.
Example:

from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName("Conditional Example").getOrCreate()

# Read the configured executor count on the driver (None if the key is not set)
executor_instances = spark.sparkContext.getConf().get("spark.executor.instances", None)

# Conditional statement (evaluated on the driver)
if executor_instances == "4":
    print("Using 4 executor instances")
elif executor_instances == "2":
    print("Using 2 executor instances")
else:
    print("Default configuration")

The example above shows how plain Python control flow is handled in a PySpark application: it runs on the driver, not on the cluster.

PySpark’s DataFrame operations are executed on the distributed framework, and Python’s if-else statements cannot be directly applied inside these operations.

However, you can use if-else statements in the driver code to dynamically construct DataFrame operations or SQL queries, as shown below:

Python Code

# Dynamic filtering
filter_column = "name"

if filter_column == "name":
    df = spark.sql("SELECT * FROM customers WHERE name = 'John'")
elif filter_column == "age":
    df = spark.sql("SELECT * FROM customers WHERE age > 30")
else:
    df = spark.sql("SELECT * FROM customers")  # no filter if the column is not recognized

df.show()

In this example, the if-else statement is executed on the driver, and the resulting SQL query is executed on the distributed framework.

Alternatively, you can use Python’s conditional expressions to construct the filter conditions:

Python

filter_column = "name"
filter_value = "John"

condition = f"{filter_column} = '{filter_value}'" if filter_column == "name" else f"{filter_column} > 30"

df = spark.sql(f"SELECT * FROM customers WHERE {condition}")
df.show()

This approach allows you to dynamically construct the filter condition based on the filter_column variable.

Note that in both examples, the actual filtering operation is executed on the distributed framework, while the if-else statement is executed on the driver.
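
The same driver-side branching also works with the DataFrame API instead of SQL strings. A minimal sketch, assuming the customers data is available as a registered table (the column names mirror the example above):

from pyspark.sql import functions as F

customers_df = spark.table("customers")

filter_column = "name"

# The if-else runs on the driver; only the chosen Column expression is shipped to the cluster
if filter_column == "name":
    condition = F.col("name") == "John"
else:
    condition = F.col("age") > 30

customers_df.filter(condition).show()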

2. Use when and otherwise for Simple Conditions:

In PySpark, the when and otherwise functions replace traditional if-else logic. They are chainable, meaning you can achieve the effect of elif as well.

Syntax

from pyspark.sql.functions import when, col

df.withColumn("new_column", when(condition, value_if_true).otherwise(value_if_false))

Example:

Here’s how you would implement an if-elif-else logic using when and otherwise:

Python if-else code (not supported directly inside a DataFrame operation, which executes on the distributed framework):

if score >= 70:
    grade = 'A'
elif score >= 50:
    grade = 'B'
else:
    grade = 'C'

Equivalent PySpark code:

from pyspark.sql import functions as F

df = df.withColumn("grade", 
                   F.when(F.col("score") >= 70, "A")
                    .when(F.col("score") >= 50, "B")
                    .otherwise("C"))
df.show()

This will create a new column grade based on the value of the score column.
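
For a quick end-to-end check, the same chain can be run on a few made-up rows (the names and scores below are placeholder data, not from the original example):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("GradeExample").getOrCreate()

# Placeholder sample data for illustration only
df = spark.createDataFrame([("Asha", 85), ("Ben", 62), ("Chen", 41)], ["name", "score"])

df = df.withColumn(
    "grade",
    F.when(F.col("score") >= 70, "A")
     .when(F.col("score") >= 50, "B")
     .otherwise("C")
)
df.show()  # Asha -> A, Ben -> B, Chen -> C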

You can use when and otherwise in various PySpark scenarios beyond withColumn and aggregate functions.

Scenarios

1. select statement

df.select(
    df.name,
    when(df.age > 30, "adult").otherwise("minor").alias("age_group")
)

2. where clause

df.where(when(df.country == "USA", df.sales > 1000).otherwise(df.sales > 500))

3. join conditions

# Join where the key falls back to a secondary column (id_null) when the ids do not match
df1.join(
    df2,
    on=when(df1.id == df2.id, df1.id).otherwise(df1.id_null) == df2.id
)

4. case statements (Spark SQL)

spark.sql("""
    SELECT *,
    CASE
        WHEN age > 30 THEN 'adult'
        ELSE 'minor'
    END AS age_group
    FROM customers
""")

5. filter method

df.filter(when(df.country == "USA", df.sales > 1000).otherwise(df.sales > 500))

6. union method

# Use when/otherwise to normalize a column before unioning two DataFrames
df.select("name").union(
    df2.select(
        when(df2.name.isNull(), "unknown").otherwise(df2.name).alias("name")
    )
)

7. withColumnRenamed

# withColumnRenamed takes plain string names, so the condition is decided on the driver
new_name = "adult_age" if rename_for_adults else "minor_age"  # rename_for_adults is an example driver-side flag
df.withColumnRenamed("age", new_name)

Tips and Variations

  1. Chain multiple when statements for complex conditions.
  2. Use otherwise to specify default values.
  3. Combine conditions with the Column operators & (and), | (or), and ~ (not); Python’s and/or do not work on Columns (see the sketch after this list).
  4. Leverage Spark SQL’s CASE statement for more complex logic.
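
A short sketch of tip 3, combining conditions with & and | (the country and sales columns reuse the names from the scenarios above):

from pyspark.sql import functions as F

# Each comparison needs its own parentheses before combining with & (and) or | (or)
df = df.withColumn(
    "segment",
    F.when((F.col("country") == "USA") & (F.col("sales") > 1000), "premium")
     .when((F.col("country") == "USA") | (F.col("sales") > 500), "standard")
     .otherwise("basic")
)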

3. Using UDFs (User Defined Functions) for Complex Logic:

If you need more complex logic in your conditions that cannot easily be handled by when() and otherwise(), you can use UDFs to apply Python if-else logic to the DataFrame.

Example:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the Python function
def grade(score):
    if score >= 70:
        return 'A'
    elif score >= 50:
        return 'B'
    else:
        return 'C'

# Convert the Python function to a UDF
grade_udf = udf(grade, StringType())

# Apply the UDF to the DataFrame
df = df.withColumn("grade", grade_udf(F.col("score")))
df.show()

Another example, deriving an age group with a UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def age_group(age):
    if age > 30:
        return "adult"
    else:
        return "minor"

age_group_udf = udf(age_group, StringType())

# customers_df is assumed to be an existing DataFrame with an age column
customers_df = customers_df.withColumn("age_group", age_group_udf("age"))

Here, the function grade behaves like a Python if-elif-else statement and is applied to the DataFrame using udf.

4. Using F.expr() for SQL-like Conditions:

Another way to handle conditional logic is by writing SQL-like expressions using F.expr().

Example:
df = df.withColumn(
    "grade", 
    F.expr("CASE WHEN score >= 70 THEN 'A' WHEN score >= 50 THEN 'B' ELSE 'C' END")
)
df.show()

This CASE WHEN syntax is equivalent to if-elif-else in SQL and is supported within PySpark’s F.expr() function.

5. Spark SQL queries: Use CASE statements or IF functions.

spark.sql("SELECT *, CASE WHEN age > 30 THEN 'adult' ELSE 'minor' END AS age_group

Summary:

  • Direct Python if-else or elif statements cannot be used in PySpark DataFrame operations because PySpark processes data across a cluster in a distributed fashion.
  • Use when and otherwise for conditional logic in PySpark DataFrames.
  • For more complex logic, use UDFs (User Defined Functions).
  • SQL-style CASE WHEN expressions are supported through F.expr() for more complex conditions.
