
Pyspark Dataframe programming – operations, functions, all statements, syntax with Examples

by lochan2014 | Jul 2, 2024 | Pyspark

Most Important PySpark DataFrame Transformation Operations 🚀

PySpark transformations are lazy operations that create a new DataFrame without modifying the original one. Here are the most essential transformation operations in PySpark:
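
The snippets in this post assume an active SparkSession and a sample DataFrame named df. A minimal setup sketch (the column names and values here are illustrative assumptions, chosen to match the examples below):

from pyspark.sql import SparkSession

# Assumed setup for the examples that follow
spark = SparkSession.builder.appName("TransformationExamples").getOrCreate()

data = [(1, "Alice", 25, "Female", "HR", 50000),
        (2, "Bob", 30, "Male", "IT", 60000),
        (3, "Charlie", 35, "Male", "IT", 70000)]
df = spark.createDataFrame(data, ["id", "name", "age", "gender", "department", "salary"])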


1. Selecting and Renaming Columns

Select Specific Columns

df_selected = df.select("column1", "column2")

Rename Columns

df_renamed = df.withColumnRenamed("old_col", "new_col")

2. Filtering Data

Filter Based on Condition

df_filtered = df.filter(df["age"] > 25)

Multiple Conditions

df_filtered = df.filter((df["age"] > 25) & (df["gender"] == "Male"))
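
For membership-style conditions, Column.isin() avoids chaining many equality checks; a small sketch, assuming the department column from the sample data above:

# Rows whose department is either IT or HR
df_it_or_hr = df.filter(df["department"].isin("IT", "HR"))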

3. Adding or Modifying Columns

Create a New Column

from pyspark.sql.functions import lit

df_new = df.withColumn("new_column", lit("default_value"))

Modify Existing Column

df_modified = df.withColumn("salary_incremented", df["salary"] * 1.10)
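
Conditional logic can also drive a new or modified column via when()/otherwise(); a minimal sketch, assuming the salary column from the sample data (the 60000 threshold is an arbitrary example value):

from pyspark.sql.functions import when

# Label each row based on a salary threshold
df_flagged = df.withColumn(
    "salary_band",
    when(df["salary"] >= 60000, "high").otherwise("standard")
)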

4. Dropping Columns

df_dropped = df.drop("column_to_remove")

5. Handling Missing Data

Fill Missing Values

df_filled = df.fillna({"age": 0, "name": "Unknown"})

Drop Rows with Nulls

df_cleaned = df.dropna()
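
dropna() also accepts subset and thresh arguments to target specific columns or require a minimum number of non-null values per row; a short sketch assuming the sample columns:

# Drop rows only when "age" is null
df_cleaned_age = df.dropna(subset=["age"])

# Keep rows that have at least two non-null values
df_thresh = df.dropna(thresh=2)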

6. Aggregations & Grouping

Group By and Aggregate

from pyspark.sql.functions import sum, avg, count

df_grouped = df.groupBy("department").agg(sum("salary").alias("total_salary"), avg("age"))

Count Distinct Values

df.select("department").distinct().count()
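
The same distinct count can be written as an aggregation with countDistinct(), which also works inside groupBy(); a sketch assuming the sample data:

from pyspark.sql.functions import countDistinct

df.select(countDistinct("department").alias("dept_count")).show()
df.groupBy("department").agg(countDistinct("name").alias("unique_names")).show()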

7. Sorting Data

df_sorted = df.orderBy("age", ascending=False)
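
For multi-column sorts, the direction can be set per column with asc()/desc(); a small sketch assuming the sample columns:

from pyspark.sql.functions import col

# Departments alphabetically, highest salary first within each department
df_sorted_multi = df.orderBy(col("department").asc(), col("salary").desc())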

8. Joining DataFrames

Inner Join

df_joined = df1.join(df2, df1["id"] == df2["id"], "inner")

Left Join

df_left = df1.join(df2, df1["id"] == df2["id"], "left")

9. Union (Appending DataFrames)

df_combined = df1.union(df2)
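
union() matches columns by position, so both DataFrames must share the same schema in the same order; when only the column order differs, unionByName() matches by name instead. A sketch with two hypothetical DataFrames:

# Same columns, different order
df_a = spark.createDataFrame([(1, "Alice")], ["id", "name"])
df_b = spark.createDataFrame([("Bob", 2)], ["name", "id"])

df_combined_by_name = df_a.unionByName(df_b)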

10. Exploding Nested Data

from pyspark.sql.functions import explode

df_exploded = df.withColumn("exploded_column", explode(df["nested_column"]))

Conclusion

  • These transformations do not modify the original DataFrame but return a new one.
  • PySpark applies lazy evaluation, meaning transformations are only executed when an action is performed.
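
A quick way to see lazy evaluation in action: chaining transformations returns immediately without running a job, and Spark only executes the plan once an action such as count() or show() is called. A minimal sketch, assuming the sample df:

# No job runs here: filter and select are transformations
pipeline = df.filter(df["age"] > 25).select("name", "salary")

# The job is triggered only by an action
print(pipeline.count())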

Most Useful PySpark DataFrame Functions

PySpark provides many built-in functions for column aliasing, distinct values, transformations, and aggregations. Here’s a collection of the most useful ones:


1. Column Aliasing (alias())

  • Used to rename a column temporarily within a query.
from pyspark.sql.functions import col

df_alias = df.select(col("name").alias("full_name"), col("age"))
df_alias.show()

2. Removing Duplicates (distinct())

  • Removes duplicate rows from the DataFrame.
df_distinct = df.distinct()
df_distinct.show()
  • Count distinct values in a column:
df.select("department").distinct().count()
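
Related to distinct(), dropDuplicates() can deduplicate on a subset of columns, keeping one row per key; a sketch assuming the sample columns:

# One row per department (which row survives is not deterministic without an explicit ordering)
df_dedup = df.dropDuplicates(["department"])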

3. Filtering Data (filter() & where())

  • Using .filter():
df_filtered = df.filter(df["age"] > 25)
  • Using .where() (same as .filter(), but accepts a SQL-style expression string):
df_filtered = df.where("age > 25")

4. Column Operations

withColumn() – Create or Modify Columns

from pyspark.sql.functions import lit

df_new = df.withColumn("new_column", lit("default_value"))

cast() – Change Data Type

df_casted = df.withColumn("salary", df["salary"].cast("double"))
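
printSchema() is a convenient way to confirm the new type after a cast; casts can also target types such as string, int, or timestamp. A short sketch assuming the sample columns:

df_casted = df.withColumn("salary", df["salary"].cast("double")) \
              .withColumn("age_str", df["age"].cast("string"))
df_casted.printSchema()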

5. Aggregations

groupBy() with Aggregations

from pyspark.sql.functions import sum, avg, count

df_grouped = df.groupBy("department").agg(
    sum("salary").alias("total_salary"),
    avg("age").alias("average_age")
)
df_grouped.show()

6. Sorting (orderBy())

df_sorted = df.orderBy("age", ascending=False)
df_sorted.show()

7. Joins

df_joined = df1.join(df2, df1["id"] == df2["id"], "inner")
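
Joining on a column expression keeps both id columns in the result; passing the column name (or a list of names) instead lets Spark merge the join key into a single column. A small sketch, assuming df1 and df2 both contain an id column:

# Output contains a single "id" column instead of df1.id and df2.id
df_joined = df1.join(df2, "id", "inner")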

8. Exploding Nested Data (explode())

from pyspark.sql.functions import explode

df_exploded = df.withColumn("exploded_column", explode(df["nested_column"]))
df_exploded.show()

9. Collecting Rows

rows = df.collect()
for row in rows:
    print(row)
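
collect() pulls every row to the driver, which can exhaust driver memory on large DataFrames; take(n) or limit(n) followed by collect() is usually safer for inspection. A small sketch, assuming the sample df:

# Fetch only the first five rows to the driver
for row in df.take(5):
    print(row["name"], row["salary"])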

10. Row Numbering & Ranking

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

windowSpec = Window.partitionBy("department").orderBy("salary")

df_ranked = df.withColumn("rank", row_number().over(windowSpec))
df_ranked.show()
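
row_number() assigns consecutive numbers within each window partition; rank() and dense_rank() handle ties differently (rank() leaves gaps after ties, dense_rank() does not). A sketch reusing the window spec above:

from pyspark.sql.functions import rank, dense_rank

df_ranks = (df.withColumn("row_number", row_number().over(windowSpec))
              .withColumn("rank", rank().over(windowSpec))
              .withColumn("dense_rank", dense_rank().over(windowSpec)))
df_ranks.show()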

Conclusion

  • .alias() is useful for renaming columns temporarily.
  • .distinct() removes duplicates.
  • .filter() and .where() allow conditional selection.
  • .groupBy() and .orderBy() are useful for aggregations and sorting.

The end-to-end script below ties all of these functions together on a small sample dataset.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum, avg, count, explode, row_number, array
from pyspark.sql.window import Window

# Initialize Spark Session
spark = SparkSession.builder.appName("PySpark_Useful_Functions").getOrCreate()

# Sample Data
data = [(1, "Alice", 25, "HR", 50000),
        (2, "Bob", 30, "IT", 60000),
        (3, "Charlie", 35, "IT", 70000),
        (4, "David", 40, "Finance", 80000),
        (5, "Eve", 45, "Finance", 90000)]
columns = ["ID", "Name", "Age", "Department", "Salary"]

df = spark.createDataFrame(data, columns)

# 1. Alias (Renaming Columns Temporarily)
df_alias = df.select(col("Name").alias("Full_Name"), col("Age"))
df_alias.show()

# 2. Distinct (Remove Duplicates)
df_distinct = df.select("Department").distinct()
df_distinct.show()

# 3. Filtering Data
df_filtered = df.filter((df["Age"] > 30) & (df["Department"] == "IT"))
df_filtered.show()

# 4. Adding & Modifying Columns
df_new = df.withColumn("New_Column", lit("DefaultValue"))
df_casted = df.withColumn("Salary", df["Salary"].cast("double"))
df_new.show()

df_casted.printSchema()

# 5. Aggregations (Sum, Average, Count)
df_grouped = df.groupBy("Department").agg(
    sum("Salary").alias("Total_Salary"),
    avg("Age").alias("Average_Age")
)
df_grouped.show()

# 6. Sorting
df_sorted = df.orderBy("Age", ascending=False)
df_sorted.show()

# 7. Joining DataFrames
extra_data = [(1, "US"), (2, "Canada"), (3, "UK"), (4, "Germany"), (5, "India")]
columns_extra = ["ID", "Country"]
df_extra = spark.createDataFrame(extra_data, columns_extra)
df_joined = df.join(df_extra, "ID", "inner")
df_joined.show()

# 8. Exploding Nested Data
df_nested = df.withColumn("Hobbies", array(lit("Reading"), lit("Sports")))  # array column, not a string
df_exploded = df_nested.withColumn("Hobby", explode(col("Hobbies")))
df_exploded.show()

# 9. Collecting Rows
rows = df.collect()
for row in rows:
    print(row)

# 10. Row Numbering & Ranking
windowSpec = Window.partitionBy("Department").orderBy("Salary")
df_ranked = df.withColumn("Rank", row_number().over(windowSpec))
df_ranked.show()

# Stop Spark Session
spark.stop()
