Spark SQL Window Functions and Best Use Cases

Integration in PySpark

Spark SQL window functions can also be used within PySpark via .selectExpr():

df.selectExpr("EmpID", 
"Salary",
"AVG(Salary) OVER (PARTITION BY Department ORDER BY EmpID) AS avg_salary").show()

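One detail worth checking in the query above: because the OVER clause contains ORDER BY, AVG is computed over the default frame, which runs from the start of the partition to the current row. That makes avg_salary a running average rather than a per-department average. The sketch below illustrates the difference. It is not Spark code: it assumes Python's built-in sqlite3 module (SQLite 3.25 or later) as a stand-in engine whose window semantics match Spark SQL for this query, and the emp table and its rows are made up for illustration.

```python
import sqlite3

# Hypothetical sample rows: (EmpID, Department, Salary)
rows = [(1, "HR", 100), (2, "HR", 200), (3, "HR", 300)]

conn = sqlite3.connect(":memory:")  # stand-in engine, not Spark
conn.execute("CREATE TABLE emp (EmpID INT, Department TEXT, Salary INT)")
conn.executemany("INSERT INTO emp VALUES (?, ?, ?)", rows)

query = """
SELECT EmpID,
       AVG(Salary) OVER (PARTITION BY Department ORDER BY EmpID) AS running_avg,
       AVG(Salary) OVER (PARTITION BY Department)                AS dept_avg
FROM emp
ORDER BY EmpID
"""
result = conn.execute(query).fetchall()
for row in result:
    print(row)  # (EmpID, running_avg, dept_avg)
```

If the per-department average is what you want, dropping ORDER BY from the OVER clause should give it.
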
Best Use Cases for Window Functions

1. Running Totals and Moving Averages

Calculate running totals or moving averages over a specified window of rows.

SELECT
    customer_id,
    order_date,
    amount,
    SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total
FROM orders;
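
The query above covers the running total. For the moving-average half of this use case, a bounded frame such as ROWS BETWEEN 2 PRECEDING AND CURRENT ROW can be used. The following is a minimal sketch under some assumptions: it runs on Python's built-in sqlite3 module (SQLite 3.25 or later) as a stand-in for Spark SQL, and the order rows and the moving_avg_3 alias are hypothetical.

```python
import sqlite3

# Hypothetical orders: (customer_id, order_date, amount)
orders = [
    (1, "2024-01-01", 10),
    (1, "2024-01-02", 20),
    (1, "2024-01-03", 30),
    (1, "2024-01-04", 40),
]

conn = sqlite3.connect(":memory:")  # stand-in engine, not Spark
conn.execute("CREATE TABLE orders (customer_id INT, order_date TEXT, amount INT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", orders)

query = """
SELECT order_date,
       amount,
       AVG(amount) OVER (
           PARTITION BY customer_id
           ORDER BY order_date
           ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
       ) AS moving_avg_3
FROM orders
ORDER BY customer_id, order_date
"""
moving = conn.execute(query).fetchall()
for row in moving:
    print(row)  # average over the current row and up to two rows before it
```

The first two rows average over fewer than three values because the frame is cut off at the start of the partition.
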

2. Ranking

Assign ranks to rows based on the values in a specific column.

SELECT
    customer_id,
    order_date,
    amount,
    RANK() OVER (PARTITION BY customer_id ORDER BY amount DESC) AS rank
FROM orders;
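
When amounts tie, the choice of ranking function matters. The sketch below puts RANK, DENSE_RANK, and ROW_NUMBER side by side on a tie. It assumes Python's built-in sqlite3 module (SQLite 3.25 or later) as a stand-in for Spark SQL; the rows and the aliases rnk, dense_rnk, and row_num are made up for illustration.

```python
import sqlite3

# Hypothetical orders with a tie on amount: (customer_id, order_date, amount)
orders = [
    (1, "2024-01-01", 50),
    (1, "2024-01-02", 80),
    (1, "2024-01-03", 80),
]

conn = sqlite3.connect(":memory:")  # stand-in engine, not Spark
conn.execute("CREATE TABLE orders (customer_id INT, order_date TEXT, amount INT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", orders)

query = """
SELECT amount,
       RANK()       OVER (PARTITION BY customer_id ORDER BY amount DESC) AS rnk,
       DENSE_RANK() OVER (PARTITION BY customer_id ORDER BY amount DESC) AS dense_rnk,
       -- order_date is added as a tie-breaker so ROW_NUMBER is deterministic
       ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY amount DESC, order_date) AS row_num
FROM orders
ORDER BY customer_id, amount DESC, order_date
"""
ranks = conn.execute(query).fetchall()
for row in ranks:
    print(row)  # (amount, rnk, dense_rnk, row_num)
```

RANK leaves a gap after the tie, DENSE_RANK does not, and ROW_NUMBER always numbers rows 1, 2, 3.
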

3. Time-Series Analysis

Perform operations like lead and lag for time-series data.

SELECT
    customer_id,
    order_date,
    amount,
    LAG(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) AS previous_amount,
    LEAD(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) AS next_amount
FROM orders;
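
A common follow-up is to subtract the lagged value to get the change between consecutive orders. Note that LAG returns NULL on the first row of each partition, so the difference is NULL there too. A minimal sketch, assuming Python's built-in sqlite3 module (SQLite 3.25 or later) as a stand-in for Spark SQL, with hypothetical rows and a hypothetical change column:

```python
import sqlite3

# Hypothetical orders: (customer_id, order_date, amount)
orders = [
    (1, "2024-01-01", 10),
    (1, "2024-01-02", 25),
    (1, "2024-01-03", 15),
]

conn = sqlite3.connect(":memory:")  # stand-in engine, not Spark
conn.execute("CREATE TABLE orders (customer_id INT, order_date TEXT, amount INT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", orders)

query = """
SELECT order_date,
       amount,
       LAG(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) AS previous_amount,
       amount - LAG(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) AS change
FROM orders
ORDER BY customer_id, order_date
"""
deltas = conn.execute(query).fetchall()
for row in deltas:
    print(row)  # NULL comes back as None in Python
```
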

4. Percentile and Distribution Analysis

Compute percentiles and cumulative distributions.

SELECT
    customer_id,
    order_date,
    amount,
    PERCENT_RANK() OVER (PARTITION BY customer_id ORDER BY amount) AS percent_rank,
    CUME_DIST() OVER (PARTITION BY customer_id ORDER BY amount) AS cumulative_distribution
FROM orders;
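
Both functions are simple ratios: PERCENT_RANK is (rank - 1) / (rows in partition - 1), and CUME_DIST is the share of rows whose value is less than or equal to the current one. The sketch below checks the first formula against the engine's output. It assumes Python's built-in sqlite3 module (SQLite 3.25 or later) as a stand-in for Spark SQL; the rows and the aliases are hypothetical.

```python
import sqlite3

# Hypothetical orders with one tie: (customer_id, order_date, amount)
orders = [
    (1, "2024-01-01", 10),
    (1, "2024-01-02", 20),
    (1, "2024-01-03", 20),
    (1, "2024-01-04", 40),
]

conn = sqlite3.connect(":memory:")  # stand-in engine, not Spark
conn.execute("CREATE TABLE orders (customer_id INT, order_date TEXT, amount INT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", orders)

query = """
SELECT amount,
       RANK()         OVER (PARTITION BY customer_id ORDER BY amount) AS rnk,
       PERCENT_RANK() OVER (PARTITION BY customer_id ORDER BY amount) AS pct_rank,
       CUME_DIST()    OVER (PARTITION BY customer_id ORDER BY amount) AS cum_dist
FROM orders
ORDER BY customer_id, amount
"""
dist = conn.execute(query).fetchall()
n = len(orders)  # all rows belong to one partition in this example
for amount, rnk, pct_rank, cum_dist in dist:
    # Third and fourth printed values should agree: engine result vs. formula
    print(amount, rnk, pct_rank, (rnk - 1) / (n - 1), cum_dist)
```
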

5. Identifying First and Last Records in Groups

Identify the first and last records within each group.

SELECT
    customer_id,
    order_date,
    amount,
    FIRST_VALUE(amount) OVER (PARTITION BY customer_id ORDER BY order_date) AS first_amount,
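    -- An explicit frame is needed here: with ORDER BY alone, the default frame ends at the current row
    LAST_VALUE(amount) OVER (
        PARTITION BY customer_id ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS last_amount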
FROM orders;
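
LAST_VALUE is sensitive to the window frame. With ORDER BY and no explicit frame, the frame ends at the current row, so LAST_VALUE returns the current row's value instead of the last value in the partition. The sketch below compares the two forms. It assumes Python's built-in sqlite3 module (SQLite 3.25 or later) as a stand-in engine that shares Spark SQL's default frame for this case; the rows and aliases are made up.

```python
import sqlite3

# Hypothetical orders: (customer_id, order_date, amount)
orders = [
    (1, "2024-01-01", 10),
    (1, "2024-01-02", 20),
    (1, "2024-01-03", 30),
]

conn = sqlite3.connect(":memory:")  # stand-in engine, not Spark
conn.execute("CREATE TABLE orders (customer_id INT, order_date TEXT, amount INT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", orders)

query = """
SELECT amount,
       LAST_VALUE(amount) OVER (
           PARTITION BY customer_id ORDER BY order_date
       ) AS last_default_frame,
       LAST_VALUE(amount) OVER (
           PARTITION BY customer_id ORDER BY order_date
           ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
       ) AS last_full_frame
FROM orders
ORDER BY customer_id, order_date
"""
last_values = conn.execute(query).fetchall()
for row in last_values:
    print(row)  # (amount, last_default_frame, last_full_frame)
```

Only the full-frame column holds the partition's final amount on every row.
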

6. For SAS Programmers: How to Translate SAS FIRST. and LAST. into Spark SQL

In Spark SQL, there’s no direct equivalent of SAS FIRST. and LAST. variables. However, you can achieve similar functionality using window functions like row_number() or first_value() and last_value() depending on your specific needs.

Here’s how you can translate the concept in Spark SQL:

1. Using row_number():

This approach assigns a unique row number within each group defined by the BY variable(s). You can then use conditional logic to identify the first and last rows based on the row number.

SELECT *,
       CASE WHEN row_number() OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) = 1 THEN 1 ELSE 0 END AS is_first,
       CASE WHEN row_number() OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) = COUNT(*) OVER (PARTITION BY variable1, variable2) THEN 1 ELSE 0 END AS is_last
FROM your_data
ORDER BY variable1, variable2, sort_column;

Explanation:

  • We use row_number() with a PARTITION BY clause based on your BY variable(s) and an ORDER BY clause to define the order within each group (replace sort_column with the actual column for sorting).
  • The CASE statement assigns 1 to the first row (where row_number is 1) and 0 to others within the group for is_first.
  • Similarly, the second CASE statement identifies the last row by checking if the row_number is equal to the total count of rows within the group (using COUNT(*) OVER (PARTITION BY...)).

2. Using first_value() and last_value():

These functions can be used if you only need the values of specific columns for the first and last rows in each group.

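SELECT *,
       first_value(column_to_get) OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) AS first_val,
       last_value(column_to_get) OVER (PARTITION BY variable1, variable2 ORDER BY sort_column
                                       ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_val
FROM your_data
ORDER BY variable1, variable2, sort_column;

Explanation:

  • first_value() and last_value() are used within the SELECT clause with a similar windowing definition as before.
  • They return the value of the specified column_to_get for the first and last rows within each group, respectively.
  • last_value() needs the explicit ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING frame. With ORDER BY alone, the default frame stops at the current row, and last_value() would simply return the current row's value.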

Choosing the Right Approach:

  • If you need additional information beyond just identifying the first and last rows (like row numbers), use row_number().
  • If you only need the values of specific columns for the first and last rows, use first_value() and last_value().

Remember:

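  • Unlike a SAS DATA step, Spark does not require the data to be sorted beforehand: PARTITION BY and ORDER BY inside the OVER clause define the groups and the order within them. Make the ORDER BY specific enough to break ties, otherwise the row picked as first or last can vary between runs.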
  • Adjust the windowing specifications (partitioning and ordering) based on your actual requirements.

By leveraging these window functions in Spark SQL, you can effectively achieve functionalities similar to SAS FIRST. and LAST. variables for identifying and processing the first and last observations within groups defined by your BY variable(s).

Scenario

Assume we have a dataset with columns group1, group2, order1, and order2, and we want to identify the first and last occurrence of each group based on the combined ordering of order1 and order2.

SAS Code Example

data example;
set example;
by group1 group2 order1 order2;
if first.group1 then first_group1 = 1;
else first_group1 = 0;
if last.group1 then last_group1 = 1;
else last_group1 = 0;
if first.group2 then first_group2 = 1;
else first_group2 = 0;
if last.group2 then last_group2 = 1;
else last_group2 = 0;
run;
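
Before translating, it helps to pin down what the BY-group flags mean. In SAS, first.group2 is set whenever group2 changes or any BY variable to its left (here group1) changes, so it marks the start of each (group1, group2) combination, not of each group2 value overall. The plain-Python sketch below emulates that rule for rows that are already sorted by the BY variables. The by_group_flags helper is hypothetical and only illustrates the semantics.

```python
def by_group_flags(rows, keys):
    """Emulate SAS first./last. flags for rows already sorted by `keys`."""
    flagged = []
    for i, row in enumerate(rows):
        prev_row = rows[i - 1] if i > 0 else None
        next_row = rows[i + 1] if i + 1 < len(rows) else None
        flags = {}
        for depth, key in enumerate(keys, start=1):
            prefix = keys[:depth]  # this key plus every BY variable to its left
            current = tuple(row[k] for k in prefix)
            flags["first_" + key] = int(
                prev_row is None or tuple(prev_row[k] for k in prefix) != current
            )
            flags["last_" + key] = int(
                next_row is None or tuple(next_row[k] for k in prefix) != current
            )
        flagged.append({**row, **flags})
    return flagged


# Hypothetical rows, sorted by group1, group2, order1, order2
rows = [
    {"group1": "group1", "group2": "subgroup1", "order1": "2023-01-01", "order2": "10:00"},
    {"group1": "group1", "group2": "subgroup1", "order1": "2023-01-01", "order2": "10:30"},
    {"group1": "group1", "group2": "subgroup2", "order1": "2023-01-01", "order2": "11:00"},
    {"group1": "group2", "group2": "subgroup1", "order1": "2023-01-01", "order2": "09:00"},
    {"group1": "group2", "group2": "subgroup1", "order1": "2023-01-01", "order2": "09:30"},
    {"group1": "group2", "group2": "subgroup2", "order1": "2023-01-02", "order2": "08:00"},
    {"group1": "group2", "group2": "subgroup2", "order1": "2023-01-02", "order2": "08:30"},
]

flags = by_group_flags(rows, ["group1", "group2"])
for r in flags:
    print(r["group1"], r["group2"], r["order2"],
          r["first_group1"], r["last_group1"], r["first_group2"], r["last_group2"])
```

Note that subgroup1 appears under both group1 and group2, and each occurrence gets its own first and last flags.
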

Equivalent in Spark SQL

To translate this logic into Spark SQL, you can use the ROW_NUMBER function along with window functions to identify the first and last occurrence of each group.

Spark SQL Code Example

  1. Create the DataFrame and Register as a Temporary View:
    • This step involves creating a DataFrame and registering it as a temporary view for SQL operations.
  2. Define the Window Specification:
    • The window specification is partitioned by the grouping variables and ordered by the ordering variables.
  3. Identify the First and Last Occurrences:
    • Use ROW_NUMBER with appropriate ordering to mark the first and last occurrences.

Here’s how you can achieve this:

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("SAS First Last Equivalent").getOrCreate()

# Sample data
data = [
("group1", "subgroup1", "2023-01-01", "10:00", 100),
("group1", "subgroup1", "2023-01-01", "10:30", 200),
("group1", "subgroup2", "2023-01-01", "11:00", 300),
("group2", "subgroup1", "2023-01-01", "09:00", 150),
("group2", "subgroup1", "2023-01-01", "09:30", 250),
("group2", "subgroup2", "2023-01-02", "08:00", 130),
("group2", "subgroup2", "2023-01-02", "08:30", 170)
]

columns = ["group1", "group2", "order1", "order2", "amount"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("example")

# Use Spark SQL to identify first and last occurrences
sql_query = """
WITH RankedData AS (
    SELECT
        group1,
        group2,
        order1,
        order2,
        amount,
        ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1, order2) AS rn_group1_asc,
        ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1 DESC, order2 DESC) AS rn_group1_desc,
        -- SAS sets first.group2/last.group2 whenever group1 OR group2 changes,
        -- so the group2 flags are partitioned by both columns
        ROW_NUMBER() OVER (PARTITION BY group1, group2 ORDER BY order1, order2) AS rn_group2_asc,
        ROW_NUMBER() OVER (PARTITION BY group1, group2 ORDER BY order1 DESC, order2 DESC) AS rn_group2_desc
    FROM example
)
SELECT
    group1,
    group2,
    order1,
    order2,
    amount,
    CASE WHEN rn_group1_asc = 1 THEN 1 ELSE 0 END AS first_group1,
    CASE WHEN rn_group1_desc = 1 THEN 1 ELSE 0 END AS last_group1,
    CASE WHEN rn_group2_asc = 1 THEN 1 ELSE 0 END AS first_group2,
    CASE WHEN rn_group2_desc = 1 THEN 1 ELSE 0 END AS last_group2
FROM RankedData
-- Sort the output so the displayed row order is deterministic
ORDER BY group1, group2, order1, order2
"""

# Execute the SQL query
result_df = spark.sql(sql_query)

# Show the result
result_df.show()

Explanation

  1. RankedData Common Table Expression (CTE):
    • We create a CTE called RankedData where we calculate row numbers for each group based on the specified order.
    • ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1, order2) assigns a row number to each row within group1 ordered by order1 and order2.
    • ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1 DESC, order2 DESC) assigns a reverse row number to identify the last occurrence within group1.
    • The same logic is applied for group2.
  2. Select with CASE Statements:
    • We select from the RankedData CTE and use CASE statements to mark the first and last occurrences for each group.

Sample Output

The resulting DataFrame will include flags indicating the first and last occurrences for each group:

+------+---------+----------+------+------+------------+-----------+------------+-----------+
|group1|   group2|    order1|order2|amount|first_group1|last_group1|first_group2|last_group2|
+------+---------+----------+------+------+------------+-----------+------------+-----------+
|group1|subgroup1|2023-01-01| 10:00|   100|           1|          0|           1|          0|
|group1|subgroup1|2023-01-01| 10:30|   200|           0|          0|           0|          1|
|group1|subgroup2|2023-01-01| 11:00|   300|           0|          1|           1|          1|
|group2|subgroup1|2023-01-01| 09:00|   150|           1|          0|           1|          0|
|group2|subgroup1|2023-01-01| 09:30|   250|           0|          0|           0|          1|
|group2|subgroup2|2023-01-02| 08:00|   130|           0|          0|           1|          0|
|group2|subgroup2|2023-01-02| 08:30|   170|           0|          1|           0|          1|
+------+---------+----------+------+------+------------+-----------+------------+-----------+

Examples from the Official Spark SQL Documentation

CREATE TABLE employees (name STRING, dept STRING, salary INT, age INT);

INSERT INTO employees VALUES ("Lisa", "Sales", 10000, 35);
INSERT INTO employees VALUES ("Evan", "Sales", 32000, 38);
INSERT INTO employees VALUES ("Fred", "Engineering", 21000, 28);
INSERT INTO employees VALUES ("Alex", "Sales", 30000, 33);
INSERT INTO employees VALUES ("Tom", "Engineering", 23000, 33);
INSERT INTO employees VALUES ("Jane", "Marketing", 29000, 28);
INSERT INTO employees VALUES ("Jeff", "Marketing", 35000, 38);
INSERT INTO employees VALUES ("Paul", "Engineering", 29000, 23);
INSERT INTO employees VALUES ("Chloe", "Engineering", 23000, 25);
INSERT INTO employees VALUES ("Helen", "Marketing", 29000, 40);

SELECT * FROM employees;
+-----+-----------+------+-----+
| name|       dept|salary|  age|
+-----+-----------+------+-----+
|Chloe|Engineering| 23000|   25|
| Fred|Engineering| 21000|   28|
| Paul|Engineering| 29000|   23|
|Helen|  Marketing| 29000|   40|
|  Tom|Engineering| 23000|   33|
| Jane|  Marketing| 29000|   28|
| Jeff|  Marketing| 35000|   38|
| Evan|      Sales| 32000|   38|
| Lisa|      Sales| 10000|   35|
| Alex|      Sales| 30000|   33|
+-----+-----------+------+-----+

SELECT name, dept, salary, RANK() OVER (PARTITION BY dept ORDER BY salary) AS rank FROM employees;
+-----+-----------+------+----+
| name|       dept|salary|rank|
+-----+-----------+------+----+
| Lisa|      Sales| 10000|   1|
| Alex|      Sales| 30000|   2|
| Evan|      Sales| 32000|   3|
| Fred|Engineering| 21000|   1|
|  Tom|Engineering| 23000|   2|
|Chloe|Engineering| 23000|   2|
| Paul|Engineering| 29000|   4|
|Helen|  Marketing| 29000|   1|
| Jane|  Marketing| 29000|   1|
| Jeff|  Marketing| 35000|   3|
+-----+-----------+------+----+

SELECT name, dept, salary, DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary ROWS BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW) AS dense_rank FROM employees;
+-----+-----------+------+----------+
| name|       dept|salary|dense_rank|
+-----+-----------+------+----------+
| Lisa|      Sales| 10000|         1|
| Alex|      Sales| 30000|         2|
| Evan|      Sales| 32000|         3|
| Fred|Engineering| 21000|         1|
|  Tom|Engineering| 23000|         2|
|Chloe|Engineering| 23000|         2|
| Paul|Engineering| 29000|         3|
|Helen|  Marketing| 29000|         1|
| Jane|  Marketing| 29000|         1|
| Jeff|  Marketing| 35000|         2|
+-----+-----------+------+----------+

SELECT name, dept, age, CUME_DIST() OVER (PARTITION BY dept ORDER BY age
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cume_dist FROM employees;
+-----+-----------+------+------------------+
| name|       dept|   age|         cume_dist|
+-----+-----------+------+------------------+
| Alex|      Sales|    33|0.3333333333333333|
| Lisa|      Sales|    35|0.6666666666666666|
| Evan|      Sales|    38|               1.0|
| Paul|Engineering|    23|              0.25|
|Chloe|Engineering|    25|               0.5|
| Fred|Engineering|    28|              0.75|
|  Tom|Engineering|    33|               1.0|
| Jane|  Marketing|    28|0.3333333333333333|
| Jeff|  Marketing|    38|0.6666666666666666|
|Helen|  Marketing|    40|               1.0|
+-----+-----------+------+------------------+

SELECT name, dept, salary, MIN(salary) OVER (PARTITION BY dept ORDER BY salary) AS min
FROM employees;
+-----+-----------+------+-----+
| name|       dept|salary|  min|
+-----+-----------+------+-----+
| Lisa|      Sales| 10000|10000|
| Alex|      Sales| 30000|10000|
| Evan|      Sales| 32000|10000|
|Helen|  Marketing| 29000|29000|
| Jane|  Marketing| 29000|29000|
| Jeff|  Marketing| 35000|29000|
| Fred|Engineering| 21000|21000|
|  Tom|Engineering| 23000|21000|
|Chloe|Engineering| 23000|21000|
| Paul|Engineering| 29000|21000|
+-----+-----------+------+-----+

7. Another SAS Scenario: Translating PROC SORT with BY (ordering on multiple keys) and NODUPKEY (removing observations with duplicate keys) into Spark SQL

SAS Scenario:

Imagine you have a dataset named sales_data containing columns like customer_id, product_id, date, and sales_amount. You want to:

  1. Sort the data by customer_id, product_id, and date (in ascending order).
  2. Remove any duplicate observations based on the combination of customer_id, product_id, and date.

SAS Code:

proc sort data=sales_data nodupkey;
  by customer_id product_id date;
run;

Spark SQL Translation:

There are two main approaches to achieve this functionality in Spark SQL:

Approach 1: Using DISTINCT and ORDER BY:

SELECT DISTINCT customer_id, product_id, date, sales_amount
FROM sales_data
ORDER BY customer_id, product_id, date;

Explanation:

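  • DISTINCT: This clause eliminates rows that are duplicated across all columns in the SELECT list (here customer_id, product_id, date, and sales_amount). Two rows that share a key but differ in sales_amount are both kept, so this matches NODUPKEY only when rows with the same key are identical in every selected column.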
  • ORDER BY: This clause sorts the resulting unique data based on the specified order (ascending for all three columns).

Approach 2: Using ROW_NUMBER() and WHERE:

SELECT customer_id, product_id, date, sales_amount
FROM (
  SELECT customer_id, product_id, date, sales_amount,
         row_number() OVER (PARTITION BY customer_id, product_id, date ORDER BY date) AS row_num
  FROM sales_data
) AS ranked_data
WHERE row_num = 1
ORDER BY customer_id, product_id, date;

Explanation:

  1. Window Function (row_number):
    • A subquery assigns a unique row number (row_num) within each group defined by customer_id, product_id, and date.
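    • Because date is already one of the partition keys, ORDER BY date does not distinguish between rows inside a partition, so which duplicate receives row_num 1 is arbitrary. To control which row survives, order by a column that differs between the duplicates.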
  2. Filtering Unique Rows:
    • The outer query filters the ranked data to keep only rows where row_num is 1. This selects the first occurrence for each unique combination of customer_id, product_id, and date.

Choosing the Right Approach:

  • If the duplicates are exact copies of entire rows, DISTINCT with ORDER BY is the simpler solution.
  • If uniqueness is defined by the key columns only (the NODUPKEY case), or if you need to control which row is kept or reuse the row number later, use the window function approach (ROW_NUMBER()).

Additional Considerations:

  • Only the ROW_NUMBER() approach guarantees one row per key combination. The DISTINCT approach gives the same result only when rows that share a key are identical in every selected column.
  • You can adjust the ORDER BY clause to define a different sorting order if needed.

By using either of these approaches in Spark SQL, you can effectively translate the functionality of SAS PROC SORT with BY and NODUPKEY for handling multiple sorting keys and deduplication within your Spark DataFrame.
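
The difference between the two approaches shows up as soon as two rows share a key but differ in sales_amount. The sketch below runs both on such data. It assumes Python's built-in sqlite3 module (SQLite 3.25 or later) as a stand-in for Spark SQL; the rows are hypothetical, and ordering by sales_amount inside the window is only an illustrative tie-breaker.

```python
import sqlite3

# Hypothetical sales rows: (customer_id, product_id, date, sales_amount)
sales = [
    (1, "A", "2024-01-01", 100),
    (1, "A", "2024-01-01", 100),  # exact duplicate row
    (1, "A", "2024-01-01", 120),  # same key, different amount
    (2, "B", "2024-01-02", 50),
]

conn = sqlite3.connect(":memory:")  # stand-in engine, not Spark
conn.execute(
    "CREATE TABLE sales_data (customer_id INT, product_id TEXT, date TEXT, sales_amount INT)"
)
conn.executemany("INSERT INTO sales_data VALUES (?, ?, ?, ?)", sales)

distinct_rows = conn.execute("""
SELECT DISTINCT customer_id, product_id, date, sales_amount
FROM sales_data
ORDER BY customer_id, product_id, date, sales_amount
""").fetchall()

keyed_rows = conn.execute("""
SELECT customer_id, product_id, date, sales_amount
FROM (
  SELECT customer_id, product_id, date, sales_amount,
         ROW_NUMBER() OVER (
             PARTITION BY customer_id, product_id, date
             ORDER BY sales_amount  -- illustrative tie-breaker
         ) AS row_num
  FROM sales_data
) AS ranked_data
WHERE row_num = 1
ORDER BY customer_id, product_id, date
""").fetchall()

print(len(distinct_rows), "rows after DISTINCT")
print(len(keyed_rows), "rows after ROW_NUMBER() filter")
```

DISTINCT removes only the exact duplicate, while the ROW_NUMBER() filter keeps a single row per key.
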

8. SAS to Spark SQL: RETAIN and Group-Wise Concatenation

data xyz;
  set abc;
  by xy;
  retain x1 y1;
  length x1 $400 y1 $100;
  if first.xy then do;
    x1 = ' ';
    y1 = ' ';
  end;
  x1 = catx('', trim(x1), x2);
  y1 = catx("", trim(y1), y2);
  if last.xy then output;
run;

The SAS code above groups the rows by xy, retains x1 and y1 across the rows of each group, appends each row's x2 and y2 to them, and outputs one row per group holding the final concatenated values. A Spark SQL version:

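# Define the SQL query (assumes the source data is registered as a temp view named "abc")
sql_query = """
WITH concatenated AS (
    SELECT
        xy,
        -- No ORDER BY is available here, so the element order inside each list is not guaranteed
        COLLECT_LIST(x2) OVER (PARTITION BY xy) AS x2_list,
        COLLECT_LIST(y2) OVER (PARTITION BY xy) AS y2_list,
        ROW_NUMBER() OVER (PARTITION BY xy ORDER BY xy) AS row_num,
        COUNT(*) OVER (PARTITION BY xy) AS count_xy
    FROM abc
)
SELECT
    xy,
    -- '_' is the separator chosen here; change it to match the delimiter passed to catx()
    CONCAT_WS('_', x2_list) AS x1,
    CONCAT_WS('_', y2_list) AS y1
FROM concatenated
WHERE row_num = count_xy  -- keep one row per xy, like "if last.xy then output"
"""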

# Execute the SQL query
result = spark.sql(sql_query)

# Show the results
result.show()
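
As a cross-check on the expected shape of the result, the same grouping can be sketched in plain Python: one output row per xy, with the x2 and y2 values joined in input order. This illustrates the intended output only; it is neither Spark nor SAS. The sample rows and the concat_by_group helper are hypothetical, and the '_' separator simply follows the query above.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical rows of abc, already sorted by xy: (xy, x2, y2)
abc = [
    ("a", "p", "u"),
    ("a", "q", "v"),
    ("b", "r", "w"),
]


def concat_by_group(rows, sep="_"):
    """Return one (xy, x1, y1) tuple per xy, joining x2 and y2 in input order."""
    out = []
    for xy, group in groupby(rows, key=itemgetter(0)):
        members = list(group)
        x1 = sep.join(x2 for _, x2, _ in members)
        y1 = sep.join(y2 for _, _, y2 in members)
        out.append((xy, x1, y1))
    return out


grouped = concat_by_group(abc)
for row in grouped:
    print(row)
```
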
