
PySpark DataFrame Programming – Operations, Functions, and Syntax, with Examples

by lochan2014 | Jul 2, 2024 | Pyspark

Here’s a detailed guide on PySpark DataFrame column and row manipulation with useful implementations:


1. Column Manipulation in PySpark DataFrames

1.1 Renaming Columns

Rename a Single Column

df = df.withColumnRenamed("old_column", "new_column")

Rename Multiple Columns

new_column_names = {"old1": "new1", "old2": "new2"}
for old, new in new_column_names.items():
    df = df.withColumnRenamed(old, new)
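
On Spark 3.4 and later, the loop above can be replaced with a single withColumnsRenamed call that takes the whole mapping (a minimal sketch; older versions still need the loop):

df = df.withColumnsRenamed({"old1": "new1", "old2": "new2"})  # Spark 3.4+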

Add a Suffix to All Column Names

df = df.toDF(*[c + "_v1" for c in df.columns])  # name the loop variable "c", not "col", so it doesn't shadow pyspark.sql.functions.col

1.2 Checking Data Types

Check Data Type of a Specific Column

print(df.schema["column_name"].dataType)

Get Data Types of All Columns

df.dtypes  # Returns a list of (column_name, data_type)

Check Schema of DataFrame

df.printSchema()
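
The (name, type) pairs from df.dtypes are convenient for grouping columns by type before a dynamic transformation. For example (type names follow Spark's simple strings; note that Python ints load as "bigint"):

string_cols = [c for c, t in df.dtypes if t == "string"]
numeric_cols = [c for c, t in df.dtypes if t in ("int", "bigint", "float", "double")]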

1.3 Apply Dynamic Logic to All Columns

Example: Trim All String Columns

from pyspark.sql.functions import col, trim

df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])

Example: Convert All Integer Columns to Double

from pyspark.sql.functions import col

df = df.select([col(c).cast("double") if dtype == "int" else col(c) for c, dtype in df.dtypes])

Example: Replace Nulls in All String Columns with “Unknown”

df = df.fillna("Unknown")  # a string fill value is applied only to string columns; numeric columns are untouched
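
To restrict the fill to particular columns, or to use a different default per column, fillna also accepts a subset list or a dict (the column names below are illustrative):

# Fill only the listed string columns
df = df.fillna("Unknown", subset=["Name", "Occupation"])

# Different default per column
df = df.fillna({"Name": "Unknown", "Age": 0})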

2. Row-Based DataFrame Manipulation

2.1 Collecting Rows One by One

Convert DataFrame to a List of Rows

rows = df.collect()
for row in rows:
    print(row)

Using toLocalIterator() for Large DataFrames (Efficient)

for row in df.toLocalIterator():
    print(row)
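
When only a sample is needed for inspection, take(n) brings at most n rows to the driver instead of the full dataset:

for row in df.take(5):
    print(row)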

2.2 Filtering Rows

Filter Rows Based on a Condition

df_filtered = df.filter(df["Age"] > 30)

Filter Multiple Conditions

df_filtered = df.filter((df["Age"] > 30) & (df["Gender"] == "Male"))

2.3 Sorting Rows

df_sorted = df.orderBy("Age", ascending=False)
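
For multi-column sorts with mixed directions, pass column expressions instead of the ascending flag:

from pyspark.sql.functions import col

df_sorted = df.orderBy(col("Age").desc(), col("Name").asc())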

2.4 Adding a New Row (Using Union)

from pyspark.sql import Row

new_row = Row(ID=100, Name="John Doe", Age=40)
df_new = df.union(spark.createDataFrame([new_row], df.schema))
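
Note that union matches columns by position, so the new row's fields must line up with the schema exactly. If the columns might arrive in a different order, unionByName matches by name instead (a sketch assuming the same column names):

new_df = spark.createDataFrame([(100, "John Doe", 40)], ["ID", "Name", "Age"])
df_new = df.unionByName(new_df)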

3. Useful Implementations

3.1 Finding Duplicate Rows

df.groupBy(df.columns).count().filter("count > 1").show()

3.2 Removing Duplicate Rows

df = df.dropDuplicates()
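
Both operations also accept a column subset when only certain fields define a duplicate (the key columns below are illustrative):

# Duplicates judged only on Name and Age
df.groupBy("Name", "Age").count().filter("count > 1").show()

# Keep one row per (Name, Age) combination
df = df.dropDuplicates(["Name", "Age"])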

3.3 Adding a New Column Dynamically

from pyspark.sql.functions import lit

df = df.withColumn("NewColumn", lit("DefaultValue"))
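
Derived columns can also be conditional; a minimal sketch with when/otherwise, assuming an Age column:

from pyspark.sql.functions import when

df = df.withColumn("AgeGroup", when(df["Age"] >= 30, "Senior").otherwise("Junior"))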

Conclusion

  • PySpark allows flexible column manipulations like renaming, checking types, and applying transformations.
  • Row operations like filtering, sorting, and iterating can be done efficiently.
  • Collecting data should be handled carefully to avoid memory overload.
  • Dynamic transformations make it easy to process large datasets.
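
4. Complete Example Script

Everything above combined into one runnable script. Note that the _v1 suffix is applied to every column partway through, which is why later lines reference names like Age_v1 and FullName_v1: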
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, trim, lit

# Initialize Spark Session
spark = SparkSession.builder.appName("DataFrame_Manipulation").getOrCreate()

# Sample Data
data = [(1, "Alice", 25, "Engineer"), (2, "Bob", 30, "Doctor"), (3, "Charlie", 35, "Teacher")]
df = spark.createDataFrame(data, ["ID", "Name", "Age", "Occupation"])

# 1. Column Manipulation

# Rename a Single Column
df = df.withColumnRenamed("Occupation", "Job")

# Rename Multiple Columns
column_rename_map = {"ID": "UserID", "Name": "FullName"}
for old, new in column_rename_map.items():
    df = df.withColumnRenamed(old, new)

# Add a Suffix to All Columns
df = df.toDF(*[c + "_v1" for c in df.columns])  # use "c", not "col", to avoid shadowing the imported col function

# Check Data Types
df.printSchema()

# Apply Dynamic Logic to All Columns: Trim All String Columns
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])

# Convert All Integer Columns to Double
df = df.select([col(c).cast("double") if dtype == "int" else col(c) for c, dtype in df.dtypes])

# 2. Row-Based Manipulation

# Collect Rows One by One
for row in df.collect():
    print(row)

# Efficient Row Iteration
for row in df.toLocalIterator():
    print(row)

# Filtering Rows
df_filtered = df.filter((df["Age_v1"] > 28.0) & (df["FullName_v1"] != "Bob"))
df_filtered.show()

# Sorting Rows
df_sorted = df.orderBy("Age_v1", ascending=False)
df_sorted.show()

# Adding a New Row
new_row = (4.0, "David", 40.0, "Scientist")  # a positional tuple sidesteps Row kwarg-ordering differences across Spark versions
df_new = df.union(spark.createDataFrame([new_row], df.schema))
df_new.show()

# Finding Duplicate Rows
df.groupBy(df.columns).count().filter("count > 1").show()

# Removing Duplicate Rows
df = df.dropDuplicates()
df.show()

# Adding a New Column Dynamically
df = df.withColumn("NewColumn_v1", lit("DefaultValue"))
df.show()

# Stop Spark Session
spark.stop()


