Here’s a detailed guide on PySpark DataFrame column and row manipulation with useful implementations:
1. Column Manipulation in PySpark DataFrames
1.1 Renaming Columns
Rename a Single Column
df = df.withColumnRenamed("old_column", "new_column")
Rename Multiple Columns
new_column_names = {"old1": "new1", "old2": "new2"}
for old, new in new_column_names.items():
    df = df.withColumnRenamed(old, new)
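If you are on Spark 3.4 or later, the same loop can be replaced with a single withColumnsRenamed call; a minimal sketch reusing the mapping above:
df = df.withColumnsRenamed({"old1": "new1", "old2": "new2"})  # Spark 3.4+ only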
Add a Suffix to All Column Names
df = df.toDF(*[c + "_v1" for c in df.columns])
1.2 Checking Data Types
Check Data Type of a Specific Column
print(df.schema["column_name"].dataType)
Get Data Types of All Columns
df.dtypes # Returns a list of (column_name, data_type)
Check Schema of DataFrame
df.printSchema()
1.3 Apply Dynamic Logic to All Columns
Example: Trim All String Columns
from pyspark.sql.functions import col, trim
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
Example: Convert All Integer Columns to Double
from pyspark.sql.functions import col
df = df.select([col(c).cast("double") if dtype in ("int", "bigint") else col(c) for c, dtype in df.dtypes])
Example: Replace Nulls in All String Columns with “Unknown”
df = df.fillna("Unknown")
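Note that fillna with a string value only touches string-typed columns; to make that explicit, the subset argument can be limited to them. A minimal sketch:
# restrict the fill to the string columns explicitly
string_cols = [c for c, dtype in df.dtypes if dtype == "string"]
df = df.fillna("Unknown", subset=string_cols)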
2. Row-Based DataFrame Manipulation
2.1 Collecting Rows One by One
Convert DataFrame to a List of Rows
rows = df.collect()
for row in rows:
    print(row)
Using toLocalIterator() for Large DataFrames (Memory-Efficient on the Driver)
for row in df.toLocalIterator():
    print(row)
2.2 Filtering Rows
Filter Rows Based on a Condition
df_filtered = df.filter(df["Age"] > 30)
Filter Multiple Conditions
df_filtered = df.filter((df["Age"] > 30) & (df["Gender"] == "Male"))
2.3 Sorting Rows
df_sorted = df.orderBy("Age", ascending=False)
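For multi-column sorts, or to mix directions, desc() and asc() can be used instead of the ascending flag; a sketch with illustrative column names:
from pyspark.sql.functions import col
df_sorted = df.orderBy(col("Age").desc(), col("Name").asc())  # oldest first, ties broken by name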
2.4 Adding a New Row (Using Union)
from pyspark.sql import Row
new_row = Row(ID=100, Name="John Doe", Age=40)
df_new = df.union(spark.createDataFrame([new_row], df.schema))
3. Useful Implementations
3.1 Finding Duplicate Rows
df.groupBy(df.columns).count().filter("count > 1").show()
3.2 Removing Duplicate Rows
df = df.dropDuplicates()
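dropDuplicates also accepts a list of columns if duplicates should be judged on a subset of fields only; a sketch with illustrative column names:
# keep one row per (Name, Age) combination
df = df.dropDuplicates(["Name", "Age"])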
3.3 Adding a New Column Dynamically
from pyspark.sql.functions import lit
df = df.withColumn("NewColumn", lit("DefaultValue"))
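If the new column should depend on existing values rather than a constant, when()/otherwise() can be combined with withColumn; a sketch assuming an Age column:
from pyspark.sql.functions import when, col
df = df.withColumn("AgeGroup", when(col("Age") >= 30, "Senior").otherwise("Junior"))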
Conclusion
- PySpark allows flexible column manipulations like renaming, checking types, and applying transformations.
- Row operations like filtering, sorting, and iterating can be done efficiently.
- Collecting data should be handled carefully to avoid memory overload.
- Dynamic transformations make it easy to process large datasets.
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, trim, lit
# Initialize Spark Session
spark = SparkSession.builder.appName("DataFrame_Manipulation").getOrCreate()
# Sample Data
data = [(1, "Alice", 25, "Engineer"), (2, "Bob", 30, "Doctor"), (3, "Charlie", 35, "Teacher")]
df = spark.createDataFrame(data, ["ID", "Name", "Age", "Occupation"])
# 1. Column Manipulation
# Rename a Single Column
df = df.withColumnRenamed("Occupation", "Job")
# Rename Multiple Columns
column_rename_map = {"ID": "UserID", "Name": "FullName"}
for old, new in column_rename_map.items():
    df = df.withColumnRenamed(old, new)
# Add a Suffix to All Columns
df = df.toDF(*[c + "_v1" for c in df.columns])  # use c, not col, so the imported col() function is not shadowed
# Check Data Types
df.printSchema()
# Apply Dynamic Logic to All Columns: Trim All String Columns
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
# Convert All Integer Columns to Double
# (the sample data above creates bigint columns, so match both int and bigint)
df = df.select([col(c).cast("double") if dtype in ("int", "bigint") else col(c) for c, dtype in df.dtypes])
# 2. Row-Based Manipulation
# Collect Rows One by One
for row in df.collect():
    print(row)
# Efficient Row Iteration
for row in df.toLocalIterator():
    print(row)
# Filtering Rows
df_filtered = df.filter((df["Age_v1"] > 28.0) & (df["FullName_v1"] != "Bob"))
df_filtered.show()
# Sorting Rows
df_sorted = df.orderBy("Age_v1", ascending=False)
df_sorted.show()
# Adding a New Row
new_row = Row(UserID_v1=4.0, FullName_v1="David", Age_v1=40.0, Job_v1="Scientist")
df_new = df.union(spark.createDataFrame([new_row], df.schema))
df_new.show()
# Finding Duplicate Rows
df.groupBy(df.columns).count().filter("count > 1").show()
# Removing Duplicate Rows
df = df.dropDuplicates()
df.show()
# Adding a New Column Dynamically
df = df.withColumn("NewColumn_v1", lit("DefaultValue"))
df.show()
# Stop Spark Session
spark.stop()
Most Important PySpark DataFrame Transformation Operations 🚀
PySpark transformations are lazy operations that create a new DataFrame without modifying the original one. Here are the most essential transformation operations in PySpark:
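As a quick illustration of lazy evaluation, the filter below only builds a query plan; Spark does not run anything until an action such as count() or show() is called:
df_filtered = df.filter(df["age"] > 25)  # transformation: nothing is executed yet
df_filtered.count()                      # action: the plan is executed now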
1. Selecting and Renaming Columns
Select Specific Columns
df_selected = df.select("column1", "column2")
Rename Columns
df_renamed = df.withColumnRenamed("old_col", "new_col")
2. Filtering Data
Filter Based on Condition
df_filtered = df.filter(df["age"] > 25)
Multiple Conditions
df_filtered = df.filter((df["age"] > 25) & (df["gender"] == "Male"))
3. Adding or Modifying Columns
Create a New Column
from pyspark.sql.functions import lit
df_new = df.withColumn("new_column", lit("default_value"))
Modify Existing Column
df_modified = df.withColumn("salary_incremented", df["salary"] * 1.10)
4. Dropping Columns
df_dropped = df.drop("column_to_remove")
5. Handling Missing Data
Fill Missing Values
df_filled = df.fillna({"age": 0, "name": "Unknown"})
Drop Rows with Nulls
df_cleaned = df.dropna()
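dropna also takes subset and thresh arguments for finer control; a sketch with illustrative column names:
df_cleaned = df.dropna(subset=["age"])  # drop rows where age is null
df_cleaned = df.dropna(thresh=2)        # keep rows with at least 2 non-null values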
6. Aggregations & Grouping
Group By and Aggregate
from pyspark.sql.functions import sum, avg, count
df_grouped = df.groupBy("department").agg(sum("salary").alias("total_salary"), avg("age"))
Count Distinct Values
df.select("department").distinct().count()
7. Sorting Data
df_sorted = df.orderBy("age", ascending=False)
8. Joining DataFrames
Inner Join
df_joined = df1.join(df2, df1["id"] == df2["id"], "inner")
Left Join
df_left = df1.join(df2, df1["id"] == df2["id"], "left")
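Joining on an expression keeps both id columns in the result; joining on the column name instead keeps a single id column (assuming both DataFrames have an id column):
df_joined = df1.join(df2, "id", "inner")  # only one "id" column in the output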
9. Union (Appending DataFrames)
df_combined = df1.union(df2)
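union matches columns by position, so both DataFrames must share the same column order; unionByName matches by name and, from Spark 3.1 onward, can fill columns missing on one side with nulls. A sketch:
df_combined = df1.unionByName(df2)
df_combined = df1.unionByName(df2, allowMissingColumns=True)  # Spark 3.1+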
10. Exploding Nested Data
from pyspark.sql.functions import explode
df_exploded = df.withColumn("exploded_column", explode(df["nested_column"]))
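explode expects an array or map column; a plain string can first be turned into an array, for example with split. A sketch assuming a hypothetical comma-separated column named tags:
from pyspark.sql.functions import explode, split
df_exploded = df.withColumn("tag", explode(split(df["tags"], ",")))  # "tags" is a hypothetical column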
Conclusion
- These transformations do not modify the original DataFrame but return a new one.
- PySpark applies lazy evaluation, meaning transformations are only executed when an action is performed.
Most Useful PySpark DataFrame Functions
PySpark provides many built-in functions for column aliasing, distinct values, transformations, and aggregations. Here’s a collection of the most useful ones:
1. Column Aliasing (alias())
- Used to rename a column temporarily within a query.
from pyspark.sql.functions import col
df_alias = df.select(col("name").alias("full_name"), col("age"))
df_alias.show()
2. Removing Duplicates (distinct())
- Removes duplicate rows from the DataFrame.
df_distinct = df.distinct()
df_distinct.show()
- Count distinct values in a column:
df.select("department").distinct().count()
3. Filtering Data (filter() & where())
- Using .filter():
df_filtered = df.filter(df["age"] > 25)
- Using .where() (same as .filter(), but with SQL-like string syntax):
df_filtered = df.where("age > 25")
4. Column Operations
withColumn() – Create or Modify Columns
from pyspark.sql.functions import lit
df_new = df.withColumn("new_column", lit("default_value"))
cast() – Change Data Type
df_casted = df.withColumn("salary", df["salary"].cast("double"))
5. Aggregations
groupBy() with Aggregations
from pyspark.sql.functions import sum, avg, count
df_grouped = df.groupBy("department").agg(
sum("salary").alias("total_salary"),
avg("age").alias("average_age")
)
df_grouped.show()
6. Sorting (orderBy())
df_sorted = df.orderBy("age", ascending=False)
df_sorted.show()
7. Joins
df_joined = df1.join(df2, df1["id"] == df2["id"], "inner")
8. Exploding Nested Data (explode())
from pyspark.sql.functions import explode
df_exploded = df.withColumn("exploded_column", explode(df["nested_column"]))
df_exploded.show()
9. Collecting Rows
rows = df.collect()
for row in rows:
    print(row)
10. Row Numbering & Ranking
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("department").orderBy("salary")
df_ranked = df.withColumn("rank", row_number().over(windowSpec))
df_ranked.show()
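rank() and dense_rank() follow the same window pattern and differ from row_number() only in how ties are numbered; a sketch reusing the window above:
from pyspark.sql.functions import rank, dense_rank
df_ranked = df_ranked.withColumn("rank_with_gaps", rank().over(windowSpec)) \
                     .withColumn("dense_rank", dense_rank().over(windowSpec))
df_ranked.show()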
Conclusion
- .alias() is useful for renaming columns temporarily.
- .distinct() removes duplicates.
- .filter() and .where() allow conditional selection.
- .groupBy() and .orderBy() are useful for aggregations and sorting.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum, avg, count, explode, row_number, array
from pyspark.sql.window import Window
# Initialize Spark Session
spark = SparkSession.builder.appName("PySpark_Useful_Functions").getOrCreate()
# Sample Data
data = [(1, "Alice", 25, "HR", 50000),
(2, "Bob", 30, "IT", 60000),
(3, "Charlie", 35, "IT", 70000),
(4, "David", 40, "Finance", 80000),
(5, "Eve", 45, "Finance", 90000)]
columns = ["ID", "Name", "Age", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
# 1. Alias (Renaming Columns Temporarily)
df_alias = df.select(col("Name").alias("Full_Name"), col("Age"))
df_alias.show()
# 2. Distinct (Remove Duplicates)
df_distinct = df.select("Department").distinct()
df_distinct.show()
# 3. Filtering Data
df_filtered = df.filter((df["Age"] > 30) & (df["Department"] == "IT"))
df_filtered.show()
# 4. Adding & Modifying Columns
df_new = df.withColumn("New_Column", lit("DefaultValue"))
df_casted = df.withColumn("Salary", df["Salary"].cast("double"))
df_new.show()
df_casted.printSchema()
# 5. Aggregations (Sum, Average, Count)
df_grouped = df.groupBy("Department").agg(
sum("Salary").alias("Total_Salary"),
avg("Age").alias("Average_Age")
)
df_grouped.show()
# 6. Sorting
df_sorted = df.orderBy("Age", ascending=False)
df_sorted.show()
# 7. Joining DataFrames
extra_data = [(1, "US"), (2, "Canada"), (3, "UK"), (4, "Germany"), (5, "India")]
columns_extra = ["ID", "Country"]
df_extra = spark.createDataFrame(extra_data, columns_extra)
df_joined = df.join(df_extra, "ID", "inner")
df_joined.show()
# 8. Exploding Nested Data
df_nested = df.withColumn("Hobbies", lit("['Reading', 'Sports']"))
df_exploded = df_nested.withColumn("Hobby", explode(lit(["Reading", "Sports"])))
df_exploded.show()
# 9. Collecting Rows
rows = df.collect()
for row in rows:
    print(row)
# 10. Row Numbering & Ranking
windowSpec = Window.partitionBy("Department").orderBy("Salary")
df_ranked = df.withColumn("Rank", row_number().over(windowSpec))
df_ranked.show()
# Stop Spark Session
spark.stop()