Looping in PySpark
Looping Over Multiple Transformation Steps in PySpark
To apply several transformation steps one after another, use an ordinary Python loop on the driver. Python has two loop constructs:
for: Iterate over a sequence (e.g., list, tuple).
while: Execute a block of code while a condition is true.
# For loop
data = [1, 2, 3, 4, 5]
for num in data:
    print(num)
# While loop
i = 0
while i < 5:
    print(i)
    i += 1
Example:
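# Example: Loop over a list of transformations
# Assumes df already exists with "name" and "score" columns
from pyspark.sql import functions as F
transformation_steps = [
    ("score", F.col("score") * 2),
    ("name", F.upper(F.col("name"))),
]
# withColumn returns a new DataFrame, so reassign df on every step
for col_name, transformation in transformation_steps:
    df = df.withColumn(col_name, transformation)
df.show()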
Looping in PySpark: Using foreach, collect, or map
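You cannot loop over the rows of a PySpark DataFrame with a plain Python for loop the way you would over a list, because the rows are distributed across the cluster. Instead, use PySpark operations such as foreach(), collect(), or map() (called on the underlying RDD, df.rdd) to process rows. Looping over columns, by contrast, is plain Python, because the column names and types are available on the driver.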
Example of a loop using foreach():
You can use foreach() to apply a function to each row of a DataFrame.
# Define a function to apply to each row
def process_row(row):
    print(f"Name: {row['name']}, Score: {row['score']}")
# Apply the function using foreach
df.foreach(process_row)
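foreach() runs process_row on the executors, so on a cluster the printed lines appear in the executor logs rather than in the driver's console (in local mode they show up in the console). foreach() returns nothing and does not modify the DataFrame; it is meant for side effects, such as writing each row to an external system.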
Example of looping over columns:
To loop over columns and apply an operation to each, use a plain Python for loop over the column names (df.columns) or over the (name, type) pairs in df.dtypes.
# Example: trim whitespace from all string columns
from pyspark.sql import functions as F
# Loop through (name, type) pairs and trim the column if its type is 'string'
for col_name, col_type in df.dtypes:
    if col_type == 'string':
        df = df.withColumn(col_name, F.trim(F.col(col_name)))
df.show()
Looping Over Data in PySpark Using collect()
collect() brings every row of the DataFrame to the driver as a Python list of Row objects, which you can then iterate over. Avoid it for large datasets: the entire result must fit in the driver's memory, and collecting too much data can fail with an out-of-memory error.
Example:
# Collect the DataFrame to the driver as a list of rows
rows = df.collect()
# Loop through the collected data
for row in rows:
    print(f"Name: {row['name']}, Score: {row['score']}")
Discover more from HintsToday