Window functions in PySpark perform calculations across a “window” of rows related to the current row, such as aggregations, ranking, and row-to-row comparisons, while still returning one value per row. They mirror SQL window functions but use PySpark’s DataFrame syntax, and they are useful for tasks like ranking, cumulative sums, and moving averages. Let’s go through the main PySpark DataFrame window functions, compare them with their Spark SQL counterparts, and work through examples on a sample dataset.
Syntax Structure
Define a Window Specification: The Window object specifies how rows are partitioned and ordered for the operation.
from pyspark.sql.window import Window
window_spec = Window.partitionBy("column1").orderBy("column2")
Apply the Window Function: Use PySpark functions like row_number(), rank(), dense_rank(), etc., with the window specification.
from pyspark.sql.functions import row_number, rank, dense_rank, sum
df.withColumn("row_num", row_number().over(window_spec))
Window Specification Options
| Option | Description | Syntax |
|---|---|---|
| partitionBy() | Divides the data into partitions for independent calculations. | Window.partitionBy("column1") |
| orderBy() | Specifies the order of rows within each partition. | Window.orderBy("column2") |
| rowsBetween() | Defines a window frame by rows relative to the current row. | .rowsBetween(-1, 1) |
| rangeBetween() | Defines a window frame based on the range of values in the ordering column. | .rangeBetween(-10, 10) |
| unboundedPreceding | Indicates all rows before the current row in the partition. | Window.rowsBetween(Window.unboundedPreceding, 0) |
| unboundedFollowing | Indicates all rows after the current row in the partition. | Window.rowsBetween(0, Window.unboundedFollowing) |
| currentRow | Refers to the current row in the partition. | Window.rowsBetween(Window.currentRow, Window.currentRow) |
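For instance, a row-based frame and a range-based frame can be combined as in the sketch below. This is only a sketch: it assumes a hypothetical sales DataFrame with store and amount columns, and the frame sizes are illustrative.
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, sum
# Row-based frame: the previous row, the current row, and the next row.
rows_spec = Window.partitionBy("store").orderBy("amount").rowsBetween(-1, 1)
# Range-based frame: all rows whose "amount" lies within +/-10 of the current row's value.
range_spec = Window.partitionBy("store").orderBy("amount").rangeBetween(-10, 10)
sales.withColumn("rolling_avg", avg("amount").over(rows_spec)) \
     .withColumn("nearby_sum", sum("amount").over(range_spec)) \
     .show()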
Common PySpark Window Functions
| Function | Description |
|---|---|
| row_number() | Assigns a unique number to each row in a window. |
| rank() | Assigns a rank to each row, with gaps for ties. |
| dense_rank() | Assigns a rank to each row, without gaps for ties. |
| ntile(n) | Divides rows into n buckets and assigns a bucket number to each row. |
| lead(column, n) | Returns the value of the column from n rows ahead of the current row. |
| lag(column, n) | Returns the value of the column from n rows behind the current row. |
| first() | Returns the first value in the window frame. |
| last() | Returns the last value in the window frame. |
| sum() | Computes the sum of the column over the window frame. |
| avg() | Computes the average of the column over the window frame. |
| max() | Returns the maximum value of the column over the window frame. |
| min() | Returns the minimum value of the column over the window frame. |
| count() | Returns the count of rows in the window frame. |
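As a quick illustration of the offset functions, lag() and lead() can be combined to compare each row with its neighbours. This is only a sketch: it assumes a hypothetical emp_df DataFrame with dept and salary columns.
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, col
# Order salaries within each department, then look one row back and one row ahead.
w = Window.partitionBy("dept").orderBy("salary")
emp_df.withColumn("prev_salary", lag("salary", 1).over(w)) \
      .withColumn("next_salary", lead("salary", 1).over(w)) \
      .withColumn("raise_vs_prev", col("salary") - col("prev_salary")) \
      .show()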
When using the RANK window function in Spark SQL or the DataFrame API, duplicates within a partition behave as follows:
- The RANK function assigns the same rank to duplicate values.
- The next rank value is skipped. For example, if two rows have the same value and are assigned rank 1, the next row will be assigned rank 3.
Here’s an example:
+--------+-------+
| value | rank |
+--------+-------+
| 10 | 1 |
| 10 | 1 |
| 9 | 3 |
| 8 | 4 |
+--------+-------+
In contrast, the DENSE_RANK function does not skip rank values. If there are duplicates, the next rank value is consecutive.
+--------+-----------+
| value | dense_rank|
+--------+-----------+
| 10 | 1 |
| 10 | 1 |
| 9 | 2 |
| 8 | 3 |
+--------+-----------+
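The comparison above can be reproduced directly with Spark SQL window functions. A minimal sketch, assuming a hypothetical temporary view named scores with a single value column containing 10, 10, 9, 8:
spark.sql("""
    SELECT value,
           RANK()       OVER (ORDER BY value DESC) AS rank,
           DENSE_RANK() OVER (ORDER BY value DESC) AS dense_rank
    FROM scores
""").show()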
Examples
1. Ranking Employees by Salary
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank
data = [(1, "Alice", 5000), (2, "Bob", 6000), (3, "Charlie", 4000), (4, "Alice", 7000)]
columns = ["EmpID", "Name", "Salary"]
df = spark.createDataFrame(data, columns)
window_spec = Window.partitionBy("Name").orderBy("Salary")
df = df.withColumn("row_number", row_number().over(window_spec))
.withColumn("rank", rank().over(window_spec))
.withColumn("dense_rank", dense_rank().over(window_spec))
df.show()
Output:
| EmpID | Name | Salary | row_number | rank | dense_rank |
|---|---|---|---|---|---|
| 3 | Charlie | 4000 | 1 | 1 | 1 |
| 1 | Alice | 5000 | 1 | 1 | 1 |
| 4 | Alice | 7000 | 2 | 2 | 2 |
| 2 | Bob | 6000 | 1 | 1 | 1 |
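The same ranking can be expressed with Spark SQL window functions. A sketch, registering the DataFrame as a temporary view first:
df.createOrReplaceTempView("employees")
spark.sql("""
    SELECT EmpID, Name, Salary,
           ROW_NUMBER() OVER (PARTITION BY Name ORDER BY Salary) AS row_number,
           RANK()       OVER (PARTITION BY Name ORDER BY Salary) AS rank,
           DENSE_RANK() OVER (PARTITION BY Name ORDER BY Salary) AS dense_rank
    FROM employees
""").show()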
2. Cumulative Sum
from pyspark.sql.functions import sum
window_spec = Window.partitionBy("Name").orderBy("Salary").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("cumulative_sum", sum("Salary").over(window_spec))
df.show()
Output:
| EmpID | Name | Salary | cumulative_sum |
|---|---|---|---|
| 3 | Charlie | 4000 | 4000 |
| 1 | Alice | 5000 | 5000 |
| 4 | Alice | 7000 | 12000 |
| 2 | Bob | 6000 | 6000 |
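In Spark SQL the same running sum is written with an explicit ROWS frame. A sketch, assuming the employees temporary view registered in the previous example:
spark.sql("""
    SELECT EmpID, Name, Salary,
           SUM(Salary) OVER (
               PARTITION BY Name
               ORDER BY Salary
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS cumulative_sum
    FROM employees
""").show()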
Options for Handling NULLs
- Order NULLs explicitly: Use the ordering helpers asc_nulls_first() / asc_nulls_last() / desc_nulls_first() / desc_nulls_last() (the DataFrame equivalents of SQL's NULLS FIRST / NULLS LAST) in orderBy(), e.g. Window.orderBy(col("Salary").desc_nulls_last()).
- Filter NULLs before the window: Use .filter() before applying the window function, e.g. df.filter(col("Salary").isNotNull()).
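Putting both options together, a sketch using the employee DataFrame from the examples above:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
# Option 1: rank salaries in descending order, pushing NULL salaries to the end of each partition.
nulls_last_spec = Window.partitionBy("Name").orderBy(col("Salary").desc_nulls_last())
ranked = df.withColumn("row_number", row_number().over(nulls_last_spec))
# Option 2: drop NULL salaries entirely before applying the window function.
ranked_non_null = (df.filter(col("Salary").isNotNull())
                     .withColumn("row_number", row_number().over(nulls_last_spec)))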
Important Notes
- PartitionBy: Breaks data into logical groups for independent calculations.
- OrderBy: Determines the order within each partition.
- Frame Specification: Allows cumulative, rolling, or specific-frame calculations using rowsBetween() or rangeBetween().
Setting Up the Environment
First, let’s set up the environment and create a sample dataset.
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank, percent_rank, ntile, lag, lead, sum, avg
# Initialize Spark session
spark = SparkSession.builder \
    .appName("PySpark Window Functions") \
    .getOrCreate()
# Create a sample dataset
data = [(1, "Alice", 1000),
(2, "Bob", 1200),
(3, "Catherine", 1200),
(4, "David", 800),
(5, "Eve", 950),
(6, "Frank", 800),
(7, "George", 1200),
(8, "Hannah", 1000),
(9, "Ivy", 950),
(10, "Jack", 1200)]
columns = ["id", "name", "salary"]
df = spark.createDataFrame(data, schema=columns)
df.show()
PySpark Window Functions
1. Row Number
The row_number() function assigns a unique number to each row within a window partition.
windowSpec = Window.partitionBy("salary").orderBy("id")
df.withColumn("row_number", row_number().over(windowSpec)).show()
2. Rank
The rank() function provides ranks to rows within a window partition, with gaps in ranking after ties.
df.withColumn("rank", rank().over(windowSpec)).show()
3. Dense Rank
The dense_rank() function provides ranks to rows within a window partition, without gaps in ranking.
df.withColumn("dense_rank", dense_rank().over(windowSpec)).show()
4. Percent Rank
The percent_rank() function computes the relative rank of each row within a window partition, calculated as (rank − 1) / (number of rows in the partition − 1).
df.withColumn("percent_rank", percent_rank().over(windowSpec)).show()
5. NTile
The ntile(n) function divides the rows within a window partition into n buckets.
df.withColumn("ntile", ntile(4).over(windowSpec)).show()
6. Lag
The lag() function provides access to a row at a given physical offset before the current row within a window partition.
df.withColumn("lag", lag("salary", 1).over(windowSpec)).show()
7. Lead
The lead() function provides access to a row at a given physical offset after the current row within a window partition.
df.withColumn("lead", lead("salary", 1).over(windowSpec)).show()
8. Cumulative Sum
The sum() function over an ordered window produces a running (cumulative) sum within each partition; when an orderBy is present and no frame is specified, Spark defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
df.withColumn("cumulative_sum", sum("salary").over(windowSpec)).show()
9. Moving Average
The avg() function averages values over the window frame; with the ordered windowSpec above this yields a running average rather than a fixed-size moving average.
df.withColumn("moving_avg", avg("salary").over(windowSpec)).show()