Large Sample Dataset Example

Let’s create a larger dataset and apply window functions.

import random

from pyspark.sql.window import Window
from pyspark.sql.functions import (
    row_number, rank, dense_rank, percent_rank, ntile,
    lag, lead, sum as sum_, avg
)

# Assumes an existing SparkSession named `spark`
# Create a larger dataset of (id, name, salary) rows
columns = ["id", "name", "salary"]
large_data = [(i, f"Name_{i}", random.choice([1000, 1200, 950, 800])) for i in range(1, 101)]
large_df = spark.createDataFrame(large_data, schema=columns)
large_df.show(10)

# Window specification: partition by salary, order rows within each partition by id
large_windowSpec = Window.partitionBy("salary").orderBy("id")

# Ranking functions
large_df.withColumn("row_number", row_number().over(large_windowSpec)).show(10)
large_df.withColumn("rank", rank().over(large_windowSpec)).show(10)
large_df.withColumn("dense_rank", dense_rank().over(large_windowSpec)).show(10)
large_df.withColumn("percent_rank", percent_rank().over(large_windowSpec)).show(10)
large_df.withColumn("ntile", ntile(4).over(large_windowSpec)).show(10)

# Offset functions
large_df.withColumn("lag", lag("salary", 1).over(large_windowSpec)).show(10)
large_df.withColumn("lead", lead("salary", 1).over(large_windowSpec)).show(10)

# Running aggregates (default frame: partition start up to the current row)
large_df.withColumn("cumulative_sum", sum_("salary").over(large_windowSpec)).show(10)
large_df.withColumn("moving_avg", avg("salary").over(large_windowSpec)).show(10)

Window functions in PySpark and Spark SQL are powerful tools for data analysis. They let you perform calculations over partitions of your data while keeping every row, just like SQL window functions. With them you can easily implement features such as ranking, cumulative sums, and moving averages in your PySpark applications.

Examples:

1. PySpark DataFrames: Remove duplicates based on specific columns, then order by different columns

To remove duplicates from a PySpark DataFrame based on specific columns and order the remaining rows by different columns, you can use a combination of the dropDuplicates() function and the orderBy() (or sort()) function.

Here is an example that demonstrates this process:

  1. Remove duplicates based on specific columns.
  2. Order the resulting DataFrame by different columns.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize SparkSession
spark = SparkSession.builder.appName("RemoveDuplicatesAndOrder").getOrCreate()

# Sample data
data = [
    (1, "Alice", 29),
    (2, "Bob", 30),
    (3, "Alice", 29),
    (4, "David", 35),
    (5, "Alice", 25)
]

# Create DataFrame
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)

# Show the original DataFrame
print("Original DataFrame:")
df.show()

# Step 1: Remove duplicates based on specific columns (e.g., "name", "age")
df_no_duplicates = df.dropDuplicates(["name", "age"])

# Step 2: Order the resulting DataFrame by different columns (e.g., "age" in descending order)
df_ordered = df_no_duplicates.orderBy(col("age").desc())

# Show the resulting DataFrame
print("DataFrame after removing duplicates and ordering:")
df_ordered.show()

# Stop SparkSession
spark.stop()

Explanation:

  1. Initialization and Data Preparation:
    • A SparkSession is created.
    • Sample data is provided, and a DataFrame is created from this data.
  2. Removing Duplicates:
    • The dropDuplicates() function is used to remove rows that have the same values in the specified columns ("name" and "age" in this case).
  3. Ordering Data:
    • The orderBy() function is used to sort the DataFrame by the specified columns. In this case, the DataFrame is ordered by "age" in descending order.
  4. Displaying Results:
    • The original and resulting DataFrames are displayed using the show() function.

Example Output:

Original DataFrame:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 29|
| 2| Bob| 30|
| 3|Alice| 29|
| 4|David| 35|
| 5|Alice| 25|
+---+-----+---+

DataFrame after removing duplicates and ordering:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 4|David| 35|
| 2| Bob| 30|
| 1|Alice| 29|
| 5|Alice| 25|
+---+-----+---+

Additional Notes:

  • dropDuplicates(): This function removes duplicate rows based on the specified columns. If no columns are specified, it removes rows that are identical in all columns.
  • orderBy()/sort(): These functions are used to order the DataFrame. You can specify multiple columns and the sorting order (ascending or descending) for each column.
  • You can chain multiple DataFrame operations together; for example, dropDuplicates() and orderBy() can be combined in a single statement, as in the sketch below: df.dropDuplicates(["name", "age"]).orderBy(col("age").desc())
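
Putting these notes together, here is a minimal sketch (reusing the df and col from the example above) that chains both operations and sorts on two columns with different directions:

# Deduplicate on name/age, then sort by age descending and name ascending in one chain
df.dropDuplicates(["name", "age"]) \
  .orderBy(col("age").desc(), col("name").asc()) \
  .show()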

2. PySpark DataFrames: Remove duplicates based on specific columns while ordering by different columns

To remove duplicates from a PySpark DataFrame based on specific columns while controlling, via other columns, which row in each group is kept, you can use window functions. This approach lets you specify how duplicates are resolved while preserving the ordering you care about.

Below is an example that removes duplicates based on specific columns (name, age), using an ordering on other columns (age descending, id ascending) to decide which row within each group is kept:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Initialize SparkSession
spark = SparkSession.builder.appName("RemoveDuplicatesWithOrdering").getOrCreate()

# Sample data
data = [
    (1, "Alice", 29),
    (2, "Bob", 30),
    (3, "Alice", 29),
    (4, "David", 35),
    (5, "Alice", 25)
]

# Create DataFrame
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)

# Show the original DataFrame
print("Original DataFrame:")
df.show()

# Define a window specification
windowSpec = Window.partitionBy("name", "age").orderBy(col("age").desc(), col("id").asc())

# Add a row number to each partition
df_with_row_number = df.withColumn("row_number", row_number().over(windowSpec))

# Filter the rows to keep only the first occurrence of each combination of name and age
df_no_duplicates = df_with_row_number.filter(col("row_number") == 1).drop("row_number")

# Show the resulting DataFrame
print("DataFrame after removing duplicates and ordering:")
df_no_duplicates.show()

# Stop SparkSession
spark.stop()

Explanation:

  1. Initialization and Data Preparation:
    • A SparkSession is created.
    • Sample data is provided, and a DataFrame is created from this data.
  2. Define Window Specification:
    • A window specification is defined using Window.partitionBy("name", "age").orderBy(col("age").desc(), col("id").asc()). This specifies that the data should be partitioned by the columns name and age and ordered within each partition by age in descending order and id in ascending order.
  3. Add Row Number:
    • The row_number() function is used to add a row number to each row within the specified window. This row number helps to identify and keep only the first occurrence of each partition.
  4. Filter Rows:
    • The DataFrame is filtered to keep only the rows where the row_number is 1. This effectively removes duplicates while maintaining the specified order.
  5. Display Results:
    • The original and resulting DataFrames are displayed using the show() function.

Example Output:

Original DataFrame:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Alice| 29|
| 2| Bob| 30|
| 3|Alice| 29|
| 4|David| 35|
| 5|Alice| 25|
+---+-----+---+

DataFrame after removing duplicates and ordering:
+---+-----+---+
| id| name|age|
+---+-----+---+
| 4|David| 35|
| 2| Bob| 30|
| 1|Alice| 29|
| 5|Alice| 25|
+---+-----+---+

Notes:

  • Window Specification: The Window specification defines how the data should be partitioned and ordered.
  • Row Number: The row_number() function assigns a unique row number within each window partition.
  • Filter and Drop: The resulting DataFrame is filtered to keep only the rows where the row_number is 1, and the temporary row_number column is dropped.

This approach removes duplicates based on the specified columns, and the window ordering determines which row in each group is kept. If you also need the final output sorted globally, add an orderBy() after the filter. You can adjust the partitioning and ordering columns according to your specific requirements.
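
For example, to keep the most recently added row (highest id) for each name instead, only the window specification needs to change. A minimal sketch using the same df (run it before spark.stop()):

# Keep the row with the highest id within each name group
latest_window = Window.partitionBy("name").orderBy(col("id").desc())

df.withColumn("row_number", row_number().over(latest_window)) \
  .filter(col("row_number") == 1) \
  .drop("row_number") \
  .show()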


Practice Code: Revising PySpark Window Functions


# Sample data
data = [
    (1, "Alice", 10, 8000, "New York"),
    (2, "Bob", 11, 9000, "New York"),
    (3, "Charlie", 10, 10000, "Chicago"),
    (4, "David", 12, 9000, "New York"),
    (6, "Eve", 13, 9000, "Chicago"),
    (7, "GEve", 13, 10000, "Chicago"),
    (8, "REve", 13, 5000, "Chicago"),
    (9, "ScEve", 14, 5600, "LA"),
    (10, "DEve", 15, 11000, "LA"),
    (11, "Ram", 14, 11000, "LA"),
    (12, "Hem", 10, 8000, "LA"),
    (13, "Hith", 11, 6000, "Chicago"),
    (14, "Pit", 15, 13000, "Chicago"),
    (15, "Evelyn", 15, 14000, "New York"),
    (16, "FteEve", 12, 9200, "New York"),
    (17, "sctuEve", 12, None, "Chicago"),
]

# Define schema
columns = ["EmpID", "Emp_name", "Manager_id", "Salary", "Location"]

df = spark.createDataFrame(data, schema=columns)
df.show()

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col

# Window specification: partition by Location, order by Salary within each partition
wf = Window.partitionBy("Location").orderBy("Salary")

# Calculate row_number, rank, and dense_rank separately
row_number_col = row_number().over(wf).alias("row_number")
rank_col = rank().over(wf).alias("rank")
dense_rank_col = dense_rank().over(wf).alias("dense_rank")

# Select columns including calculated window function results
df.select(
    "EmpID", 
    "Emp_name",
    "Manager_id",
    "salary",
    "Location", 
    row_number_col, 
    rank_col, 
    dense_rank_col
).show()


df.select(
    "EmpID", 
    "Emp_name",
    "Manager_id",
    "salary",
    "Location", 
    row_number().over(wf).alias("row_number"),
    rank().over(wf).alias("rank"),
    dense_rank().over(wf).alias("dense_rank")
).show()


#Using withColumn with window functions


df.withColumn("row_number", row_number().over(wf)) \
  .withColumn("rank", rank().over(wf)) \
  .withColumn("dense_rank", dense_rank().over(wf)) \
  .show()

#Using selectExpr with window functions

df.selectExpr(
    "EmpID",
    "Emp_name",
    "Manager_id",
    "salary",
    "Location",
    # The window is defined inline in each SQL expression
    "row_number() OVER (PARTITION BY Location ORDER BY Salary) AS row_number",
    "rank() OVER (PARTITION BY Location ORDER BY Salary) AS rank",
    "dense_rank() OVER (PARTITION BY Location ORDER BY Salary) AS dense_rank"
).show()

#Using withColumn with window functions and chaining

df.withColumn("row_number", row_number().over(wf)) \
  .withColumn("rank", rank().over(wf)) \
  .withColumn("dense_rank", dense_rank().over(wf)) \
  .drop("salary") \
  .filter(col("row_number") == 1) \
  .show()

df.createOrReplaceTempView("dfview")
spark.sql(""" select EmpID,Emp_name,Manager_id,salary,Location,row_number() OVER (PARTITION BY Location ORDER BY Salary) AS row_number,
          rank() OVER (PARTITION BY Location ORDER BY Salary) AS rank,dense_rank() OVER (PARTITION BY Location ORDER BY Salary) AS dense_rank
           from dfview """ ) .show()


spark.sql("""
SELECT EmpID, Emp_name, Manager_id, salary, Location,
       row_number() OVER w AS row_number,
       rank() OVER w AS rank,
       dense_rank() OVER w AS dense_rank
FROM dfview
WINDOW w AS (PARTITION BY Location ORDER BY Salary)
""").show()           
