Window functions in PySpark perform calculations across a “window” of rows related to the current row, without collapsing those rows into a single result the way groupBy() does. They mirror SQL window functions, expressed through the DataFrame API, and are useful for tasks like ranking, cumulative sums, and moving averages. Let’s go through the PySpark DataFrame window functions, compare them with their Spark SQL equivalents, and work through examples on a small sample dataset.


Syntax Structure

  1. Define a Window Specification: The Window object specifies how rows are partitioned and ordered for the operation.

     from pyspark.sql.window import Window
     window_spec = Window.partitionBy("column1").orderBy("column2")

  2. Apply the Window Function: Use PySpark functions like row_number(), rank(), dense_rank(), etc., with the window specification.

     from pyspark.sql.functions import row_number, rank, dense_rank, sum
     df.withColumn("row_num", row_number().over(window_spec))

Window Specification Options

| Option | Description | Syntax |
|---|---|---|
| partitionBy() | Divides the data into partitions for independent calculations. | Window.partitionBy("column1") |
| orderBy() | Specifies the order of rows within each partition. | Window.orderBy("column2") |
| rowsBetween() | Defines a window frame by rows relative to the current row. | .rowsBetween(-1, 1) |
| rangeBetween() | Defines a window frame based on the range of values in the ordering column. | .rangeBetween(-10, 10) |
| unboundedPreceding | Indicates all rows before the current row in the partition. | Window.rowsBetween(Window.unboundedPreceding, 0) |
| unboundedFollowing | Indicates all rows after the current row in the partition. | Window.rowsBetween(0, Window.unboundedFollowing) |
| currentRow | Refers to the current row in the partition. | Window.rowsBetween(Window.currentRow, Window.currentRow) |
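
The difference between rowsBetween() and rangeBetween() is easiest to see on a tiny example. The following is a plain-Python sketch of how the two frame types pick rows, not Spark code; the helper names rows_frame and range_frame are made up for illustration, and the list stands for the ordering-column values of one partition that is already sorted.

```python
def rows_frame(values, i, start, end):
    """Rows frame: positions i+start .. i+end, clipped to the partition."""
    lo = max(0, i + start)
    hi = min(len(values) - 1, i + end)
    return values[lo:hi + 1]


def range_frame(values, i, start, end):
    """Range frame: every row whose ordering value lies in [v+start, v+end]."""
    v = values[i]
    return [x for x in values if v + start <= x <= v + end]


# Ordering-column values of one partition, sorted ascending.
salaries = [800, 800, 950, 1000, 1200]

# Like rowsBetween(-1, 1) at the third row: one row back, one row ahead.
print(rows_frame(salaries, 2, -1, 1))        # [800, 950, 1000]

# Like rangeBetween(-100, 100) at the third row: values from 850 to 1050.
print(range_frame(salaries, 2, -100, 100))   # [950, 1000]

# A range frame of (0, 0) still includes rows tied with the current row.
print(range_frame(salaries, 0, 0, 0))        # [800, 800]
```

The last call shows why a range frame can contain more than one row even when both bounds are the current row: rows with equal ordering values are treated as peers.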

Common PySpark Window Functions

| Function | Description |
|---|---|
| row_number() | Assigns a unique number to each row in a window. |
| rank() | Assigns a rank to each row, with gaps for ties. |
| dense_rank() | Assigns a rank to each row, without gaps for ties. |
| ntile(n) | Divides rows into n buckets and assigns a bucket number to each row. |
| lead(column, n) | Returns the value of the column from n rows ahead of the current row. |
| lag(column, n) | Returns the value of the column from n rows behind the current row. |
| first() | Returns the first value in the window frame. |
| last() | Returns the last value in the window frame. |
| sum() | Computes the sum of the column over the window frame. |
| avg() | Computes the average of the column over the window frame. |
| max() | Returns the maximum value of the column over the window frame. |
| min() | Returns the minimum value of the column over the window frame. |
| count() | Returns the count of rows in the window frame. |

Examples

1. Ranking Employees by Salary

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank

data = [(1, "Alice", 5000), (2, "Bob", 6000), (3, "Charlie", 4000), (4, "Alice", 7000)]
columns = ["EmpID", "Name", "Salary"]

df = spark.createDataFrame(data, columns)

window_spec = Window.partitionBy("Name").orderBy("Salary")

df = df.withColumn("row_number", row_number().over(window_spec)) \
       .withColumn("rank", rank().over(window_spec)) \
       .withColumn("dense_rank", dense_rank().over(window_spec))

df.show()

Output:

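| EmpID | Name | Salary | row_number | rank | dense_rank |
|---|---|---|---|---|---|
| 3 | Charlie | 4000 | 1 | 1 | 1 |
| 1 | Alice | 5000 | 1 | 1 | 1 |
| 4 | Alice | 7000 | 2 | 2 | 2 |
| 2 | Bob | 6000 | 1 | 1 | 1 |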

2. Cumulative Sum

from pyspark.sql.functions import sum

window_spec = Window.partitionBy("Name").orderBy("Salary").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df = df.withColumn("cumulative_sum", sum("Salary").over(window_spec))
df.show()

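Output (the ranking columns added in example 1 are omitted for brevity):

| EmpID | Name | Salary | cumulative_sum |
|---|---|---|---|
| 3 | Charlie | 4000 | 4000 |
| 1 | Alice | 5000 | 5000 |
| 4 | Alice | 7000 | 12000 |
| 2 | Bob | 6000 | 6000 |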

Options for Handling NULLs

  1. Control NULL Placement in the Order: Use the nulls-first or nulls-last column methods (the equivalents of NULLS FIRST and NULLS LAST in SQL) inside orderBy().

     Window.orderBy(col("Salary").desc_nulls_last())

  2. Filter NULLs Before Windowing: Use .filter() to drop NULL rows before applying the window function.

     df.filter(col("Salary").isNotNull())
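
To make the effect of NULL placement concrete, here is a plain-Python sketch of the two orderings, with None standing in for NULL. The function names mirror the PySpark column methods, but the implementations are only illustrative. Spark puts NULLs first for ascending order and last for descending order unless told otherwise.

```python
def desc_nulls_last(values):
    """Sort descending, with missing values at the end."""
    present = sorted((v for v in values if v is not None), reverse=True)
    missing = [v for v in values if v is None]
    return present + missing


def asc_nulls_first(values):
    """Sort ascending, with missing values at the start."""
    missing = [v for v in values if v is None]
    present = sorted(v for v in values if v is not None)
    return missing + present


salaries = [5000, None, 7000, 4000, None]

print(desc_nulls_last(salaries))  # [7000, 5000, 4000, None, None]
print(asc_nulls_first(salaries))  # [None, None, 4000, 5000, 7000]
```

Where the NULL rows land matters for functions such as row_number() or lag(), because those functions depend on each row's position in the ordered partition.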

Important Notes

  • PartitionBy: Breaks data into logical groups for independent calculations.
  • OrderBy: Determines the order within each partition.
  • Frame Specification: Allows cumulative, rolling, or specific-frame calculations using rowsBetween or rangeBetween.

Setting Up the Environment

First, let’s set up the environment and create a sample dataset.

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank, percent_rank, ntile, lag, lead, sum, avg

# Initialize Spark session
spark = (
    SparkSession.builder
    .appName("PySpark Window Functions")
    .getOrCreate()
)

# Create a sample dataset
data = [(1, "Alice", 1000),
(2, "Bob", 1200),
(3, "Catherine", 1200),
(4, "David", 800),
(5, "Eve", 950),
(6, "Frank", 800),
(7, "George", 1200),
(8, "Hannah", 1000),
(9, "Ivy", 950),
(10, "Jack", 1200)]
columns = ["id", "name", "salary"]

df = spark.createDataFrame(data, schema=columns)
df.show()

PySpark Window Functions

1. Row Number

The row_number function assigns a unique number to each row within a window partition.

windowSpec = Window.partitionBy("salary").orderBy("id")
df.withColumn("row_number", row_number().over(windowSpec)).show()

2. Rank

The rank function assigns a rank to each row within a window partition. Rows that tie on the ordering column share a rank, and the next rank skips ahead, leaving gaps.

df.withColumn("rank", rank().over(windowSpec)).show()

3. Dense Rank

The dense_rank function assigns a rank to each row within a window partition. Tied rows share a rank, and the next rank is always one higher, so there are no gaps.

df.withColumn("dense_rank", dense_rank().over(windowSpec)).show()
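
Because the window above orders by id, which is unique, row_number, rank, and dense_rank all return the same numbers on this dataset. The three only differ when the ordering column contains ties, for example when ordering by salary. The following plain-Python sketch (the ranking helper is hypothetical, not a PySpark API) shows what each function would assign to an already-sorted partition.

```python
def ranking(sorted_values):
    """Return (row_number, rank, dense_rank) for each value of an ordered partition."""
    out = []
    rank = dense = 0
    prev = object()  # sentinel that never equals a real value
    for row_number, v in enumerate(sorted_values, start=1):
        if v != prev:
            rank = row_number  # rank jumps to the row position after a tie
            dense += 1         # dense_rank advances by exactly one
            prev = v
        out.append((row_number, rank, dense))
    return out


salaries = [800, 800, 950, 950, 1000]

for value, numbers in zip(salaries, ranking(salaries)):
    print(value, numbers)
# 800 (1, 1, 1)
# 800 (2, 1, 1)
# 950 (3, 3, 2)
# 950 (4, 3, 2)
# 1000 (5, 5, 3)
```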

4. Percent Rank

The percent_rank function calculates the percentile rank of rows within a window partition.

df.withColumn("percent_rank", percent_rank().over(windowSpec)).show()
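
The value returned by percent_rank is (rank - 1) / (rows in partition - 1), and a single-row partition gets 0.0. As a rough plain-Python sketch of that formula (the helper below is illustrative only and expects the partition to be sorted already):

```python
def percent_rank(sorted_values):
    """Compute (rank - 1) / (n - 1) for each value of an ordered partition."""
    n = len(sorted_values)
    result = []
    rank = 0
    prev = object()  # sentinel that never equals a real value
    for position, v in enumerate(sorted_values, start=1):
        if v != prev:
            rank, prev = position, v  # tied rows keep the rank of the first peer
        result.append((rank - 1) / (n - 1) if n > 1 else 0.0)
    return result


print(percent_rank([800, 800, 950, 1000, 1200]))  # [0.0, 0.0, 0.5, 0.75, 1.0]
print(percent_rank([1000]))                       # [0.0]
```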

5. NTile

The ntile function divides the rows within a window partition into n buckets.

df.withColumn("ntile", ntile(4).over(windowSpec)).show()
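
When the rows do not divide evenly, ntile gives the earlier buckets one extra row each, and a partition with fewer rows than buckets simply uses the first few bucket numbers. A small plain-Python sketch of that distribution (the function is a hypothetical stand-in that takes only the row count of one partition):

```python
def ntile(num_rows, n):
    """Bucket number for each of num_rows ordered rows, split into n buckets."""
    base, extra = divmod(num_rows, n)
    buckets = []
    for bucket in range(1, n + 1):
        size = base + (1 if bucket <= extra else 0)  # first `extra` buckets get one more row
        buckets.extend([bucket] * size)
    return buckets


print(ntile(10, 4))  # [1, 1, 1, 2, 2, 2, 3, 3, 4, 4]
print(ntile(2, 4))   # [1, 2]
```

With the salary partitions used here, each partition holds only two to four rows, so ntile(4) never produces four full buckets.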

6. Lag

The lag function provides access to a row at a given physical offset before the current row within a window partition.

df.withColumn("lag", lag("salary", 1).over(windowSpec)).show()

7. Lead

The lead function provides access to a row at a given physical offset after the current row within a window partition.

df.withColumn("lead", lead("salary", 1).over(windowSpec)).show()
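
Both functions return NULL when the offset reaches past the edge of the partition, unless a default value is supplied. Since every row in a salary partition has the same salary, shifting a column such as id makes the behaviour easier to see. Here is a plain-Python sketch over the ids of the 1200 salary partition; the helpers are illustrative, use None for NULL, and assume a non-negative offset.

```python
def lag(values, offset=1, default=None):
    """Value from `offset` rows before each row, or `default` at the start."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]


def lead(values, offset=1, default=None):
    """Value from `offset` rows after each row, or `default` at the end."""
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]


ids = [2, 3, 7, 10]  # ids in the 1200 salary partition, ordered by id

print(lag(ids))              # [None, 2, 3, 7]
print(lead(ids))             # [3, 7, 10, None]
print(lag(ids, 2, default=0))  # [0, 0, 2, 3]
```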

8. Cumulative Sum

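Applying sum over an ordered window calculates a running total within each partition. When orderBy() is present and no frame is given, the frame runs from the start of the partition up to the current row (including any rows tied with it in the ordering).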

df.withColumn("cumulative_sum", sum("salary").over(windowSpec)).show()
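
As a sanity check on what the running total should look like, the same partition-then-accumulate logic can be sketched in plain Python. This is not how Spark executes the query; the cumulative_sum helper and the shortened (id, salary) rows are assumptions made for illustration.

```python
from itertools import accumulate, groupby


def cumulative_sum(rows):
    """Running salary total per salary partition, ordered by id."""
    # Sort by partition key (salary), then by ordering key (id).
    ordered = sorted(rows, key=lambda r: (r[1], r[0]))
    result = []
    for _, group in groupby(ordered, key=lambda r: r[1]):
        group = list(group)
        totals = accumulate(salary for _, salary in group)
        result.extend((id_, salary, total)
                      for (id_, salary), total in zip(group, totals))
    return result


rows = [(1, 1000), (2, 1200), (3, 1200), (4, 800), (6, 800), (8, 1000)]

for row in cumulative_sum(rows):
    print(row)
# (4, 800, 800)
# (6, 800, 1600)
# (1, 1000, 1000)
# (8, 1000, 2000)
# (2, 1200, 1200)
# (3, 1200, 2400)
```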

9. Moving Average

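The avg function calculates an average over the window frame. With the default frame used here, the result is a running (cumulative) average from the start of the partition; for a fixed-size moving average, add an explicit frame such as rowsBetween(-2, 0) to the window specification.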

df.withColumn("moving_avg", avg("salary").over(windowSpec)).show()
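
The difference between a running average and a fixed-size moving average comes down to where the frame starts. The plain-Python sketch below compares the two on one ordered partition; windowed_avg is a hypothetical helper, and preceding=2 corresponds to a frame like rowsBetween(-2, 0).

```python
def windowed_avg(values, preceding=None):
    """Average over a frame ending at the current row.

    preceding=None means the frame starts at the first row (running average);
    preceding=k means the frame covers the current row and the k rows before it.
    """
    out = []
    for i in range(len(values)):
        start = 0 if preceding is None else max(0, i - preceding)
        frame = values[start:i + 1]
        out.append(sum(frame) / len(frame))
    return out


salaries = [900, 1200, 1200, 600, 900]

print(windowed_avg(salaries))               # [900.0, 1050.0, 1100.0, 975.0, 960.0]
print(windowed_avg(salaries, preceding=2))  # [900.0, 1050.0, 1100.0, 1000.0, 900.0]
```

The two results agree until the partition has more rows than the frame allows, after which the moving average starts dropping the oldest values.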


There are multiple ways to apply window functions on DataFrames in PySpark. While withColumn() is the most common way to add a column computed by a window function, other DataFrame methods work too, depending on the use case.

Here are different methods for applying window functions:

1. Using select() with window functions

Instead of withColumn(), you can use select() to directly apply a window function to the columns of the DataFrame. This is useful when you only want to return a subset of columns along with the windowed column.

from pyspark.sql.functions import row_number
from pyspark.sql import Window

# Define the window specification
windowSpec = Window.partitionBy("salary").orderBy("id")

# Use select to apply the window function
df.select("id", "salary", row_number().over(windowSpec).alias("row_number")).show()

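2. Using agg() with window functions

A window function cannot be evaluated over raw, ungrouped columns inside agg(). A call such as df.groupBy("id").agg(sum("salary").over(windowSpec)) is rejected by Spark’s analyzer because salary is neither a grouping column nor aggregated. Instead, aggregate first and then apply the window function to the aggregated result. This is useful when you want to calculate metrics (e.g., sum, avg) across groups.

from pyspark.sql.functions import sum
from pyspark.sql import Window

# Aggregate first: one row per salary level
agg_df = df.groupBy("salary").agg(sum("salary").alias("total_salary"))

# Define the window over the aggregated rows
# (no partitioning, so Spark moves these rows to a single partition)
windowSpec = Window.orderBy("salary")

# Apply the window function to the aggregated result
agg_df.withColumn("running_total", sum("total_salary").over(windowSpec)).show()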

3. Using filter() or where()

Sometimes, window functions are used in conjunction with filters to extract specific rows, such as filtering the first or last row per partition.

from pyspark.sql.functions import row_number

# Define the window specification
windowSpec = Window.partitionBy("salary").orderBy("id")

# Apply window function and filter based on the rank
ranked_df = df.withColumn("row_number", row_number().over(windowSpec))
ranked_df.filter(ranked_df["row_number"] == 1).show()  # Filter to get the first row per partition
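
For reference, the result of this "first row per partition" pattern can be sketched in plain Python: order the rows, then keep only the first one seen for each partition key. The helper name and the shortened (id, salary) rows are assumptions for illustration.

```python
def first_per_partition(rows):
    """Keep the row with the smallest id for each salary value."""
    first = {}
    for id_, salary in sorted(rows):             # order by id
        first.setdefault(salary, (id_, salary))  # only the first row per salary is kept
    return sorted(first.values())


rows = [(1, 1000), (2, 1200), (3, 1200), (4, 800), (5, 950), (6, 800)]

print(first_per_partition(rows))
# [(1, 1000), (2, 1200), (4, 800), (5, 950)]
```

Using row_number() for this guarantees exactly one row per partition; rank() would return every row tied for first place.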

4. Using groupBy() with window functions

Though groupBy() is usually used for aggregations, you can combine it with window functions. Window functions won’t replace groupBy(), but you can apply them after aggregations, as long as the window only references columns that exist in the aggregated result.

from pyspark.sql.functions import col, rank
from pyspark.sql import Window

# First, group by salary and count the rows in each group
grouped_df = df.groupBy("salary").count()

# Define the window over the aggregated columns (id no longer exists here)
windowSpec = Window.orderBy(col("count").desc())

# Rank the salary levels by how many employees share them
grouped_df.withColumn("rank", rank().over(windowSpec)).show()

5. Using withColumnRenamed() with window functions

You can also rename the result of a window function when adding it as a new column.

from pyspark.sql.functions import row_number

# Define the window specification
windowSpec = Window.partitionBy("salary").orderBy("id")

# Apply the window function and rename the column
df.withColumn("row_number", row_number().over(windowSpec)).withColumnRenamed("row_number", "rank").show()

6. Combining multiple window functions in one step

You can apply multiple window functions in a single step using either select() or withColumn().

from pyspark.sql.functions import row_number, rank

# Define the window specification
windowSpec = Window.partitionBy("salary").orderBy("id")

# Apply multiple window functions
df.select("id", "salary",
          row_number().over(windowSpec).alias("row_number"),
          rank().over(windowSpec).alias("rank")
         ).show()

Comparison with Spark SQL Window Functions

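All the above operations can also be performed using Spark SQL. Register the DataFrame as a temporary view first, for example with df.createOrReplaceTempView("df"), and then pass each query to spark.sql(). Here are the equivalent SQL queries: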

1. Row Number

SELECT id, name, salary,
       ROW_NUMBER() OVER (PARTITION BY salary ORDER BY id) AS row_number
FROM df

2. Rank

SELECT id, name, salary,
       RANK() OVER (PARTITION BY salary ORDER BY id) AS rank
FROM df

3. Dense Rank

SELECT id, name, salary,
       DENSE_RANK() OVER (PARTITION BY salary ORDER BY id) AS dense_rank
FROM df

4. Percent Rank

SELECT id, name, salary,
       PERCENT_RANK() OVER (PARTITION BY salary ORDER BY id) AS percent_rank
FROM df

5. NTile

SELECT id, name, salary,
       NTILE(4) OVER (PARTITION BY salary ORDER BY id) AS ntile
FROM df

6. Lag

SELECT id, name, salary,
       LAG(salary, 1) OVER (PARTITION BY salary ORDER BY id) AS lag
FROM df

7. Lead

SELECT id, name, salary,
       LEAD(salary, 1) OVER (PARTITION BY salary ORDER BY id) AS lead
FROM df

8. Cumulative Sum

SELECT id, name, salary,
       SUM(salary) OVER (PARTITION BY salary ORDER BY id ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_sum
FROM df

9. Moving Average

SELECT id, name, salary,
       AVG(salary) OVER (PARTITION BY salary ORDER BY id ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS moving_avg
FROM df
