Integration in PySpark
Spark SQL window functions can also be used within PySpark via .selectExpr():
df.selectExpr(
    "EmpID",
    "Salary",
    "AVG(Salary) OVER (PARTITION BY Department ORDER BY EmpID) AS avg_salary"
).show()
Best Use Cases for Window Functions
1. Running Totals and Moving Averages
Calculate running totals or moving averages over a specified window of rows.
SELECT
customer_id,
order_date,
amount,
SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total
FROM orders;
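For reference, the same running total can be computed with the DataFrame API; a minimal sketch, assuming a DataFrame orders_df (hypothetical name) with the customer_id, order_date, and amount columns from the query above:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Running total per customer: frame from the first row of the partition up to the current row
running_window = (Window.partitionBy("customer_id")
                  .orderBy("order_date")
                  .rowsBetween(Window.unboundedPreceding, Window.currentRow))
orders_df.withColumn("running_total", F.sum("amount").over(running_window)).show()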
2. Ranking
Assign ranks to rows based on the values in a specific column.
SELECT
customer_id,
order_date,
amount,
RANK() OVER (PARTITION BY customer_id ORDER BY amount DESC) AS rank
FROM orders;
3. Time-Series Analysis
Perform operations like lead and lag for time-series data.
SELECT
customer_id,
order_date,
amount,
LAG(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) AS previous_amount,
LEAD(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) AS next_amount
FROM orders;
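The lag/lead pattern has a direct DataFrame-API counterpart as well; a minimal sketch under the same assumed orders_df:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Previous and next order amount per customer, ordered by order date
ts_window = Window.partitionBy("customer_id").orderBy("order_date")
(orders_df
    .withColumn("previous_amount", F.lag("amount", 1).over(ts_window))
    .withColumn("next_amount", F.lead("amount", 1).over(ts_window))
    .show())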
4. Percentile and Distribution Analysis
Compute percentiles and cumulative distributions.
SELECT
customer_id,
order_date,
amount,
PERCENT_RANK() OVER (PARTITION BY customer_id ORDER BY amount) AS percent_rank,
CUME_DIST() OVER (PARTITION BY customer_id ORDER BY amount) AS cumulative_distribution
FROM orders;
5. Identifying First and Last Records in Groups
Identify the first and last records within each group. Note that LAST_VALUE needs an explicit frame ending at UNBOUNDED FOLLOWING; with the default frame, which ends at the current row, it would simply return the current row's value.
SELECT
customer_id,
order_date,
amount,
FIRST_VALUE(amount) OVER (PARTITION BY customer_id ORDER BY order_date) AS first_amount,
LAST_VALUE(amount) OVER (PARTITION BY customer_id ORDER BY order_date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_amount
FROM orders;
6. For SAS Programmers: How to Translate SAS FIRST. and LAST. in Spark SQL
In Spark SQL, there is no direct equivalent of the SAS FIRST. and LAST. variables. However, you can achieve similar functionality using window functions such as row_number(), first_value(), and last_value(), depending on your specific needs.
Here's how you can translate the concept in Spark SQL:
1. Using row_number():
This approach assigns a unique row number within each group defined by the BY variable(s). You can then use conditional logic to identify the first and last rows based on the row number.
SELECT *,
CASE WHEN row_number() OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) = 1 THEN 1 ELSE 0 END AS is_first,
CASE WHEN row_number() OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) = COUNT(*) OVER (PARTITION BY variable1, variable2) THEN 1 ELSE 0 END AS is_last
FROM your_data
ORDER BY variable1, variable2, sort_column;
Explanation:
- We use row_number() with a PARTITION BY clause based on your BY variable(s) and an ORDER BY clause to define the order within each group (replace sort_column with the actual sorting column).
- The first CASE statement assigns 1 to the first row (where row_number is 1) and 0 to the other rows in the group for is_first.
- Similarly, the second CASE statement identifies the last row by checking whether the row_number equals the total number of rows in the group (using COUNT(*) OVER (PARTITION BY ...)).
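The same flags can be computed with the PySpark DataFrame API; a minimal sketch, assuming a DataFrame your_data_df (hypothetical name mirroring your_data above) with columns variable1, variable2, and sort_column:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One window per BY-group in ascending order; a reversed window identifies the last row
w_asc = Window.partitionBy("variable1", "variable2").orderBy(F.col("sort_column").asc())
w_desc = Window.partitionBy("variable1", "variable2").orderBy(F.col("sort_column").desc())

flagged_df = (your_data_df
    .withColumn("is_first", (F.row_number().over(w_asc) == 1).cast("int"))
    .withColumn("is_last", (F.row_number().over(w_desc) == 1).cast("int")))
flagged_df.orderBy("variable1", "variable2", "sort_column").show()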
2. Using first_value() and last_value():
These functions can be used if you only need the values of specific columns for the first and last rows in each group.
SELECT *,
first_value(column_to_get) OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) AS first_value,
last_value(column_to_get) OVER (PARTITION BY variable1, variable2 ORDER BY sort_column ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_value
FROM your_data
ORDER BY variable1, variable2, sort_column;
Explanation:
- first_value() and last_value() are used in the SELECT clause with a similar window definition as before.
- They return the value of column_to_get for the first and last rows within each group, respectively.
- last_value() needs the explicit ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING frame shown above; without it, the default frame ends at the current row and last_value() just returns the current row's value.
Choosing the Right Approach:
- If you need additional information beyond just identifying the first and last rows (such as the row numbers themselves), use row_number().
- If you only need the values of specific columns for the first and last rows, use first_value() and last_value().
Remember:
- Unlike SAS, Spark does not require the data to be pre-sorted; the PARTITION BY and ORDER BY clauses of the window specification define the grouping and ordering, so make sure they reflect the order your SAS BY statement relied on.
- Adjust the windowing specifications (partitioning and ordering) to match your actual requirements.
By leveraging these window functions in Spark SQL, you can effectively achieve functionality similar to the SAS FIRST. and LAST. variables for identifying and processing the first and last observations within groups defined by your BY variable(s).
Scenario
Assume we have a dataset with columns group1, group2, order1, and order2, and we want to identify the first and last occurrence of each group based on the combined ordering of order1 and order2.
SAS Code Example
data example;
set example;
by group1 group2 order1 order2;
if first.group1 then first_group1 = 1;
else first_group1 = 0;
if last.group1 then last_group1 = 1;
else last_group1 = 0;
if first.group2 then first_group2 = 1;
else first_group2 = 0;
if last.group2 then last_group2 = 1;
else last_group2 = 0;
run;
Equivalent in Spark SQL
To translate this logic into Spark SQL, you can use the ROW_NUMBER function with window specifications to identify the first and last occurrence of each group.
Spark SQL Code Example
- Create the DataFrame and register it as a temporary view: this makes the data available for SQL operations.
- Define the window specification: partition by the grouping variables and order by the ordering variables.
- Identify the first and last occurrences: use ROW_NUMBER with appropriate ordering to mark them.
Here’s how you can achieve this:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("SAS First Last Equivalent").getOrCreate()
# Sample data
data = [
("group1", "subgroup1", "2023-01-01", "10:00", 100),
("group1", "subgroup1", "2023-01-01", "10:30", 200),
("group1", "subgroup2", "2023-01-01", "11:00", 300),
("group2", "subgroup1", "2023-01-01", "09:00", 150),
("group2", "subgroup1", "2023-01-01", "09:30", 250),
("group2", "subgroup2", "2023-01-02", "08:00", 130),
("group2", "subgroup2", "2023-01-02", "08:30", 170)
]
columns = ["group1", "group2", "order1", "order2", "amount"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("example")
# Use Spark SQL to identify first and last occurrences
sql_query = """
WITH RankedData AS (
SELECT
group1,
group2,
order1,
order2,
amount,
ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1, order2) AS rn_group1_asc,
ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1 DESC, order2 DESC) AS rn_group1_desc,
ROW_NUMBER() OVER (PARTITION BY group1, group2 ORDER BY order1, order2) AS rn_group2_asc,
ROW_NUMBER() OVER (PARTITION BY group1, group2 ORDER BY order1 DESC, order2 DESC) AS rn_group2_desc
FROM example
)
SELECT
group1,
group2,
order1,
order2,
amount,
CASE WHEN rn_group1_asc = 1 THEN 1 ELSE 0 END AS first_group1,
CASE WHEN rn_group1_desc = 1 THEN 1 ELSE 0 END AS last_group1,
CASE WHEN rn_group2_asc = 1 THEN 1 ELSE 0 END AS first_group2,
CASE WHEN rn_group2_desc = 1 THEN 1 ELSE 0 END AS last_group2
FROM RankedData
ORDER BY group1, group2, order1, order2
"""
# Execute the SQL query
result_df = spark.sql(sql_query)
# Show the result
result_df.show()
Explanation
- RankedData Common Table Expression (CTE): we create a CTE called RankedData in which we calculate row numbers for each group based on the specified ordering.
- ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1, order2) assigns a row number to each row within group1, ordered by order1 and order2.
- ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1 DESC, order2 DESC) assigns a reverse row number to identify the last occurrence within group1.
- The same logic is applied for group2, partitioned by group1 and group2 so that the group2 flags are nested within group1, mirroring the SAS BY-group behavior.
- Select with CASE statements: we select from the RankedData CTE and use CASE statements to mark the first and last occurrences for each group.
Sample Output
The resulting DataFrame will include flags indicating the first and last occurrences for each group:
+-------+---------+----------+-------+------+------------+-----------+------------+-----------+
| group1| group2 | order1 | order2|amount|first_group1|last_group1|first_group2|last_group2|
+-------+---------+----------+-------+------+------------+-----------+------------+-----------+
| group1|subgroup1|2023-01-01| 10:00 | 100| 1| 0| 1| 0|
| group1|subgroup1|2023-01-01| 10:30 | 200| 0| 0| 0| 1|
| group1|subgroup2|2023-01-01| 11:00 | 300| 0| 1| 1| 1|
| group2|subgroup1|2023-01-01| 09:00 | 150| 1| 0| 1| 0|
| group2|subgroup1|2023-01-01| 09:30 | 250| 0| 0| 0| 1|
| group2|subgroup2|2023-01-02| 08:00 | 130| 0| 0| 1| 0|
| group2|subgroup2|2023-01-02| 08:30 | 170| 0| 1| 0| 1|
+-------+---------+----------+-------+------+------------+-----------+------------+-----------+
Examples from the Official Spark SQL Documentation:
CREATE TABLE employees (name STRING, dept STRING, salary INT, age INT);
INSERT INTO employees VALUES ("Lisa", "Sales", 10000, 35);
INSERT INTO employees VALUES ("Evan", "Sales", 32000, 38);
INSERT INTO employees VALUES ("Fred", "Engineering", 21000, 28);
INSERT INTO employees VALUES ("Alex", "Sales", 30000, 33);
INSERT INTO employees VALUES ("Tom", "Engineering", 23000, 33);
INSERT INTO employees VALUES ("Jane", "Marketing", 29000, 28);
INSERT INTO employees VALUES ("Jeff", "Marketing", 35000, 38);
INSERT INTO employees VALUES ("Paul", "Engineering", 29000, 23);
INSERT INTO employees VALUES ("Chloe", "Engineering", 23000, 25);
SELECT * FROM employees;
+-----+-----------+------+-----+
| name| dept|salary| age|
+-----+-----------+------+-----+
|Chloe|Engineering| 23000| 25|
| Fred|Engineering| 21000| 28|
| Paul|Engineering| 29000| 23|
|Helen| Marketing| 29000| 40|
| Tom|Engineering| 23000| 33|
| Jane| Marketing| 29000| 28|
| Jeff| Marketing| 35000| 38|
| Evan| Sales| 32000| 38|
| Lisa| Sales| 10000| 35|
| Alex| Sales| 30000| 33|
+-----+-----------+------+-----+
SELECT name, dept, salary, RANK() OVER (PARTITION BY dept ORDER BY salary) AS rank FROM employees;
+-----+-----------+------+----+
| name| dept|salary|rank|
+-----+-----------+------+----+
| Lisa| Sales| 10000| 1|
| Alex| Sales| 30000| 2|
| Evan| Sales| 32000| 3|
| Fred|Engineering| 21000| 1|
| Tom|Engineering| 23000| 2|
|Chloe|Engineering| 23000| 2|
| Paul|Engineering| 29000| 4|
|Helen| Marketing| 29000| 1|
| Jane| Marketing| 29000| 1|
| Jeff| Marketing| 35000| 3|
+-----+-----------+------+----+
SELECT name, dept, salary, DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary ROWS BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW) AS dense_rank FROM employees;
+-----+-----------+------+----------+
| name| dept|salary|dense_rank|
+-----+-----------+------+----------+
| Lisa| Sales| 10000| 1|
| Alex| Sales| 30000| 2|
| Evan| Sales| 32000| 3|
| Fred|Engineering| 21000| 1|
| Tom|Engineering| 23000| 2|
|Chloe|Engineering| 23000| 2|
| Paul|Engineering| 29000| 3|
|Helen| Marketing| 29000| 1|
| Jane| Marketing| 29000| 1|
| Jeff| Marketing| 35000| 2|
+-----+-----------+------+----------+
SELECT name, dept, age, CUME_DIST() OVER (PARTITION BY dept ORDER BY age
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cume_dist FROM employees;
+-----+-----------+------+------------------+
| name| dept|age | cume_dist|
+-----+-----------+------+------------------+
| Alex| Sales| 33|0.3333333333333333|
| Lisa| Sales| 35|0.6666666666666666|
| Evan| Sales| 38| 1.0|
| Paul|Engineering| 23| 0.25|
|Chloe|Engineering| 25| 0.5|
| Fred|Engineering| 28| 0.75|
| Tom|Engineering| 33| 1.0|
| Jane| Marketing| 28|0.3333333333333333|
| Jeff| Marketing| 38|0.6666666666666666|
|Helen| Marketing| 40| 1.0|
+-----+-----------+------+------------------+
SELECT name, dept, salary, MIN(salary) OVER (PARTITION BY dept ORDER BY salary) AS min
FROM employees;
+-----+-----------+------+-----+
| name| dept|salary| min|
+-----+-----------+------+-----+
| Lisa| Sales| 10000|10000|
| Alex| Sales| 30000|10000|
| Evan| Sales| 32000|10000|
|Helen| Marketing| 29000|29000|
| Jane| Marketing| 29000|29000|
| Jeff| Marketing| 35000|29000|
| Fred|Engineering| 21000|21000|
| Tom|Engineering| 23000|21000|
|Chloe|Engineering| 23000|21000|
| Paul|Engineering| 29000|21000|
+-----+-----------+------+-----+
7. Another SAS Scenario: How to translate SAS PROC SORT with BY (ordering based on multiple keys) and NODUPKEY (removing duplicate observations) into Spark SQL:
SAS Scenario:
Imagine you have a dataset named sales_data containing columns like customer_id, product_id, date, and sales_amount. You want to:
- Sort the data by customer_id, product_id, and date (in ascending order).
- Remove any duplicate observations based on the combination of customer_id, product_id, and date.
SAS Code:
proc sort data=sales_data;
by customer_id product_id date;
nodupkey;
run;
Spark SQL Translation:
There are two main approaches to achieve this functionality in Spark SQL:
Approach 1: Using DISTINCT and ORDER BY:
SELECT DISTINCT customer_id, product_id, date, sales_amount
FROM sales_data
ORDER BY customer_id, product_id, date;
Explanation:
- DISTINCT: this clause eliminates duplicate rows based on all columns in the SELECT clause (here customer_id, product_id, date, and sales_amount), so it matches NODUPKEY only when duplicate key combinations also share the same sales_amount.
- ORDER BY: this clause sorts the resulting unique data in the specified order (ascending for all three key columns).
Approach 2: Using ROW_NUMBER() and WHERE:
SELECT customer_id, product_id, date, sales_amount
FROM (
SELECT customer_id, product_id, date, sales_amount,
row_number() OVER (PARTITION BY customer_id, product_id, date ORDER BY date) AS row_num
FROM sales_data
) AS ranked_data
WHERE row_num = 1;
Explanation:
- Window function (row_number): a subquery assigns a unique row number (row_num) within each group defined by customer_id, product_id, and date. The ORDER BY inside the window determines which row within each duplicate group is numbered first.
- Filtering unique rows: the outer query keeps only rows where row_num is 1, selecting a single occurrence for each unique combination of customer_id, product_id, and date.
Choosing the Right Approach:
- If your primary goal is sorting and eliminating duplicates efficiently, DISTINCT with ORDER BY may be the simpler solution.
- If you need the row-number information for further processing, if sales_amount can differ among duplicate keys, or if DISTINCT performance becomes an issue on large data volumes, the window-function approach (ROW_NUMBER()) is preferable.
Additional Considerations:
- Both approaches achieve the desired outcome of removing duplicate observations based on the specified key combination and sorting the remaining data.
- You can adjust the ORDER BY clause to define a different sorting order if needed.
By using either of these approaches in Spark SQL, you can effectively translate the functionality of SAS PROC SORT with BY and NODUPKEY for handling multiple sorting keys and deduplication within your Spark DataFrame.
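If you are working with the DataFrame API rather than SQL, a closer one-liner equivalent of NODUPKEY is dropDuplicates on the key columns; a minimal sketch, assuming a DataFrame sales_df (hypothetical name) with the columns used above:
# Keep one row per (customer_id, product_id, date) combination, then sort.
# Which duplicate row survives is not guaranteed, so this mirrors NODUPKEY's
# "first observation kept" behavior only loosely.
deduped_df = (sales_df
    .dropDuplicates(["customer_id", "product_id", "date"])
    .orderBy("customer_id", "product_id", "date"))
deduped_df.show()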
8. SAS to SQL
data xyz;set abc;
by xy;
retain x1 y1;
length x1 $400 y1 $100;
if first.xy then do;
x1=' ';
y1=' ';
end;
x1=catx('',trim(x1),x2); y1=catx("",trim(y1),y2);
if last.xy then output;
run;
The provided SAS code performs a group-by operation on xy, retains values across rows within each group, concatenates those values, and outputs the final concatenated result for each group.
# Define and execute SQL query
sql_query = """
WITH concatenated AS (
SELECT
xy,
COLLECT_LIST(x2) OVER (PARTITION BY xy) AS x2_list,
COLLECT_LIST(y2) OVER (PARTITION BY xy) AS y2_list,
ROW_NUMBER() OVER (PARTITION BY xy ORDER BY xy) AS row_num,
COUNT(*) OVER (PARTITION BY xy) AS count_xy
FROM abc
)
SELECT
xy,
CONCAT_WS('_', x2_list) AS x1,
CONCAT_WS('_', y2_list) AS y1
FROM concatenated
WHERE row_num = count_xy
"""
# Execute the SQL query
result = spark.sql(sql_query)
# Show the results
result.show()
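A simpler way to express the same group-wise concatenation is a plain GROUP BY aggregation; a sketch, assuming the abc view registered above. Note that without an explicit ordering column the element order within each group is not guaranteed, and the separator should mirror the one used in the SAS catx call:
# Aggregate once per group instead of windowing and then filtering to the last row
alt_query = """
SELECT
    xy,
    CONCAT_WS('_', COLLECT_LIST(x2)) AS x1,
    CONCAT_WS('_', COLLECT_LIST(y2)) AS y1
FROM abc
GROUP BY xy
"""
spark.sql(alt_query).show()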