Code for Revising PySpark Window Functions


# Column names, in the same order as the fields of each sample tuple
columns = ["EmpID", "Emp_name", "Manager_id", "Salary", "Location"]

# Sample employee rows; the final row carries a null Salary
data = [
    (1, "Alice", 10, 8000, "New York"),
    (2, "Bob", 11, 9000, "New York"),
    (3, "Charlie", 10, 10000, "Chicago"),
    (4, "David", 12, 9000, "New York"),
    (6, "Eve", 13, 9000, "Chicago"),
    (7, "GEve", 13, 10000, "Chicago"),
    (8, "REve", 13, 5000, "Chicago"),
    (9, "ScEve", 14, 5600, "LA"),
    (10, "DEve", 15, 11000, "LA"),
    (11, "Ram", 14, 11000, "LA"),
    (12, "Hem", 10, 8000, "LA"),
    (13, "Hith", 11, 6000, "Chicago"),
    (14, "Pit", 15, 13000, "Chicago"),
    (15, "Evelyn", 15, 14000, "New York"),
    (16, "FteEve", 12, 9200, "New York"),
    (17, "sctuEve", 12, None, "Chicago"),
]

# Build the DataFrame from the rows and column names, then print it.
# NOTE(review): `spark` is not defined in this snippet; presumably it is the
# SparkSession supplied by the notebook/shell -- confirm before running standalone.
df = spark.createDataFrame(data, columns)
df.show()

from pyspark.sql.functions import row_number, rank, dense_rank
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col

# Shared window spec: rows are grouped per Location and ordered by Salary
wf = Window.partitionBy("Location").orderBy("Salary")

# Build each ranking expression once so it can be reused by name below
row_number_col = row_number().over(wf).alias("row_number")
rank_col = rank().over(wf).alias("rank")
dense_rank_col = dense_rank().over(wf).alias("dense_rank")

# Select the base columns plus the three ranking columns.
# "Salary" is spelled exactly as in the schema so the lookup does not rely on
# Spark's case-insensitive column resolution (spark.sql.caseSensitive=false).
df.select(
    "EmpID",
    "Emp_name",
    "Manager_id",
    "Salary",
    "Location",
    row_number_col,
    rank_col,
    dense_rank_col,
).show()


# Variant with the window expressions written inline in the select instead of
# being bound to variables first. "Salary" matches the schema's casing so the
# lookup also works when spark.sql.caseSensitive is enabled.
df.select(
    "EmpID",
    "Emp_name",
    "Manager_id",
    "Salary",
    "Location",
    row_number().over(wf).alias("row_number"),
    rank().over(wf).alias("rank"),
    dense_rank().over(wf).alias("dense_rank"),
).show()


# Using withColumn with window functions.
# The chain is wrapped in parentheses so it can legally span several lines;
# without them (or trailing backslashes) every line that starts with
# ".withColumn" is a Python syntax error.
(
    df.withColumn("row_number", row_number().over(wf))
    .withColumn("rank", rank().over(wf))
    .withColumn("dense_rank", dense_rank().over(wf))
    .show()
)

# Using selectExpr with window functions.
# Each ranking expression is a SQL string, so the window has to be spelled out
# inside every OVER clause. "Salary" uses the schema's casing throughout so the
# expressions do not depend on case-insensitive column resolution.
df.selectExpr(
    "EmpID",
    "Emp_name",
    "Manager_id",
    "Salary",
    "Location",
    "row_number() OVER (PARTITION BY Location ORDER BY Salary) AS row_number",  # window defined inline
    "rank() OVER (PARTITION BY Location ORDER BY Salary) AS rank",  # window defined inline
    "dense_rank() OVER (PARTITION BY Location ORDER BY Salary) AS dense_rank",  # window defined inline
).show()

# Using withColumn with window functions and chaining.
# Parentheses make the multi-line method chain valid Python; the original
# layout without them (or trailing backslashes) does not parse.
(
    df.withColumn("row_number", row_number().over(wf))
    .withColumn("rank", rank().over(wf))
    .withColumn("dense_rank", dense_rank().over(wf))
    # Drop the Salary column (spelled as in the schema) once the rankings exist
    .drop("Salary")
    # Keep only the first row of each Location partition under wf's ordering
    .filter(col("row_number") == 1)
    .show()
)

# Register the DataFrame as a temporary view so it can be queried with Spark SQL
df.createOrReplaceTempView("dfview")

# Same three rankings expressed in SQL, with the window repeated in every OVER
# clause. The selected column is spelled "Salary" to agree with the schema and
# with the ORDER BY clauses in the same query.
spark.sql(""" select EmpID,Emp_name,Manager_id,Salary,Location,row_number() OVER (PARTITION BY Location ORDER BY Salary) AS row_number,
          rank() OVER (PARTITION BY Location ORDER BY Salary) AS rank,dense_rank() OVER (PARTITION BY Location ORDER BY Salary) AS dense_rank
           from dfview """).show()


# SQL variant that declares the window once in a WINDOW clause (named `w`) and
# reuses it in each OVER, instead of repeating the PARTITION BY / ORDER BY.
# Queries the "dfview" temp view; "Salary" is spelled as in the schema and the
# WINDOW clause so the query is consistent about the column's casing.
spark.sql("""
SELECT EmpID, Emp_name, Manager_id, Salary, Location,
       row_number() OVER w AS row_number,
       rank() OVER w AS rank,
       dense_rank() OVER w AS dense_rank
FROM dfview
WINDOW w AS (PARTITION BY Location ORDER BY Salary)
""").show()

Discover more from HintsToday

Subscribe to get the latest posts sent to your email.

Pages (4 of 4): « Previous 1 2 3 4

Discover more from HintsToday

Subscribe now to keep reading and get access to the full archive.

Continue reading