Here’s a detailed guide to column and row manipulation in PySpark DataFrames, with useful implementations:
1. Column Manipulation in PySpark DataFrames
1.1 Renaming Columns
Rename a Single Column
df = df.withColumnRenamed("old_column", "new_column")
Rename Multiple Columns
new_column_names = {"old1": "new1", "old2": "new2"}
for old, new in new_column_names.items():
    df = df.withColumnRenamed(old, new)
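On Spark 3.4 or later, the same renames can be done in a single call with withColumnsRenamed; a minimal sketch reusing the mapping above:
df = df.withColumnsRenamed(new_column_names)  # Spark 3.4+ only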
Add a Suffix to All Column Names
df = df.toDF(*[c + "_v1" for c in df.columns])  # use c, not col, to avoid shadowing pyspark's col function
1.2 Checking Data Types
Check Data Type of a Specific Column
print(df.schema["column_name"].dataType)
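The returned value is a DataType instance, so you can branch on it with isinstance; a small sketch (StringType is just one example):
from pyspark.sql.types import StringType
is_string = isinstance(df.schema["column_name"].dataType, StringType)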
Get Data Types of All Columns
df.dtypes  # Returns a list of (column_name, data_type) tuples, e.g. ("Age", "bigint")
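These (name, type) pairs make it easy to pick out columns of one type; for example, to keep only the string columns:
string_cols = [c for c, dtype in df.dtypes if dtype == "string"]
df_strings = df.select(string_cols)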
Check Schema of DataFrame
df.printSchema()
1.3 Apply Dynamic Logic to All Columns
Example: Trim All String Columns
from pyspark.sql.functions import col, trim
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
Example: Convert All Integer Columns to Double
from pyspark.sql.functions import col
df = df.select([col(c).cast("double") if dtype in ("int", "bigint") else col(c) for c, dtype in df.dtypes])  # Python ints load as LongType ("bigint")
Example: Replace Nulls in All String Columns with “Unknown”
df = df.fillna("Unknown")  # a string fill value is applied only to string columns
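For per-column control, fillna also accepts a dict mapping column names to replacement values; a sketch using hypothetical Name and Age columns:
df = df.fillna({"Name": "Unknown", "Age": 0})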
2. Row-Based DataFrame Manipulation
2.1 Collecting Rows One by One
Convert DataFrame to a List of Rows
rows = df.collect()
for row in rows:
    print(row)
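Each collected Row supports both dict-style and attribute access; for example, assuming Name and Age columns:
for row in df.collect():
    print(row["Name"], row.Age)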
Using toLocalIterator() for Large DataFrames (Memory-Efficient)
toLocalIterator() fetches one partition at a time, so only a single partition has to fit in driver memory, unlike collect(), which pulls the entire DataFrame at once.
for row in df.toLocalIterator():
    print(row)
2.2 Filtering Rows
Filter Rows Based on a Condition
df_filtered = df.filter(df["Age"] > 30)
Filter Multiple Conditions
df_filtered = df.filter((df["Age"] > 30) & (df["Gender"] == "Male"))
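The same conditions can be written with col() or as a SQL expression string; both forms below are equivalent to the example above:
from pyspark.sql.functions import col
df_filtered = df.filter((col("Age") > 30) & (col("Gender") == "Male"))
df_filtered = df.filter("Age > 30 AND Gender = 'Male'")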
2.3 Sorting Rows
df_sorted = df.orderBy("Age", ascending=False)
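To sort on several columns with mixed directions, pass column expressions instead; a brief sketch:
from pyspark.sql.functions import col
df_sorted = df.orderBy(col("Age").desc(), col("Name").asc())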
2.4 Adding a New Row (Using Union)
from pyspark.sql import Row
new_row = Row(ID=100, Name="John Doe", Age=40)
df_new = df.union(spark.createDataFrame([new_row], df.schema))
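Note that union() matches columns by position; if the column order of the new rows might differ, unionByName matches by name instead:
df_new = df.unionByName(spark.createDataFrame([new_row], df.schema))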
3. Useful Implementations
3.1 Finding Duplicate Rows
df.groupBy(df.columns).count().filter("count > 1").show()
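To find duplicates on a subset of columns rather than whole rows, group on just those columns; e.g., duplicated names only:
df.groupBy("Name").count().filter("count > 1").show()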
3.2 Removing Duplicate Rows
df = df.dropDuplicates()
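dropDuplicates also accepts a list of columns to deduplicate on, keeping one arbitrary row per key:
df = df.dropDuplicates(["Name"])  # one row per distinct Name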
3.3 Adding a New Column Dynamically
from pyspark.sql.functions import lit
df = df.withColumn("NewColumn", lit("DefaultValue"))
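Beyond literals, withColumn accepts any column expression; a small sketch deriving a new column from an existing one (assuming an Age column):
from pyspark.sql.functions import col
df = df.withColumn("AgePlusOne", col("Age") + 1)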
Conclusion
- PySpark allows flexible column manipulations like renaming, checking types, and applying transformations.
- Row operations like filtering, sorting, and iterating can be done efficiently.
- Collecting data should be handled carefully to avoid memory overload.
- Dynamic transformations make it easy to process large datasets.
Complete Working Example
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, trim, lit
# Initialize Spark Session
spark = SparkSession.builder.appName("DataFrame_Manipulation").getOrCreate()
# Sample Data
data = [(1, "Alice", 25, "Engineer"), (2, "Bob", 30, "Doctor"), (3, "Charlie", 35, "Teacher")]
df = spark.createDataFrame(data, ["ID", "Name", "Age", "Occupation"])
# 1. Column Manipulation
# Rename a Single Column
df = df.withColumnRenamed("Occupation", "Job")
# Rename Multiple Columns
column_rename_map = {"ID": "UserID", "Name": "FullName"}
for old, new in column_rename_map.items():
    df = df.withColumnRenamed(old, new)
# Add a Suffix to All Columns
df = df.toDF(*[c + "_v1" for c in df.columns])  # c, not col: col is the imported function
# Check Data Types
df.printSchema()
# Apply Dynamic Logic to All Columns: Trim All String Columns
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
# Convert All Integer Columns to Double
df = df.select([col(c).cast("double") if dtype in ("int", "bigint") else col(c) for c, dtype in df.dtypes])
# 2. Row-Based Manipulation
# Collect Rows One by One
for row in df.collect():
    print(row)
# Efficient Row Iteration
for row in df.toLocalIterator():
    print(row)
# Filtering Rows
df_filtered = df.filter((df["Age_v1"] > 28.0) & (df["FullName_v1"] != "Bob"))
df_filtered.show()
# Sorting Rows
df_sorted = df.orderBy("Age_v1", ascending=False)
df_sorted.show()
# Adding a New Row
new_row = Row(UserID_v1=4.0, FullName_v1="David", Age_v1=40.0, Job_v1="Scientist")
df_new = df.union(spark.createDataFrame([new_row], df.schema))
df_new.show()
# Finding Duplicate Rows
df.groupBy(df.columns).count().filter("count > 1").show()
# Removing Duplicate Rows
df = df.dropDuplicates()
df.show()
# Adding a New Column Dynamically
df = df.withColumn("NewColumn_v1", lit("DefaultValue"))
df.show()
# Stop Spark Session
spark.stop()