Code examples for revising PySpark window functions
# Sample employee data: (EmpID, Emp_name, Manager_id, Salary, Location).
# Note: EmpID 5 is intentionally absent, and EmpID 17 has a None (NULL)
# salary — useful for observing how window functions order NULLs.
data = [
(1, "Alice", 10, 8000, "New York"),
(2, "Bob", 11, 9000, "New York"),
(3, "Charlie", 10, 10000, "Chicago"),
(4, "David", 12, 9000, "New York"),
(6, "Eve", 13, 9000, "Chicago"),
(7, "GEve", 13, 10000, "Chicago"),
(8, "REve", 13, 5000, "Chicago"),
(9, "ScEve", 14, 5600, "LA"),
(10, "DEve", 15, 11000, "LA"),
(11, "Ram", 14, 11000, "LA"),
(12, "Hem", 10, 8000, "LA"),
(13, "Hith", 11, 6000, "Chicago"),
(14, "Pit", 15, 13000, "Chicago"),
(15, "Evelyn", 15, 14000, "New York"),
(16, "FteEve", 12, 9200, "New York"),
(17, "sctuEve", 12, None, "Chicago"),
]
# Define schema (column names only; types are inferred from the data).
columns = ["EmpID", "Emp_name", "Manager_id", "Salary", "Location"]
# NOTE(review): `spark` (SparkSession) is not created anywhere in this
# chunk — presumably a notebook/shell global; confirm before running as
# a standalone script.
df = spark.createDataFrame(data, schema=columns)
df.show()
# Imports for ranking window functions and column expressions.
# FIX: the original imported row_number/rank/dense_rank twice (two
# separate `from pyspark.sql.functions import ...` lines); merged into one.
from pyspark.sql.functions import row_number, rank, dense_rank, col
from pyspark.sql.window import Window

# Window spec: rank rows within each Location, ordered by ascending Salary.
# NOTE(review): with ascending order, NULL salaries sort first by default —
# confirm that placement is intended for the row with a None salary.
wf = Window.partitionBy("Location").orderBy("Salary")

# Pre-built, reusable window-function columns with their output aliases.
row_number_col = row_number().over(wf).alias("row_number")
rank_col = rank().over(wf).alias("rank")
dense_rank_col = dense_rank().over(wf).alias("dense_rank")
# Select the base columns plus the pre-computed window-function columns.
base_cols = ["EmpID", "Emp_name", "Manager_id", "salary", "Location"]
df.select(*base_cols, row_number_col, rank_col, dense_rank_col).show()
# Same projection, but with the window columns built inline rather than
# reusing the pre-computed expressions.
ranking_cols = [
    row_number().over(wf).alias("row_number"),
    rank().over(wf).alias("rank"),
    dense_rank().over(wf).alias("dense_rank"),
]
df.select(
    "EmpID", "Emp_name", "Manager_id", "salary", "Location", *ranking_cols
).show()
# Using withColumn with window functions.
# BUG FIX: the original continued the .withColumn chain on bare newlines,
# which is a SyntaxError in Python; the chain is now wrapped in parentheses
# so implicit line continuation applies.
(
    df.withColumn("row_number", row_number().over(wf))
    .withColumn("rank", rank().over(wf))
    .withColumn("dense_rank", dense_rank().over(wf))
    .show()
)
# Using selectExpr: each window is defined inline in SQL expression syntax,
# so no Python Window object is needed.
window_exprs = [
    "row_number() OVER (PARTITION BY Location ORDER BY Salary) AS row_number",
    "rank() OVER (PARTITION BY Location ORDER BY Salary) AS rank",
    "dense_rank() OVER (PARTITION BY Location ORDER BY Salary) AS dense_rank",
]
df.selectExpr(
    "EmpID", "Emp_name", "Manager_id", "salary", "Location", *window_exprs
).show()
# Chained withColumn + drop + filter: keep only the first-ranked row
# (row_number == 1) per Location, and drop the salary column from the output.
# BUG FIX: the original chain spanned bare newlines (SyntaxError); wrapped
# in parentheses so the chain parses.
(
    df.withColumn("row_number", row_number().over(wf))
    .withColumn("rank", rank().over(wf))
    .withColumn("dense_rank", dense_rank().over(wf))
    .drop("salary")
    .filter(col("row_number") == 1)
    .show()
)
# Register the DataFrame as a temp view and compute the same rankings
# with a raw SQL query (each OVER clause spelled out inline).
df.createOrReplaceTempView("dfview")
ranking_query = """ select EmpID,Emp_name,Manager_id,salary,Location,row_number() OVER (PARTITION BY Location ORDER BY Salary) AS row_number,
rank() OVER (PARTITION BY Location ORDER BY Salary) AS rank,dense_rank() OVER (PARTITION BY Location ORDER BY Salary) AS dense_rank
from dfview """
spark.sql(ranking_query).show()
# Same query, but using a named WINDOW clause so the partition/order spec
# is written once and shared by all three ranking functions.
named_window_query = """
SELECT EmpID, Emp_name, Manager_id, salary, Location,
row_number() OVER w AS row_number,
rank() OVER w AS rank,
dense_rank() OVER w AS dense_rank
FROM dfview
WINDOW w AS (PARTITION BY Location ORDER BY Salary)
"""
spark.sql(named_window_query).show()
Discover more from HintsToday
Subscribe to get the latest posts sent to your email.