Combines results of two queries (removes duplicates)
SELECT col FROM table1 UNION SELECT col FROM table2;
UNION ALL
query1 UNION ALL query2
Combines results of two queries (includes duplicates)
SELECT col FROM table1 UNION ALL SELECT col FROM table2;
INTERSECT
query1 INTERSECT query2
Returns common rows from both queries
SELECT col FROM table1 INTERSECT SELECT col FROM table2;
EXCEPT
query1 EXCEPT query2
Returns rows from the first query not in the second
SELECT col FROM table1 EXCEPT SELECT col FROM table2;
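As a DataFrame-API counterpart, here is a minimal PySpark sketch of the same set operations, assuming a local SparkSession and two illustrative single-column DataFrames:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("set-ops-demo").getOrCreate()

df1 = spark.createDataFrame([(1,), (2,), (2,), (3,)], ["col"])
df2 = spark.createDataFrame([(2,), (3,), (4,)], ["col"])

df1.union(df2).distinct().show()   # UNION (duplicates removed)
df1.union(df2).show()              # UNION ALL (DataFrame union() keeps duplicates)
df1.intersect(df2).show()          # INTERSECT
df1.exceptAll(df2).show()          # EXCEPT ALL; use subtract() for EXCEPT (distinct) semantics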
7. Aggregate Functions (Operators)
Function
Syntax
Description
Example
COUNT
COUNT(*) or COUNT(col)
Counts rows or non-NULL values
SELECT COUNT(*) FROM table;
SUM
SUM(col)
Sums up numeric values
SELECT SUM(sales) FROM table;
AVG
AVG(col)
Calculates the average
SELECT AVG(price) FROM table;
MIN
MIN(col)
Finds the minimum value
SELECT MIN(price) FROM table;
MAX
MAX(col)
Finds the maximum value
SELECT MAX(price) FROM table;
GROUP BY
GROUP BY col
Groups rows based on a column value
SELECT category, SUM(sales) FROM table GROUP BY category;
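For reference, the same aggregates expressed with the DataFrame API; a minimal sketch assuming a SparkSession and an illustrative category/sales dataset:
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("agg-demo").getOrCreate()

df = spark.createDataFrame(
    [("books", 100.0), ("books", 250.0), ("toys", 80.0)],
    ["category", "sales"],
)

# COUNT / SUM / AVG / MIN / MAX over the whole DataFrame
df.agg(F.count("*"), F.sum("sales"), F.avg("sales"), F.min("sales"), F.max("sales")).show()

# GROUP BY equivalent
df.groupBy("category").agg(F.sum("sales").alias("total_sales")).show()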
8. Window Functions
Function
Syntax
Description
Example
ROW_NUMBER
ROW_NUMBER() OVER (PARTITION BY col ORDER BY col2)
Assigns a unique number to each row within a partition
SELECT ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales) AS rank FROM table;
RANK
RANK() OVER (PARTITION BY col ORDER BY col2)
Assigns rank to rows with gaps for duplicates
SELECT RANK() OVER (PARTITION BY category ORDER BY sales DESC) AS rank FROM table;
DENSE_RANK
DENSE_RANK() OVER (PARTITION BY col ORDER BY col2)
Assigns rank to rows without gaps
SELECT DENSE_RANK() OVER (PARTITION BY category ORDER BY sales DESC) AS rank FROM table;
NTILE
NTILE(n) OVER (PARTITION BY col ORDER BY col2)
Divides rows into n buckets
SELECT NTILE(4) OVER (ORDER BY sales) AS quartile FROM table;
LEAD
LEAD(col, offset, default) OVER (PARTITION BY col ORDER BY col2)
Accesses the value of the next row
SELECT LEAD(sales, 1, 0) OVER (ORDER BY date) FROM table;
LAG
LAG(col, offset, default) OVER (PARTITION BY col ORDER BY col2)
Accesses the value of the previous row
SELECT LAG(sales, 1, 0) OVER (ORDER BY date) FROM table;
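These window functions map directly onto pyspark.sql.functions; a minimal sketch with an illustrative date/sales dataset:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("window-demo").getOrCreate()

df = spark.createDataFrame(
    [("2024-01-01", 10), ("2024-01-02", 20), ("2024-01-03", 15)],
    ["date", "sales"],
)

w = Window.orderBy("date")
df.select(
    "date", "sales",
    F.row_number().over(w).alias("row_number"),
    F.lag("sales", 1, 0).over(w).alias("prev_sales"),   # previous row, default 0
    F.lead("sales", 1, 0).over(w).alias("next_sales"),  # next row, default 0
).show()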
9. Miscellaneous Operators
Operator
Syntax
Description
Example
CAST
CAST(expr AS type)
Converts a value to a specified type
SELECT CAST(price AS STRING) FROM table;
CASE
CASE WHEN condition THEN result ELSE result END
Conditional logic
SELECT CASE WHEN sales > 100 THEN 'High' ELSE 'Low' END AS category FROM table;
DISTINCT
DISTINCT col
Returns unique values
SELECT DISTINCT category FROM table;
This cheatsheet provides a comprehensive overview of commonly used Spark SQL operators and functions with their syntax, descriptions, and examples. Use it as a reference to efficiently write and optimize Spark SQL queries!
Natural Language: Use simple and clear natural language to describe steps.
Keywords: Use standard control flow keywords such as:
IF, ELSE, ENDIF
FOR, WHILE, ENDWHILE
FUNCTION, CALL
INPUT, OUTPUT
Indentation: Indent blocks within loops or conditionals to signify nesting.
Capitalization: Write pseudocode keywords in UPPERCASE to distinguish them from variables and logic descriptions.
Variables: Use meaningful and descriptive names (e.g., customerName, totalSales).
Standards:
Use simple language: Avoid complex sentences and focus on concise, easy-to-understand statements.
Be consistent: Use the same terminology and formatting throughout the pseudocode.
Use indentation: Indent code blocks to show hierarchy and improve readability.
Terms
INPUT: Used to describe user input or data input. Example: INPUT name
OUTPUT: Used to describe output or results. Example: OUTPUT "Hello, " + name
SET: Used to assign a value to a variable. Example: SET count = 0
IF: Used to describe conditional statements. Example: IF age > 18 THEN ...
WHILE: Used to describe loops. Example: WHILE count < 10 DO ...
FOR: Used to describe loops with a counter. Example: FOR i = 1 TO 10 DO ...
REPEAT: Used to describe loops that repeat until a condition is met. Example: REPEAT ... UNTIL count = 10
UNTIL: Used to describe loops that repeat until a condition is met. Example: REPEAT ... UNTIL count = 10
CASE: Used to describe multi-branch conditional statements. Example: CASE color OF ...
PROCEDURE: Used to describe a subroutine or function. Example: PROCEDURE greet(name) ...
Common Terms and Constructs in Pseudocode
Control Structures
Conditionals:
IF condition THEN // Actions ELSE // Alternative actions ENDIF
Loops: FOR variable FROM start TO end DO // Actions ENDFOR; WHILE condition DO // Actions ENDWHILE
Input/Output
Use clear and direct terms for input and output:
INPUT "Enter a number:", user Number OUTPUT "The result is:", result
Functions and Procedures
Define reusable components: FUNCTION calculateSum(a, b) RETURN a + b ENDFUNCTION
Error Handling
Describe how to handle errors in a readable way: IF error occurs THEN OUTPUT "Error encountered. Exiting process." EXIT ENDIF
Data Manipulation
Data operations like adding, updating, or deleting: SET total TO total + itemCost; REMOVE item FROM shoppingCart
Examples of Good Pseudocode
Here’s an example of a simple algorithm to calculate the average of three numbers:
INPUT num1, num2, num3
SET sum = num1 + num2 + num3
SET average = sum / 3
OUTPUT "The average is: " + average
Problem: Calculate the factorial of a given number.
START
INPUT "Enter a positive integer:", num
IF num < 0 THEN
OUTPUT "Error: Number must be positive."
EXIT
ENDIF
SET factorial TO 1
FOR i FROM 1 TO num DO
SET factorial TO factorial * i
ENDFOR
OUTPUT "The factorial of", num, "is", factorial
END
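For comparison, here is a direct Python translation of the factorial pseudocode above (a sketch with minimal input handling):
def factorial_program():
    num = int(input("Enter a positive integer: "))
    if num < 0:
        print("Error: Number must be positive.")
        return
    factorial = 1
    for i in range(1, num + 1):
        factorial = factorial * i
    print("The factorial of", num, "is", factorial)

factorial_program()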
By following these guidelines, standards, and terms, you can write clear, consistent pseudocode that is easy to read, understand, and implement.
Window functions in PySpark allow you to perform operations on a subset of your data using a “window” that defines a range of rows. These functions are similar to SQL window functions and are useful for tasks like ranking, cumulative sums, and moving averages. Let’s go through various PySpark DataFrame window functions, compare them with Spark SQL window functions, and provide examples with a large sample dataset.
PySpark’s window functions allow operations across a specified “window” of rows, such as performing aggregations, ranking, or comparisons. The functionality mimics SQL window functions but uses PySpark’s syntax.
Syntax Structure
Define a Window Specification: The Window object specifies how rows are partitioned and ordered for the operation.
from pyspark.sql.window import Window
window_spec = Window.partitionBy("column1").orderBy("column2")
Apply the Window Function: Use PySpark functions like row_number(), rank(), dense_rank(), etc., with the window specification.
from pyspark.sql.functions import row_number, rank, dense_rank, sum
df.withColumn("row_num", row_number().over(window_spec))
Window Specification Options
Option
Description
Syntax
partitionBy()
Divides the data into partitions for independent calculations.
Window.partitionBy("column1")
orderBy()
Specifies the order of rows within each partition.
Window.orderBy("column2")
rowsBetween()
Defines a window frame by rows relative to the current row.
.rowsBetween(-1, 1)
rangeBetween()
Defines a window frame based on the range of values in the ordering column.
.rangeBetween(-10, 10)
unboundedPreceding
Indicates all rows before the current row in the partition.
Window.rowsBetween(Window.unboundedPreceding, 0)
unboundedFollowing
Indicates all rows after the current row in the partition.
Window.rowsBetween(0, Window.unboundedFollowing)
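A minimal sketch of frame-based windows using rowsBetween, with illustrative column names, computing a running total and a three-row moving sum per partition:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("frame-demo").getOrCreate()

df = spark.createDataFrame(
    [("A", 1, 10), ("A", 2, 20), ("A", 3, 30), ("B", 1, 5), ("B", 2, 15)],
    ["column1", "column2", "value"],
)

# Running total: from the start of the partition up to the current row
running = (
    Window.partitionBy("column1")
    .orderBy("column2")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
# Moving sum over the previous, current, and next row
moving = Window.partitionBy("column1").orderBy("column2").rowsBetween(-1, 1)

df.select(
    "*",
    F.sum("value").over(running).alias("running_total"),
    F.sum("value").over(moving).alias("moving_sum"),
).show()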
There are multiple ways to apply window functions on DataFrames in PySpark. While withColumn is the most commonly used method to add a new column with a window function, there are other approaches to apply window functions, depending on the specific use case.
Here are different methods for applying window functions:
1. Using select() with window functions
Instead of withColumn(), you can use select() to directly apply a window function to the columns of the DataFrame. This is useful when you only want to return a subset of columns along with the windowed column.
from pyspark.sql.functions import row_number
from pyspark.sql import Window
# Define the window specification
windowSpec = Window.partitionBy("salary").orderBy("id")
# Use select to apply the window function
df.select("id", "salary", row_number().over(windowSpec).alias("row_number")).show()
2. Using aggregate functions as window functions
Aggregate functions such as sum() and avg() can be evaluated over a window, which is useful when you want aggregated metrics (e.g., a group total) on every row without collapsing the rows. Note that a window expression cannot be placed inside groupBy().agg(); apply it with select() or withColumn() instead.
from pyspark.sql.functions import sum
from pyspark.sql import Window
# Define the window specification
windowSpec = Window.partitionBy("salary")
# Apply the aggregate as a window function
df.select("id", "salary", sum("salary").over(windowSpec).alias("total_salary")).show()
3. Using filter() or where()
Sometimes, window functions are used in conjunction with filters to extract specific rows, such as filtering the first or last row per partition.
from pyspark.sql.functions import row_number
# Define the window specification
windowSpec = Window.partitionBy("salary").orderBy("id")
# Apply window function and filter based on the rank
ranked_df = df.withColumn("row_number", row_number().over(windowSpec))
ranked_df.filter(ranked_df["row_number"] == 1).show() # Filter to get the first row per partition
4. Using groupBy() with window functions
Though groupBy() is usually used for aggregations, you can apply a window function to the aggregated result afterwards. The window specification must reference columns that still exist after the aggregation.
from pyspark.sql.functions import rank, col
# First, group by a column to get aggregated rows
grouped_df = df.groupBy("salary").count()
# Define a window specification over the aggregated columns
windowSpec_count = Window.orderBy(col("count").desc())
grouped_df.withColumn("rank", rank().over(windowSpec_count)).show()
5. Using withColumnRenamed() with window functions
You can also rename the result of a window function when adding it as a new column.
from pyspark.sql.functions import row_number
# Define the window specification
windowSpec = Window.partitionBy("salary").orderBy("id")
# Apply the window function and rename the column
df.withColumn("row_number", row_number().over(windowSpec)).withColumnRenamed("row_number", "rank").show()
6. Combining multiple window functions in one step
You can apply multiple window functions in a single step using either select() or withColumn().
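For example, a sketch (with illustrative id/salary data) that computes several window functions against one window specification in a single select():
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("multi-window").getOrCreate()

df = spark.createDataFrame([(1, 3000), (2, 3000), (3, 4000), (4, 3000)], ["id", "salary"])

windowSpec = Window.partitionBy("salary").orderBy("id")

df.select(
    "id",
    "salary",
    F.row_number().over(windowSpec).alias("row_number"),
    F.rank().over(windowSpec).alias("rank"),
    F.dense_rank().over(windowSpec).alias("dense_rank"),
).show()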
Window functions in PySpark and Spark SQL are powerful tools for data analysis. They allow you to perform complex calculations and transformations on subsets of your data, similar to SQL window functions. By using window functions, you can easily implement features like ranking, cumulative sums, and moving averages in your PySpark applications.
Examples:-
1. PySpark DataFrames: remove duplicates based on specific columns and then order by different columns
To remove duplicates from a PySpark DataFrame based on specific columns and order the remaining rows by different columns, you can use a combination of the dropDuplicates() function and the orderBy() (or sort()) function.
Here is an example that demonstrates this process:
Remove duplicates based on specific columns.
Order the resulting DataFrame by different columns.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Assumes a SparkSession `spark` and a DataFrame `df` with columns "name" and "age"
# (created as in the next example)
# Show the original DataFrame
print("Original DataFrame:")
df.show()
# Step 1: Remove duplicates based on specific columns (e.g., "name", "age")
df_no_duplicates = df.dropDuplicates(["name", "age"])
# Step 2: Order the resulting DataFrame by different columns (e.g., "age" in descending order)
df_ordered = df_no_duplicates.orderBy(col("age").desc())
# Show the resulting DataFrame
print("DataFrame after removing duplicates and ordering:")
df_ordered.show()
# Stop SparkSession
spark.stop()
Explanation:
Initialization and Data Preparation:
A SparkSession is created.
Sample data is provided, and a DataFrame is created from this data.
Removing Duplicates:
The dropDuplicates() function is used to remove rows that have the same values in the specified columns ("name" and "age" in this case).
Ordering Data:
The orderBy() function is used to sort the DataFrame by the specified columns. In this case, the DataFrame is ordered by "age" in descending order.
Displaying Results:
The original and resulting DataFrames are displayed using the show() function.
dropDuplicates(): This function removes duplicate rows based on the specified columns. If no columns are specified, it removes rows that are identical in all columns.
orderBy()/sort(): These functions are used to order the DataFrame. You can specify multiple columns and the sorting order (ascending or descending) for each column.
You can chain multiple DataFrame operations together. For example, you can combine dropDuplicates() and orderBy() in a single statement: df.dropDuplicates(["name", "age"]).orderBy(col("age").desc())
2. PySpark DataFrames: remove duplicates based on specific columns while ordering by different columns
To remove duplicates from a PySpark DataFrame based on specific columns while ensuring the ordering of the data based on other columns, you can use the window functions in PySpark. This approach allows you to specify how to handle duplicates and maintain the desired order.
Below is an example to demonstrate how to remove duplicates based on specific columns (name, age) while ordering the rows by different columns (age descending and id ascending):
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
# Initialize SparkSession
spark = SparkSession.builder.appName("RemoveDuplicatesWithOrdering").getOrCreate()
# Sample data
data = [
(1, "Alice", 29),
(2, "Bob", 30),
(3, "Alice", 29),
(4, "David", 35),
(5, "Alice", 25)
]
# Create DataFrame
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
# Show the original DataFrame
print("Original DataFrame:")
df.show()
# Define a window specification
windowSpec = Window.partitionBy("name", "age").orderBy(col("age").desc(), col("id").asc())
# Add a row number to each partition
df_with_row_number = df.withColumn("row_number", row_number().over(windowSpec))
# Filter the rows to keep only the first occurrence of each combination of name and age
df_no_duplicates = df_with_row_number.filter(col("row_number") == 1).drop("row_number")
# Show the resulting DataFrame
print("DataFrame after removing duplicates and ordering:")
df_no_duplicates.show()
# Stop SparkSession
spark.stop()
Explanation:
Initialization and Data Preparation:
A SparkSession is created.
Sample data is provided, and a DataFrame is created from this data.
Define Window Specification:
A window specification is defined using Window.partitionBy("name", "age").orderBy(col("age").desc(), col("id").asc()). This specifies that the data should be partitioned by the columns name and age and ordered within each partition by age in descending order and id in ascending order.
Add Row Number:
The row_number() function is used to add a row number to each row within the specified window. This row number helps to identify and keep only the first occurrence of each partition.
Filter Rows:
The DataFrame is filtered to keep only the rows where the row_number is 1. This effectively removes duplicates while maintaining the specified order.
Display Results:
The original and resulting DataFrames are displayed using the show() function.
Window Specification: The Window specification defines how the data should be partitioned and ordered.
Row Number: The row_number() function assigns a unique row number within each window partition.
Filter and Drop: The resulting DataFrame is filtered to keep only the rows where the row_number is 1, and the temporary row_number column is dropped.
This approach ensures that duplicates are removed based on the specified columns while maintaining the desired order of the data. You can adjust the partitioning and ordering columns according to your specific requirements.
Code for revising PySpark window functions
# Sample data
data = [
(1, "Alice", 10, 8000, "New York"),
(2, "Bob", 11, 9000, "New York"),
(3, "Charlie", 10, 10000, "Chicago"),
(4, "David", 12, 9000, "New York"),
(6, "Eve", 13, 9000, "Chicago"),
(7, "GEve", 13, 10000, "Chicago"),
(8, "REve", 13, 5000, "Chicago"),
(9, "ScEve", 14, 5600, "LA"),
(10, "DEve", 15, 11000, "LA"),
(11, "Ram", 14, 11000, "LA"),
(12, "Hem", 10, 8000, "LA"),
(13, "Hith", 11, 6000, "Chicago"),
(14, "Pit", 15, 13000, "Chicago"),
(15, "Evelyn", 15, 14000, "New York"),
(16, "FteEve", 12, 9200, "New York"),
(17, "sctuEve", 12, None, "Chicago"),
]
# Define schema
columns = ["EmpID", "Emp_name", "Manager_id", "Salary", "Location"]
df = spark.createDataFrame(data, schema=columns)
df.show()
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col
wf = Window.partitionBy("Location").orderBy("Salary")
# Calculate row_number, rank, and dense_rank separately
row_number_col = row_number().over(wf).alias("row_number")
rank_col = rank().over(wf).alias("rank")
dense_rank_col = dense_rank().over(wf).alias("dense_rank")
# Select columns including calculated window function results
df.select(
"EmpID",
"Emp_name",
"Manager_id",
"salary",
"Location",
row_number_col,
rank_col,
dense_rank_col
).show()
df.select(
"EmpID",
"Emp_name",
"Manager_id",
"salary",
"Location",
row_number().over(wf).alias("row_number"),
rank().over(wf).alias("rank"),
dense_rank().over(wf).alias("dense_rank")
).show()
# Using withColumn with window functions
(df.withColumn("row_number", row_number().over(wf))
   .withColumn("rank", rank().over(wf))
   .withColumn("dense_rank", dense_rank().over(wf))
   .show())
#Using selectExpr with window functions
df.selectExpr(
"EmpID",
"Emp_name",
"Manager_id",
"salary",
"Location",
"row_number() OVER (PARTITION BY Location ORDER BY Salary) AS row_number", # Define window here
"rank() OVER (PARTITION BY Location ORDER BY Salary) AS rank", # Define window here
"dense_rank() OVER (PARTITION BY Location ORDER BY Salary) AS dense_rank" # Define window here
).show()
# Using withColumn with window functions and chaining
(df.withColumn("row_number", row_number().over(wf))
   .withColumn("rank", rank().over(wf))
   .withColumn("dense_rank", dense_rank().over(wf))
   .drop("salary")
   .filter(col("row_number") == 1)
   .show())
df.createOrReplaceTempView("dfview")
spark.sql(""" select EmpID,Emp_name,Manager_id,salary,Location,row_number() OVER (PARTITION BY Location ORDER BY Salary) AS row_number,
rank() OVER (PARTITION BY Location ORDER BY Salary) AS rank,dense_rank() OVER (PARTITION BY Location ORDER BY Salary) AS dense_rank
from dfview """ ) .show()
spark.sql("""
SELECT EmpID, Emp_name, Manager_id, salary, Location,
row_number() OVER w AS row_number,
rank() OVER w AS rank,
dense_rank() OVER w AS dense_rank
FROM dfview
WINDOW w AS (PARTITION BY Location ORDER BY Salary)
""").show()
Window functions in Spark SQL are powerful tools that allow you to perform calculations across a set of table rows that are somehow related to the current row. These functions are particularly useful for tasks that require a global view of the data, such as running totals, ranking, and time-series analysis.
Overview of Window Functions
In Spark SQL, window functions are used with the OVER clause and can be categorized into several types:
Ranking Functions:
ROW_NUMBER(): Assigns a unique number to each row within the partition of a result set.
RANK(): Assigns a rank to each row within the partition of a result set, with gaps for ties.
DENSE_RANK(): Similar to RANK(), but without gaps between the ranks.
NTILE(n): Distributes the rows into a specified number of groups and assigns a group number to each row.
Analytic Functions:
CUME_DIST(): Computes the cumulative distribution of a value in a group of values.
PERCENT_RANK(): Computes the rank of a value in a group of values as a percentage.
Aggregate Functions:
SUM(), AVG(), MIN(), MAX(), COUNT(): These functions can be used as window functions to perform aggregations over a set of rows.
Value Functions:
LAG(): Accesses data from a previous row in the same result set.
LEAD(): Accesses data from a subsequent row in the same result set.
FIRST_VALUE(): Returns the first value in an ordered set of values.
LAST_VALUE(): Returns the last value in an ordered set of values.
Window Specification
To use a window function, you need to define a window specification that includes:
Partitioning: Defines the subsets of data to which the function is applied.
Ordering: Defines the order of rows within each partition.
Frame: Defines the subset of rows relative to the current row.
Basic Syntax of Window Functions in Spark SQL
SELECT column1, column2, aggregate_function(column) OVER (PARTITION BY column_name ORDER BY column_name [ROWS/RANGE frame_clause]) AS alias FROM table_name;
Components of a Window Function
Aggregate or Analytical Function:
Examples: SUM(), AVG(), COUNT(), ROW_NUMBER(), RANK(), etc.
OVER Clause:
Specifies the window over which the function operates.
PARTITION BY Clause (optional):
Divides the dataset into partitions.
Each partition is processed independently.
ORDER BY Clause (optional):
Specifies the ordering of rows within a partition.
Frame Specification (optional):
Defines the range of rows within a partition for computation.
Options: ROWS, RANGE.
1. PARTITION BY Clause
Divides the dataset into groups (partitions) for which the window function is applied independently.
Syntax: PARTITION BY column_name
Example: AVG(Salary) OVER (PARTITION BY Department) computes the average salary separately for each department.
2. ORDER BY Clause
Defines the order of rows within each partition.
Syntax: ORDER BY column_name [ASC | DESC] [NULLS FIRST | NULLS LAST]
ASC: Ascending order (default).
DESC: Descending order.
NULLS FIRST: Places NULL values at the beginning of the order.
NULLS LAST: Places NULL values at the end of the order.
Example: RANK() OVER (PARTITION BY Department ORDER BY Salary DESC NULLS LAST)
3. Frame Specification
Defines the range of rows to consider for the window function.
Frame types:
ROWS: Based on row positions.
RANGE: Based on value ranges.
Frame Syntax
[ROWS | RANGE] BETWEEN frame_start AND frame_end
Frame Options
UNBOUNDED PRECEDING:
Includes all rows from the beginning of the partition.
Example: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
CURRENT ROW:
Includes the current row only.
Example: ROWS BETWEEN CURRENT ROW AND CURRENT ROW.
UNBOUNDED FOLLOWING:
Includes all rows until the end of the partition.
Example: ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING.
Specific Number of Rows:
Includes a fixed number of preceding or following rows.
Example: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING.
Examples
Cumulative sum: SUM(Sales) OVER (PARTITION BY CustomerID ORDER BY OrderDate ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
Moving average: AVG(Salary) OVER (PARTITION BY Department ORDER BY Salary ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
4. Handling NULL Values in ORDER BY
By default:
NULLS FIRST for ascending order.
NULLS LAST for descending order.
Example
RANK() OVER (PARTITION BY Department ORDER BY Salary DESC NULLS FIRST)
5. Supported Functions with OVER Clause
Spark SQL supports many window functions with the OVER clause, spanning the ranking, analytic, aggregate, and value categories listed above. For example:
SELECT EmpID,
Department,
Salary,
AVG(Salary) OVER (PARTITION BY Department ORDER BY Salary ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS running_avg,
RANK() OVER (PARTITION BY Department ORDER BY Salary DESC NULLS LAST) AS salary_rank
FROM Employee;
This example:
Computes a running average for the last three salaries within each department.
Ranks employees by salary in descending order, placing NULL salaries at the end.
Summary of Options
Clause
Description
Syntax Example
PARTITION BY
Divides rows into partitions
PARTITION BY Department
ORDER BY
Orders rows within partitions
ORDER BY Salary DESC NULLS FIRST
ROWS
Row-based range for window frame
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
RANGE
Value-based range for window frame
RANGE BETWEEN INTERVAL 1 DAY PRECEDING AND CURRENT ROW
NULLS
Specifies how nulls are handled in ordering
ORDER BY Salary DESC NULLS LAST
By combining these options, Spark SQL allows flexible and efficient computation over distributed datasets!
Examples
1. Rank Employees by Salary in Each Department
SELECT EmpID,
Emp_name,
Department,
Salary,
RANK() OVER (PARTITION BY Department ORDER BY Salary DESC) AS rank
FROM Employee;
Explanation:
PARTITION BY Department: Separate ranking by department.
ORDER BY Salary DESC: Ranks employees by salary in descending order.
2. Calculate Cumulative Sum of Sales
SELECT CustomerID, OrderDate, ProductID, SUM(Sales) OVER (PARTITION BY CustomerID ORDER BY OrderDate) AS cumulative_sales FROM Orders;
Explanation:
PARTITION BY CustomerID: Cumulative sales calculated per customer.
ORDER BY OrderDate: Sales are accumulated in chronological order.
3. Find Running Average Salary
SELECT EmpID,
Emp_name,
Department,
AVG(Salary) OVER (PARTITION BY Department ORDER BY Salary ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS running_avg
FROM Employee;
Explanation:
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW: Calculates the average salary of the current row and the two preceding rows.
4. Lead and Lag Example
SELECT EmpID,
Emp_name,
Department,
Salary,
LAG(Salary, 1, 0) OVER (PARTITION BY Department ORDER BY Salary) AS previous_salary,
LEAD(Salary, 1, 0) OVER (PARTITION BY Department ORDER BY Salary) AS next_salary
FROM Employee;
Explanation:
LAG(Salary, 1, 0): Fetches the previous salary within the partition. Default is 0 if there is no previous value.
LEAD(Salary, 1, 0): Fetches the next salary within the partition. Default is 0 if there is no next value.
5. Find Employees Above Average Salary in Their Department
SELECT EmpID,
       Emp_name,
       Department,
       Salary,
       avg_salary
FROM (
    SELECT EmpID,
           Emp_name,
           Department,
           Salary,
           AVG(Salary) OVER (PARTITION BY Department) AS avg_salary
    FROM Employee
) ranked
WHERE Salary > avg_salary;
Explanation:
AVG(Salary) OVER (PARTITION BY Department): Calculates the average salary for each department.
Window functions cannot be used directly in a WHERE clause, so the windowed average is computed in a subquery and the outer WHERE clause filters employees whose salary is above their department average.
Notes
Spark SQL does not support DISTINCT in window functions.
For example, COUNT(DISTINCT column) OVER (...) is not allowed in Spark SQL.
Optimized for Distributed Computing:
Unlike traditional SQL, Spark SQL handles large-scale datasets by distributing computations across the cluster.
Frame Specification:
Use ROWS for row-level computations.
Use RANGE for value-based computations.
Integration in PySpark
Spark SQL window functions can also be used within PySpark via .selectExpr():
df.selectExpr("EmpID", "Salary", "AVG(Salary) OVER (PARTITION BY Department ORDER BY EmpID) AS avg_salary").show()
Best Use Cases for Window Functions
1. Running Totals and Moving Averages
Calculate running totals or moving averages over a specified window of rows.
SELECT
customer_id,
order_date,
amount,
SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total
FROM orders;
2. Ranking
Assign ranks to rows based on the values in a specific column.
SELECT
customer_id,
order_date,
amount,
RANK() OVER (PARTITION BY customer_id ORDER BY amount DESC) AS rank
FROM orders;
3. Time-Series Analysis
Perform operations like lead and lag for time-series data.
SELECT
customer_id,
order_date,
amount,
LAG(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) AS previous_amount,
LEAD(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) AS next_amount
FROM orders;
4. Percentile and Distribution Analysis
Compute percentiles and cumulative distributions.
SELECT
customer_id,
order_date,
amount,
PERCENT_RANK() OVER (PARTITION BY customer_id ORDER BY amount) AS percent_rank,
CUME_DIST() OVER (PARTITION BY customer_id ORDER BY amount) AS cumulative_distribution
FROM orders;
5. Identifying First and Last Records in Groups
Identify the first and last records within each group.
SELECT
customer_id,
order_date,
amount,
FIRST_VALUE(amount) OVER (PARTITION BY customer_id ORDER BY order_date) AS first_amount,
LAST_VALUE(amount) OVER (PARTITION BY customer_id ORDER BY order_date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_amount
FROM orders;
6. For SAS programmers: how to translate SAS FIRST. and LAST. in Spark SQL?
In Spark SQL, there’s no direct equivalent of SAS FIRST. and LAST. variables. However, you can achieve similar functionality using window functions like row_number() or first_value() and last_value() depending on your specific needs.
Here’s how you can translate the concept in Spark SQL:
1. Using row_number():
This approach assigns a unique row number within each group defined by the BY variable(s). You can then use conditional logic to identify the first and last rows based on the row number.
SELECT *,
CASE WHEN row_number() OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) = 1 THEN 1 ELSE 0 END AS is_first,
CASE WHEN row_number() OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) = COUNT(*) OVER (PARTITION BY variable1, variable2) THEN 1 ELSE 0 END AS is_last
FROM your_data
ORDER BY variable1, variable2, sort_column;
Explanation:
We use row_number() with a PARTITION BY clause based on your BY variable(s) and an ORDER BY clause to define the order within each group (replace sort_column with the actual column for sorting).
The CASE statement assigns 1 to the first row (where row_number is 1) and 0 to others within the group for is_first.
Similarly, the second CASE statement identifies the last row by checking if the row_number is equal to the total count of rows within the group (using COUNT(*) OVER (PARTITION BY...)).
2. Using first_value() and last_value():
These functions can be used if you only need the values of specific columns for the first and last rows in each group.
SELECT *,
first_value(column_to_get) OVER (PARTITION BY variable1, variable2 ORDER BY sort_column) AS first_value,
last_value(column_to_get) OVER (PARTITION BY variable1, variable2 ORDER BY sort_column ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_value
FROM your_data
ORDER BY variable1, variable2, sort_column;
Explanation:
first_value() and last_value() are used within the SELECT clause with a similar windowing definition as before.
They return the value of the specified column_to_get for the first and last rows within each group, respectively. Note that last_value() needs the explicit ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING frame; with the default frame it would simply return the current row's value.
Choosing the Right Approach:
If you need additional information beyond just identifying the first and last rows (like row numbers), use row_number().
If you only need the values of specific columns for the first and last rows, use first_value() and last_value().
Remember:
Unlike SAS, you do not need to pre-sort the data; the ORDER BY inside the window specification determines the order within each group, so make sure it matches the sort you would have used with the BY variable(s).
Adjust the windowing specifications (partitioning and ordering) based on your actual requirements.
By leveraging these window functions in Spark SQL, you can effectively achieve functionalities similar to SAS FIRST. and LAST. variables for identifying and processing the first and last observations within groups defined by your BY variable(s).
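The same idea expressed with the DataFrame API, as a sketch with illustrative column names: compute row_number() in both sort directions and turn the results into FIRST./LAST.-style flags.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("first-last-flags").getOrCreate()

df = spark.createDataFrame(
    [("g1", 1), ("g1", 2), ("g1", 3), ("g2", 5), ("g2", 7)],
    ["group_col", "sort_col"],
)

w_asc = Window.partitionBy("group_col").orderBy(F.col("sort_col").asc())
w_desc = Window.partitionBy("group_col").orderBy(F.col("sort_col").desc())

flagged = df.select(
    "*",
    (F.row_number().over(w_asc) == 1).cast("int").alias("is_first"),   # like FIRST.
    (F.row_number().over(w_desc) == 1).cast("int").alias("is_last"),   # like LAST.
)
flagged.orderBy("group_col", "sort_col").show()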
Scenario
Assume we have a dataset with columns group1, group2, order1, and order2, and we want to identify the first and last occurrence of each group based on the combined ordering of order1 and order2.
SAS Code Example
data example;
  set example;
  by group1 group2 order1 order2;
  if first.group1 then first_group1 = 1; else first_group1 = 0;
  if last.group1 then last_group1 = 1; else last_group1 = 0;
  if first.group2 then first_group2 = 1; else first_group2 = 0;
  if last.group2 then last_group2 = 1; else last_group2 = 0;
run;
Equivalent in Spark SQL
To translate this logic into Spark SQL, you can use the ROW_NUMBER function along with window functions to identify the first and last occurrence of each group.
Spark SQL Code Example
Create the DataFrame and Register as a Temporary View:
This step involves creating a DataFrame and registering it as a temporary view for SQL operations.
Define the Window Specification:
The window specification is partitioned by the grouping variables and ordered by the ordering variables.
Identify the First and Last Occurrences:
Use ROW_NUMBER with appropriate ordering to mark the first and last occurrences.
Here’s how you can achieve this:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("SAS First Last Equivalent").getOrCreate()
# Register the DataFrame as a temporary view
# (assumes a DataFrame `df` with columns group1, group2, order1, order2, amount)
df.createOrReplaceTempView("example")
# Use Spark SQL to identify first and last occurrences
sql_query = """
WITH RankedData AS (
    SELECT group1, group2, order1, order2, amount,
           ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1, order2) AS rn_group1_asc,
           ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1 DESC, order2 DESC) AS rn_group1_desc,
           ROW_NUMBER() OVER (PARTITION BY group2 ORDER BY order1, order2) AS rn_group2_asc,
           ROW_NUMBER() OVER (PARTITION BY group2 ORDER BY order1 DESC, order2 DESC) AS rn_group2_desc
    FROM example
)
SELECT group1, group2, order1, order2, amount,
       CASE WHEN rn_group1_asc = 1 THEN 1 ELSE 0 END AS first_group1,
       CASE WHEN rn_group1_desc = 1 THEN 1 ELSE 0 END AS last_group1,
       CASE WHEN rn_group2_asc = 1 THEN 1 ELSE 0 END AS first_group2,
       CASE WHEN rn_group2_desc = 1 THEN 1 ELSE 0 END AS last_group2
FROM RankedData
"""
# Execute the SQL query
result_df = spark.sql(sql_query)
# Show the result
result_df.show()
Explanation
RankedData Common Table Expression (CTE):
We create a CTE called RankedData where we calculate row numbers for each group based on the specified order.
ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1, order2) assigns a row number to each row within group1 ordered by order1 and order2.
ROW_NUMBER() OVER (PARTITION BY group1 ORDER BY order1 DESC, order2 DESC) assigns a reverse row number to identify the last occurrence within group1.
The same logic is applied for group2.
Select with CASE Statements:
We select from the RankedData CTE and use CASE statements to mark the first and last occurrences for each group.
Sample Output
The resulting DataFrame will include flags indicating the first and last occurrences for each group.
Additional window-function examples with explicit frames, using a sample employees table:
SELECT name, dept, salary,
       DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dense_rank
FROM employees;
+-----+-----------+------+----------+
| name|       dept|salary|dense_rank|
+-----+-----------+------+----------+
| Lisa|      Sales| 10000|         1|
| Alex|      Sales| 30000|         2|
| Evan|      Sales| 32000|         3|
| Fred|Engineering| 21000|         1|
|  Tom|Engineering| 23000|         2|
|Chloe|Engineering| 23000|         2|
| Paul|Engineering| 29000|         3|
|Helen|  Marketing| 29000|         1|
| Jane|  Marketing| 29000|         1|
| Jeff|  Marketing| 35000|         2|
+-----+-----------+------+----------+
SELECT name, dept, age,
       CUME_DIST() OVER (PARTITION BY dept ORDER BY age RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cume_dist
FROM employees;
+-----+-----------+---+------------------+
| name|       dept|age|         cume_dist|
+-----+-----------+---+------------------+
| Alex|      Sales| 33|0.3333333333333333|
| Lisa|      Sales| 35|0.6666666666666666|
| Evan|      Sales| 38|               1.0|
| Paul|Engineering| 23|              0.25|
|Chloe|Engineering| 25|              0.75|
| Fred|Engineering| 28|              0.25|
|  Tom|Engineering| 33|               1.0|
| Jane|  Marketing| 28|0.3333333333333333|
| Jeff|  Marketing| 38|0.6666666666666666|
|Helen|  Marketing| 40|               1.0|
+-----+-----------+---+------------------+
SELECT name, dept, salary,
       MIN(salary) OVER (PARTITION BY dept ORDER BY salary) AS min
FROM employees;
+-----+-----------+------+-----+
| name|       dept|salary|  min|
+-----+-----------+------+-----+
| Lisa|      Sales| 10000|10000|
| Alex|      Sales| 30000|10000|
| Evan|      Sales| 32000|10000|
|Helen|  Marketing| 29000|29000|
| Jane|  Marketing| 29000|29000|
| Jeff|  Marketing| 35000|29000|
| Fred|Engineering| 21000|21000|
|  Tom|Engineering| 23000|21000|
|Chloe|Engineering| 23000|21000|
| Paul|Engineering| 29000|21000|
+-----+-----------+------+-----+
7. Another scenario in SAS: how to translate PROC SORT with BY (ordering based on multiple keys) and NODUPKEY (removing duplicate observations) into Spark SQL
SAS Scenario:
Imagine you have a dataset named sales_data containing columns like customer_id, product_id, date, and sales_amount. You want to:
Sort the data by customer_id, product_id, and date (in ascending order).
Remove any duplicate observations based on the combination of customer_id, product_id, and date.
SAS Code:
proc sort data=sales_data;
by customer_id product_id date;
nodupkey;
run;
Spark SQL Translation:
There are two main approaches to achieve this functionality in Spark SQL:
Approach 1: Using DISTINCT and ORDER BY:
SELECT DISTINCT customer_id, product_id, date, sales_amount
FROM sales_data
ORDER BY customer_id, product_id, date;
Explanation:
DISTINCT: This clause eliminates duplicate rows based on all columns in the SELECT clause (in this case, customer_id, product_id, and date).
ORDER BY: This clause sorts the resulting unique data based on the specified order (ascending for all three columns).
Approach 2: Using ROW_NUMBER() and WHERE:
SELECT customer_id, product_id, date, sales_amount
FROM (
SELECT customer_id, product_id, date, sales_amount,
row_number() OVER (PARTITION BY customer_id, product_id, date ORDER BY date) AS row_num
FROM sales_data
) AS ranked_data
WHERE row_num = 1;
Explanation:
Window Function (row_number):
A subquery assigns a unique row number (row_num) within each group defined by customer_id, product_id, and date.
The ORDER BY date ensures rows are ordered by the date for each customer/product combination.
Filtering Unique Rows:
The outer query filters the ranked data to keep only rows where row_num is 1. This selects the first occurrence for each unique combination of customer_id, product_id, and date.
Choosing the Right Approach:
If your primary goal is sorting and eliminating duplicates efficiently, DISTINCT with ORDER BY might be a simpler solution.
If you need the row number information for further processing, or if the data volume is large and DISTINCT performance becomes an issue, the window function approach (ROW_NUMBER()) might be preferable.
Additional Considerations:
Both approaches achieve the desired outcome of removing duplicate observations based on the specified key combination and sorting the remaining data.
You can adjust the ORDER BY clause to define a different sorting order if needed.
By using either of these approaches in Spark SQL, you can effectively translate the functionality of SAS PROC SORT with BY and NODUPKEY for handling multiple sorting keys and deduplication within your Spark DataFrame.
8. SAS to Spark SQL
data xyz;set abc;
by xy;
retain x1 y1;
length x1 $400 y1 $100;
if first.xy then do;
x1=' ';
y1=' ';
end;
x1=catx('',trim(x1),x2); y1=catx("",trim(y1),y2);
if last.xy then output;
run;
The provided SAS code performs a group-by operation on xy, retains values across rows within each group, concatenates those values, and outputs the final concatenated result for each group.
# Define and execute SQL query
sql_query = """
WITH concatenated AS (
SELECT
xy,
COLLECT_LIST(x2) OVER (PARTITION BY xy) AS x2_list,
COLLECT_LIST(y2) OVER (PARTITION BY xy) AS y2_list,
ROW_NUMBER() OVER (PARTITION BY xy ORDER BY xy) AS row_num,
COUNT(*) OVER (PARTITION BY xy) AS count_xy
FROM abc
)
SELECT
xy,
CONCAT_WS('_', x2_list) AS x1,
CONCAT_WS('_', y2_list) AS y1
FROM concatenated
WHERE row_num = count_xy
"""
# Execute the SQL query
result = spark.sql(sql_query)
# Show the results
result.show()
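The same group-wise concatenation can also be written with the DataFrame API; a sketch assuming an abc DataFrame with columns xy, x2, and y2 (the sample rows and the underscore delimiter are illustrative, matching the SQL above):
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("groupwise-concat").getOrCreate()

abc = spark.createDataFrame(
    [("k1", "a", "p"), ("k1", "b", "q"), ("k2", "c", "r")],
    ["xy", "x2", "y2"],
)

result = abc.groupBy("xy").agg(
    F.concat_ws("_", F.collect_list("x2")).alias("x1"),
    F.concat_ws("_", F.collect_list("y2")).alias("y1"),
)
result.show()
Note that collect_list() does not guarantee element order; if the concatenation order matters, sort the rows (or the collected array) first.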
Yup! Scientists find a ‘Unique’ Black Hole that is hungrier than ever in the Universe! Scientists have observed a fascinating phenomenon involving a supermassive black hole, AT2022dsb, which appears to be devouring a star in a “tidal disruption event” (TDE).
When a star ventures too close to a black hole, the intense gravitational forces stretch it out in a process known as “spaghettification.”
As the star’s material spirals toward the black hole, it forms a glowing, donut-shaped torus of hot gas around it, resembling a cosmic feeding frenzy.
The Discovery and Unique Nature of This Black Hole
Black holes have long been known as the universe’s “hungry giants,” entities with such immense gravitational pull that not even light can escape them. This recently observed black hole, however, stands out because of the vast amounts of material it’s actively pulling in, including entire stars that stray too close. When a black hole consumes a star, it distorts and stretches it, a process astronomers poetically refer to as “spaghettification.”
The massive gravitational forces rip apart the star, and its matter spirals toward the event horizon, creating bright, swirling bands of light and energy. This radiant display, known as an accretion disk, reveals the black hole’s “feasting” process and provides scientists with clues about its behavior and growth.
Observing the Accretion Disk
The image above shows a vivid portrayal of a black hole in action, illustrating the intense gravitational forces pulling in stellar material. The accretion disk forms as material spirals inward, reaching incredible temperatures and emitting high levels of X-ray radiation that astronomers can observe from Earth.
Cosmic Impact and Significance
The discovery of this “voracious” black hole offers astronomers a unique chance to study black hole behavior more closely, including how they grow and influence their surroundings. By examining the material in the accretion disk, scientists hope to understand more about how black holes evolve over time and how they might impact the galaxies that host them. This discovery reinforces the idea that black holes are not just passive entities; they are dynamic and play a crucial role in the cosmic ecosystem by regulating star formation and influencing galactic structures.
Conclusion
As scientists continue to observe and learn from this remarkable cosmic phenomenon, we gain more insight into one of the most mysterious and powerful forces in the universe. This voracious black hole serves as a powerful reminder of the awe-inspiring and sometimes destructive beauty of our cosmos.
Here’s an enhanced Spark SQL cheatsheet with additional details, covering join types, union types, and set operations like EXCEPT and INTERSECT, along with options for table management (DDL operations like UPDATE, INSERT, DELETE, etc.). This comprehensive sheet is designed to help with quick Spark SQL reference.
Category
Concept
Syntax / Example
Description
Basic Statements
SELECT
SELECT col1, col2 FROM table WHERE condition;
Retrieves specific columns from a table based on a condition.
DISTINCT
SELECT DISTINCT col FROM table;
Returns unique values in the specified column.
LIMIT
SELECT * FROM table LIMIT 10;
Restricts the number of rows returned by a query.
Joins
INNER JOIN
SELECT * FROM t1 JOIN t2 ON t1.id = t2.id;
Returns rows that have matching values in both tables.
LEFT JOIN
SELECT * FROM t1 LEFT JOIN t2 ON t1.id = t2.id;
Returns all rows from the left table and matched rows from the right table; unmatched rows get NULL in columns from the right.
RIGHT JOIN
SELECT * FROM t1 RIGHT JOIN t2 ON t1.id = t2.id;
Returns all rows from the right table and matched rows from the left table; unmatched rows get NULL in columns from the left.
FULL OUTER JOIN
SELECT * FROM t1 FULL JOIN t2 ON t1.id = t2.id;
Returns rows when there is a match in either left or right table, including unmatched rows.
CROSS JOIN
SELECT * FROM t1 CROSS JOIN t2;
Returns the Cartesian product of the two tables.
Set Operations
UNION
SELECT * FROM t1 UNION SELECT * FROM t2;
Combines result sets from multiple queries, removing duplicates by default.
UNION ALL
SELECT * FROM t1 UNION ALL SELECT * FROM t2;
Combines result sets from multiple queries without removing duplicates.
INTERSECT
SELECT * FROM t1 INTERSECT SELECT * FROM t2;
Returns only the rows present in both queries.
EXCEPT
SELECT * FROM t1 EXCEPT SELECT * FROM t2;
Returns rows present in the first query but not in the second query.
EXCEPT ALL
SELECT * FROM t1 EXCEPT ALL SELECT * FROM t2;
Returns all rows in the first query that aren’t in the second, including duplicates.
Table Management
CREATE TABLE
CREATE TABLE table_name (id INT, name STRING);
Creates a new table with specified columns and data types.
DESCRIBE
DESCRIBE TABLE table_name;
Shows the structure and metadata of a table.
ALTER TABLE
ALTER TABLE table_name ADD COLUMNS (age INT);
Adds columns or modifies a table’s structure.
DROP TABLE
DROP TABLE IF EXISTS table_name;
Deletes a table if it exists.
TRUNCATE TABLE
TRUNCATE TABLE table_name;
Removes all rows from a table without deleting the table structure.
INSERT INTO
INSERT INTO table_name VALUES (1, 'name');
Adds new rows to a table.
INSERT OVERWRITE
INSERT OVERWRITE table_name SELECT * FROM other_table;
Replaces existing data in a table with the results of a query.
UPDATE
UPDATE table_name SET col = 'value' WHERE condition;
Updates specific columns based on a condition (SQL-style syntax may vary by environment).
DELETE
DELETE FROM table_name WHERE condition;
Deletes specific rows based on a condition (available in Delta tables, SQL-style syntax).
Window Functions
row_number()
ROW_NUMBER() OVER (PARTITION BY col ORDER BY col2 DESC)
Assigns a unique number to each row within a partition.
rank()
RANK() OVER (PARTITION BY col ORDER BY col2 DESC)
Assigns a rank to rows within a partition based on specified column(s).
lead(), lag()
LEAD(col) OVER (ORDER BY col2)
Accesses data from the following or preceding row.
Data Manipulation Functions
withColumn()
df.withColumn("newCol", df.oldCol + 1)
Adds or replaces a column with the specified expression.
withColumnRenamed()
df.withColumnRenamed("oldName", "newName")
Renames a column.
selectExpr()
df.selectExpr("col AS newCol", "col2 + 1")
Selects columns or expressions using SQL syntax.
String Functions
concat()
SELECT concat(col1, col2) FROM table;
Concatenates strings from multiple columns.
substring()
SELECT substring(col, 1, 5) FROM table;
Extracts a substring from a string column.
lower() / upper()
SELECT lower(col) FROM table;
Converts all characters in a string to lowercase or uppercase.
Date and Time Functions
current_date()
SELECT current_date();
Returns the current date.
datediff()
SELECT datediff(end_date, start_date) FROM table;
Returns the difference in days between two dates.
year(), month(), day()
SELECT year(col) FROM table;
Extracts parts of a date.
Aggregate Functions
collect_list()
SELECT collect_list(col) FROM table;
Aggregates values into a list for each group.
collect_set()
SELECT collect_set(col) FROM table;
Aggregates values into a unique set for each group.
avg(), sum(), count()
SELECT sum(col), count(col) FROM table GROUP BY group_col;
Performs aggregation functions like averaging, summing, or counting.
Optimization Techniques
cache()
df.cache()
Caches the DataFrame in memory to optimize performance on repeated actions.
repartition()
df.repartition(4, "col")
Redistributes data across partitions for load balancing.
broadcast()
broadcast(df)
Optimizes joins by broadcasting smaller DataFrames to all nodes.
Predicate Pushdown
spark.sql("SELECT * FROM table WHERE col = 'value'")
Pushes filters down to the data source, reducing data scanned.
UDFs
Register UDF
spark.udf.register("addOne", lambda x: x + 1)
Registers a custom Python function as a UDF.
Using a UDF
SELECT addOne(col) FROM table;
Applies a UDF to a column in Spark SQL.
Schema Management
printSchema()
df.printSchema()
Displays the schema of a DataFrame.
schema
df.schema
Returns the schema as a StructType object.
Schema Merge
spark.read.option("mergeSchema", "true")
Merges schemas when reading from multiple files.
Complex Types
Arrays
ARRAY<int>
Defines an array type, e.g., ARRAY<int>.
Struct
STRUCT<name: STRING, age: INT>
Defines a nested structure.
Miscellaneous
monotonically_increasing_id()
SELECT monotonically_increasing_id() AS id FROM table;
Generates unique IDs for rows.
input_file_name()
SELECT input_file_name() FROM table;
Retrieves the file name associated with each row.
coalesce()
SELECT coalesce(col1, col2) FROM table;
Returns the first non-null value from the specified columns.
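A minimal sketch of the UDF entries above: registering a Python UDF for SQL and using the equivalent udf() wrapper in the DataFrame API. The names and types here are illustrative; prefer built-in functions where possible, since Python UDFs bypass many Catalyst optimizations.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("udf-demo").getOrCreate()

df = spark.createDataFrame([(1,), (2,), (3,)], ["col"])
df.createOrReplaceTempView("t")

# Register for use in SQL
spark.udf.register("addOne", lambda x: x + 1, IntegerType())
spark.sql("SELECT col, addOne(col) AS col_plus_one FROM t").show()

# Equivalent DataFrame-API usage
add_one = udf(lambda x: x + 1, IntegerType())
df.select("col", add_one("col").alias("col_plus_one")).show()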
Concept
Description
Syntax/Example
Basic Select
Retrieves data from a table.
SELECT column1, column2 FROM table;
WHERE Clause
Filters records based on conditions.
SELECT * FROM table WHERE condition;
Aggregations
Summarizes data (e.g., SUM, COUNT, AVG).
SELECT SUM(column) FROM table GROUP BY column2;
Window Functions
Performs calculations across rows, like cumulative sums, rank, or row numbers.
SELECT column, SUM(value) OVER (PARTITION BY column ORDER BY date) AS running_total FROM table;
Joins
Combines rows from two or more tables based on a related column.
SELECT * FROM table1 JOIN table2 ON table1.id = table2.id;
Subqueries
Nested queries for complex operations or transformations.
SELECT * FROM (SELECT column1 FROM table WHERE condition);
CTE (WITH Clause)
Temporary result set for improved readability and reuse in complex queries.
WITH temp AS (SELECT column FROM table) SELECT * FROM temp WHERE condition;
UNION/UNION ALL
Combines results from multiple SELECT statements.
SELECT column FROM table1 UNION SELECT column FROM table2;
Pivot
Converts rows into columns for specified values (aggregate with columns).
SELECT * FROM (SELECT * FROM table) PIVOT (SUM(value) FOR column IN ('value1', 'value2'));
Unpivot
Converts columns into rows, useful for restructuring wide tables.
SELECT * FROM table UNPIVOT (value FOR name IN (col1, col2, col3));
Views
Virtual table based on a SELECT query, allows simplified access.
CREATE VIEW view_name AS SELECT column FROM table;
Temporary Tables
Temporary storage for session-specific tables, deleted when the session ends.
CREATE TEMPORARY TABLE temp_table AS SELECT * FROM table;
Group By with HAVING
Groups data by specified columns and applies conditions to aggregated data.
SELECT column, COUNT(*) FROM table GROUP BY column HAVING COUNT(*) > value;
Case Statements
Conditional logic within SQL queries for creating calculated columns.
SELECT column, CASE WHEN condition THEN 'result1' ELSE 'result2' END AS new_column FROM table;
Window Frame
Specifies the range for window functions, often cumulative or sliding (rows between clauses).
SUM(column) OVER (ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
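As a sketch of the Pivot entry using the DataFrame API's groupBy().pivot(), with an illustrative year/quarter dataset:
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("pivot-demo").getOrCreate()

sales = spark.createDataFrame(
    [("2024", "Q1", 100), ("2024", "Q2", 150), ("2025", "Q1", 120)],
    ["year", "quarter", "value"],
)

# One row per year, one column per quarter value
sales.groupBy("year").pivot("quarter", ["Q1", "Q2"]).agg(F.sum("value")).show()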
collect_list()
Aggregate column values into a list for each group, useful in tracking customer purchase history: SELECT customer_id, collect_list(product) FROM purchases GROUP BY customer_id;
concat()
Concatenate multiple address fields into one formatted address: SELECT concat(street, ', ', city, ', ', zip) AS address FROM addresses;
row_number()
Number rows within each group, useful for ranking: SELECT *, row_number() OVER (PARTITION BY category ORDER BY sales DESC) AS rank FROM sales_data;
date_add()
Calculate future dates, such as a payment due date: SELECT order_id, date_add(order_date, 30) AS due_date FROM orders;
when() and coalesce()
Assign risk categories while handling nulls (when() is the DataFrame-API form of SQL CASE WHEN): SELECT customer_id, CASE WHEN age > 60 THEN 'high' WHEN age > 30 THEN 'medium' ELSE 'low' END AS risk, coalesce(income, 0) AS income FROM customers;
array_contains()
Filter for specific tags in an array column: SELECT * FROM posts WHERE array_contains(tags, 'pyspark');
explode()
Expand array items into individual rows: SELECT order_id, explode(items) AS item FROM orders;
Conditional aggregation in Spark SQL:
Here’s an example of conditional aggregation in Spark SQL:
SELECT
SUM(CASE WHEN age > 30 THEN 1 ELSE 0 END) AS count_over_30,
SUM(CASE WHEN age <= 30 THEN 1 ELSE 0 END) AS count_under_30
FROM
customers;
In this example, we’re using a CASE statement to conditionally sum the values. If the age is greater than 30, we sum 1, otherwise we sum 0.
Using IF Function
Alternatively, you can use the IF function:
SELECT
SUM(IF(age > 30, 1, 0)) AS count_over_30,
SUM(IF(age <= 30, 1, 0)) AS count_under_30
FROM
customers;
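The DataFrame-API equivalent of this conditional aggregation combines sum() with when(); a minimal sketch with illustrative ages:
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("cond-agg").getOrCreate()

customers = spark.createDataFrame([(25,), (35,), (45,), (30,)], ["age"])

customers.agg(
    F.sum(F.when(F.col("age") > 30, 1).otherwise(0)).alias("count_over_30"),
    F.sum(F.when(F.col("age") <= 30, 1).otherwise(0)).alias("count_under_30"),
).show()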
Spark SQL commands to manage views
Here are the Spark SQL commands to manage views:
Creating Views
1. Creating Virtual Views
CREATE VIEW my_view AS SELECT * FROM my_table;
2. Creating Temporary Views
CREATE TEMPORARY VIEW my_temp_view AS SELECT * FROM my_table;
3. Creating or Replacing Temporary Views
CREATE OR REPLACE TEMPORARY VIEW my_temp_view AS SELECT * FROM my_table;
Deleting Views
1. Deleting Temporary Views if Exists
DROP VIEW IF EXISTS my_temp_view;
Checking Views
1. Checking Views DDL
DESCRIBE FORMATTED my_view;
2. Checking Extended Query
EXPLAIN EXTENDED SELECT * FROM my_view;
3. Checking in Spark Catalog
SHOW TABLES IN my_database; -- lists all tables and views
SHOW VIEWS IN my_database; -- lists only views
Note:
CREATE VIEW creates a virtual view, which is a stored query that doesn’t store data.
CREATE TEMPORARY VIEW creates a temporary view that is only available in the current Spark session.
CREATE OR REPLACE TEMPORARY VIEW creates or replaces a temporary view.
DROP VIEW IF EXISTS deletes a view if it exists.
DESCRIBE FORMATTED shows the DDL of a view.
EXPLAIN EXTENDED shows the extended query plan of a view.
SHOW TABLES and SHOW VIEWS list tables and views in the Spark catalog.
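Views can also be inspected and dropped programmatically through spark.catalog; a minimal sketch:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("catalog-demo").getOrCreate()

spark.range(5).createOrReplaceTempView("my_temp_view")

# List tables and views visible in the current database
for t in spark.catalog.listTables():
    print(t.name, t.tableType, t.isTemporary)

# Drop the temporary view if it exists (returns True if it was dropped)
spark.catalog.dropTempView("my_temp_view")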
Here are the different types of joins in Spark SQL, along with examples:
1. Inner Join
Returns only the rows that have a match in both tables.
SELECT *
FROM table1
INNER JOIN table2
ON table1.id = table2.id;
2. Left Outer Join (or Left Join)
Returns all the rows from the left table and the matching rows from the right table. If there’s no match, the result will contain NULL values.
SELECT *
FROM table1
LEFT JOIN table2
ON table1.id = table2.id;
3. Right Outer Join (or Right Join)
Similar to the left outer join, but returns all the rows from the right table and the matching rows from the left table.
SELECT *
FROM table1
RIGHT JOIN table2
ON table1.id = table2.id;
4. Full Outer Join (or Full Join)
Returns all the rows from both tables, with NULL values in the columns where there are no matches.
SELECT *
FROM table1
FULL OUTER JOIN table2
ON table1.id = table2.id;
5. Semi Join
Returns only the rows from the left table that have a match in the right table.
SELECT *
FROM table1
LEFT SEMI JOIN table2
ON table1.id = table2.id;
6. Anti Join
Returns only the rows from the left table that do not have a match in the right table.
SELECT *
FROM table1
LEFT ANTI JOIN table2
ON table1.id = table2.id;
7. Cross Join
Returns the Cartesian product of both tables.
SELECT *
FROM table1
CROSS JOIN table2;
Note: Spark SQL also supports the USING clause to specify the join condition when the join columns share the same name, for example: SELECT * FROM table1 JOIN table2 USING (id);
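For completeness, the same join types expressed with the DataFrame API, where semi and anti joins are selected through the how argument (tables and values here are illustrative):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join-demo").getOrCreate()

table1 = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "v1"])
table2 = spark.createDataFrame([(2, "x"), (3, "y"), (4, "z")], ["id", "v2"])

table1.join(table2, on="id", how="inner").show()
table1.join(table2, on="id", how="left").show()
table1.join(table2, on="id", how="full").show()
table1.join(table2, on="id", how="left_semi").show()   # rows of table1 with a match
table1.join(table2, on="id", how="left_anti").show()   # rows of table1 without a match
table1.crossJoin(table2).show()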
Here’s a categorized Spark SQL function reference, which organizes common Spark SQL functions by functionality. This can help with selecting the right function based on the operation you want to perform.
1. Aggregate Functions
Function
Description
Example
avg()
Calculates the average value.
SELECT avg(age) FROM table;
count()
Counts the number of rows.
SELECT count(*) FROM table;
max()
Finds the maximum value.
SELECT max(salary) FROM table;
min()
Finds the minimum value.
SELECT min(age) FROM table;
sum()
Calculates the sum of a column.
SELECT sum(salary) FROM table;
stddev()
Calculates the standard deviation.
SELECT stddev(salary) FROM table;
variance()
Calculates the variance.
SELECT variance(salary) FROM table;
2. Analytic Functions
Function
Description
Example
row_number()
Assigns a unique number to each row in a window.
ROW_NUMBER() OVER (PARTITION BY city ORDER BY salary)
rank()
Assigns a rank to each row in a partition.
RANK() OVER (ORDER BY salary DESC)
dense_rank()
Similar to rank but without gaps.
DENSE_RANK() OVER (ORDER BY age ASC)
ntile(n)
Divides rows into n buckets.
NTILE(4) OVER (ORDER BY age)
lead()
Accesses a row after the current row.
LEAD(salary, 1) OVER (ORDER BY age)
lag()
Accesses a row before the current row.
LAG(salary, 1) OVER (ORDER BY age)
3. String Functions
Function
Description
Example
concat()
Concatenates multiple strings.
SELECT concat(first_name, last_name) FROM table;
substring()
Extracts a substring from a string.
SELECT substring(name, 1, 3) FROM table;
length()
Returns the length of a string.
SELECT length(name) FROM table;
lower()
Converts string to lowercase.
SELECT lower(name) FROM table;
upper()
Converts string to uppercase.
SELECT upper(name) FROM table;
trim()
Trims spaces from both ends of a string.
SELECT trim(name) FROM table;
replace()
Replaces a substring within a string.
SELECT replace(name, 'a', 'b') FROM table;
split()
Splits a string into an array.
SELECT split(email, '@') FROM table;
4. Date and Time Functions
Function
Description
Example
current_date()
Returns the current date.
SELECT current_date();
current_timestamp()
Returns the current timestamp.
SELECT current_timestamp();
datediff()
Returns difference in days between two dates.
SELECT datediff(date1, date2) FROM table;
year(), month(), day()
Extracts year, month, day from date.
SELECT year(birthdate) FROM table;
date_add()
Adds days to a date.
SELECT date_add(date, 10) FROM table;
date_sub()
Subtracts days from a date.
SELECT date_sub(date, 10) FROM table;
to_date()
Converts string to date.
SELECT to_date(string_date) FROM table;
to_timestamp()
Converts string to timestamp.
SELECT to_timestamp(string_timestamp) FROM table;
5. Mathematical Functions
Function
Description
Example
abs()
Returns absolute value.
SELECT abs(-10) FROM table;
ceil()
Rounds up to the nearest integer.
SELECT ceil(salary) FROM table;
floor()
Rounds down to the nearest integer.
SELECT floor(salary) FROM table;
round()
Rounds to a specified number of decimal places.
SELECT round(salary, 2) FROM table;
sqrt()
Returns the square root.
SELECT sqrt(age) FROM table;
pow()
Returns a number raised to a power.
SELECT pow(salary, 2) FROM table;
exp()
Returns e^x (exponential).
SELECT exp(age) FROM table;
log()
Returns the logarithm of a number.
SELECT log(salary) FROM table;
6. Array Functions
Function
Description
Example
array()
Creates an array from multiple values.
SELECT array('a', 'b', 'c');
size()
Returns the number of elements in an array.
SELECT size(array_column) FROM table;
array_contains()
Checks if an array contains a specified value.
SELECT array_contains(array_column, 'value') FROM table;
explode()
Creates a new row for each element in the array.
SELECT explode(array_column) FROM table;
sort_array()
Sorts the elements of an array in ascending order.
SELECT sort_array(array_column) FROM table;
array_distinct()
Removes duplicate values from an array.
SELECT array_distinct(array_column) FROM table;
7. Conditional Functions
Function
Description
Example
when()
Conditional expression that works like an if-else (when() is the DataFrame API form; the SQL equivalent is CASE WHEN).
SELECT CASE WHEN age > 18 THEN 'adult' ELSE 'minor' END FROM table;
coalesce()
Returns the first non-null value.
SELECT coalesce(col1, col2) FROM table;
ifnull()
Returns the second value if the first is null.
SELECT ifnull(col1, 'unknown') FROM table;
nullif()
Returns NULL if the two values are equal.
SELECT nullif(col1, col2) FROM table;
nvl()
Replaces NULL with a specified value.
SELECT nvl(col, 'default') FROM table;
8. Miscellaneous Functions
Function
Description
Example
lit()
Creates a column from a literal value (DataFrame API; in SQL, literals are written directly).
df.select(lit(1)) (the SQL equivalent is simply SELECT 1 FROM table;)
rand()
Generates a random number between 0 and 1.
SELECT rand() FROM table;
monotonically_increasing_id()
Returns a unique ID for each row.
SELECT monotonically_increasing_id() FROM table;
input_file_name()
Returns the file name of the source data.
SELECT input_file_name() FROM table;
This categorized list provides a quick reference for Spark SQL functions based on what kind of operation they perform, making it useful for development and troubleshooting in Spark SQL queries.
The collect_list() function is categorized under Aggregate Functions in Spark SQL. It aggregates data by collecting values into a list within each group, without removing duplicates. Here’s a quick example and then an exploration of some interesting, complex use cases for various functions, including collect_list().
1. Advanced Use Cases of Aggregate Functions
collect_list()
Function
Example
Description
collect_list()
SELECT id, collect_list(name) FROM table GROUP BY id;
Collects all values of a column into a list for each group, preserving duplicates.
Complex Use Case: If you have a dataset where each customer has multiple orders, and you want to get a list of order dates for each customer:
SELECT customer_id, collect_list(order_date) AS order_dates
FROM orders
GROUP BY customer_id;
This is useful in generating lists of values within groups, such as viewing the product purchase history of customers or tracking all updates to a particular row over time.
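As a further sketch (assuming a hypothetical purchases table with customer_id and product_name columns), the same pattern yields each customer's product purchase history:
SELECT customer_id, collect_list(product_name) AS purchase_history
FROM purchases
GROUP BY customer_id;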
2. String Functions with Complex Use Cases
concat() and replace()
Function
Example
Description
concat()
SELECT concat(city, ', ', state) AS location FROM table;
Joins multiple columns or strings together.
replace()
SELECT replace(phone, '-', '') AS phone_no FROM table;
Replaces parts of a string based on a pattern.
Complex Use Case: Concatenating multiple address fields to form a single address and cleaning up data with replace():
SELECT concat(street, ', ', city, ', ', replace(zip, '-', '')) AS full_address
FROM addresses;
This would be useful for standardizing or preparing addresses for mailing systems by merging fields and removing unnecessary characters.
3. Analytic Functions with Interesting Examples
row_number(), rank(), lead(), and lag()
Function
Example
Description
row_number()
ROW_NUMBER() OVER (PARTITION BY city ORDER BY age DESC)
Assigns a unique number to each row in a window.
lead()
LEAD(salary, 1) OVER (ORDER BY age)
Retrieves the next row’s value in the current row’s column.
lag()
LAG(salary, 1) OVER (ORDER BY age)
Retrieves the previous row’s value in the current row’s column.
Complex Use Case: Track sales growth over time by calculating month-over-month difference in sales:
SELECT month, sales,
sales - LAG(sales, 1) OVER (ORDER BY month) AS sales_diff
FROM sales_data;
This lets you analyze trends or identify dips and peaks in performance by using the previous row’s data directly.
4. Date and Time Functions for Advanced Operations
date_add(), datediff(), year()
Function
Example
Description
date_add()
SELECT date_add(order_date, 30) AS due_date FROM table;
Adds a specific number of days to a date.
datediff()
SELECT datediff(due_date, order_date) AS days_to_ship FROM table;
Calculates the difference in days between two dates.
year(), month()
SELECT year(birthdate) AS birth_year FROM table;
Extracts parts of a date.
Complex Use Case: Calculate the monthly retention rate by finding customers who ordered in consecutive months:
WITH monthly_orders AS (
  SELECT customer_id, month(order_date) AS order_month, count(*) AS orders_this_month
  FROM orders
  GROUP BY customer_id, month(order_date)
)
SELECT customer_id, order_month, orders_this_month,
  LEAD(order_month, 1) OVER (PARTITION BY customer_id ORDER BY order_month) - order_month = 1 AS retained
FROM monthly_orders;
This example is powerful for retention analysis, determining if customers return month after month.
5. Array Functions for Multi-Value Column Manipulation
array(), array_contains(), explode()
Function
Example
Description
array()
SELECT array(name, email) AS contact FROM table;
Combines multiple columns into an array.
array_contains()
SELECT array_contains(tags, 'Spark') AS has_spark FROM table;
Checks if an array contains a value.
explode()
SELECT id, explode(items) AS item FROM orders;
Expands array elements into individual rows.
Complex Use Case: Splitting tags into individual rows for better indexing and searching:
SELECT post_id, explode(tags) AS tag
FROM blog_posts;
This approach can help with filtering, analytics, or search functionalities in scenarios where each tag or attribute of an item needs to be analyzed individually.
6. Conditional Functions for Complex Case Logic
when(), coalesce(), ifnull()
Function
Example
Description
when()
SELECT CASE WHEN age > 18 THEN 'adult' ELSE 'minor' END AS category FROM table;
Implements conditional logic similar to if-else.
coalesce()
SELECT coalesce(col1, col2) AS result FROM table;
Returns the first non-null value from a list of columns.
ifnull()
SELECT ifnull(name, 'unknown') FROM table;
Replaces NULL values with a specified value.
Complex Use Case: Assign customers to risk categories based on age and income, while handling null values with coalesce():
SELECT customer_id,
  CASE WHEN age >= 60 THEN 'high risk'
       WHEN age >= 30 THEN 'medium risk'
       ELSE 'low risk' END AS risk_category,
  coalesce(income, 0) AS income
FROM customers;
This is especially useful in customer segmentation, where you may need to categorize data based on multiple conditions and handle null values simultaneously.
7. Miscellaneous Functions for Unique Use Cases
monotonically_increasing_id(), input_file_name()
Function
Example
Description
monotonically_increasing_id()
SELECT monotonically_increasing_id() AS id FROM table;
Generates unique IDs for rows without requiring a primary key.
input_file_name()
SELECT input_file_name() FROM table;
Returns the file name from which a row originated.
Complex Use Case: Track records to their source file in an ETL process, useful in error tracking:
SELECT *, input_file_name() AS source_file
FROM data_files;
This helps you trace errors back to their data origin, making it valuable in complex ETL jobs involving multiple data sources.
These advanced use cases highlight the flexibility and power of Spark SQL functions when applied to complex data manipulation, making it easier to handle various analytical scenarios and transformations.
CRUD stands for Create, Read, Update, and Delete. It’s a set of basic operations that are essential for managing data in a database or any persistent storage system.
It refers to the four basic functions that any persistent storage application needs to perform. Persistent storage refers to data storage that retains information even after the device is powered off, unlike RAM which loses data when power is lost. Examples of persistent storage include hard drives and solid-state drives.
Here’s a breakdown of each CRUD operation:
Create: This operation allows you to add new data to the storage system.
Read: This operation allows you to retrieve existing data from the storage system.
Update: This operation allows you to modify existing data in the storage system.
Delete: This operation allows you to remove data from the storage system.
CRUD is fundamental for various applications, including:
Databases: CRUD operations are the foundation of working with data in relational and NoSQL databases.
APIs: When you build APIs, CRUD represents the essential functionalities for managing resources.
User Interfaces: Many interfaces use CRUD functionalities behind the scenes to allow users to view, edit, and delete information through forms and reports.
Overall, CRUD provides a simple and effective way to understand the core data manipulation operations in computer programming.
Create
Create Table
The CREATE TABLE statement is used to define a new table in a database. It follows this general syntax:
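CREATE TABLE table_name (
    column1 data_type constraint1,
    column2 data_type constraint2,
    ...
);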
CREATE TABLE: This keyword initiates the table creation process.
table_name: This is a chosen name that identifies your table.
column1, column2, etc.: These represent the individual columns within the table, each holding a specific type of data.
data_type: This specifies the kind of data each column can store. Examples include int for integers, varchar for variable-length text, and date for storing dates.
constraint1, constraint2, etc.: These are optional clauses that define rules for the data within a column. Common constraints include NOT NULL to ensure a value must be present, and PRIMARY KEY to uniquely identify each row.
The data types of columns may vary from one database system to another. For example, Oracle supports NUMBER for integer values, whereas MySQL uses INT.
The CREATE TABLE statement belongs to the Data Definition Language (DDL) subset of SQL.
Example 1: Creating a Customers Table
Let’s create a table named Customers to store customer information:
SQL
CREATE TABLE Customers (
customer_id int NOT NULL PRIMARY KEY,
first_name varchar(50) NOT NULL,
last_name varchar(50) NOT NULL,
email varchar(100) UNIQUE,
phone_number varchar(20)
);
Explanation of the Example:
This code creates a table named Customers.
It has five columns:
customer_id: Stores a unique integer identifier for each customer (primary key).
first_name and last_name: Stores customer’s first and last name (not null).
email: Stores the customer’s email address (unique).
phone_number: Stores the customer’s phone number (optional).
Example 2: Creating a Products Table
Here’s another example for a Products table:
SQL
CREATE TABLE Products (
product_id int NOT NULL AUTO_INCREMENT PRIMARY KEY,
name varchar(255) NOT NULL,
price decimal(10,2) NOT NULL,
description text,
stock int DEFAULT 0
);
Explanation of the Example:
This code creates a table named Products.
It has five columns:
product_id: Stores a unique integer identifier for each product (primary key with auto-increment).
name: Stores the product name (not null).
price: Stores the product’s price (decimal with two decimal places, not null).
description: Stores a textual description of the product (optional).
stock: Stores the current stock level (default value 0).
These are just a couple of examples. You can create tables to store any kind of data following this structure and define constraints based on your specific needs.
Creating data in a database using CRUD operations
When it comes to creating data in a database using CRUD operations (Create, Read, Update, Delete), there are several ways to perform the “Create” operation, depending on the specific requirements of the application and the capabilities of the database management system (DBMS) being used. Here are some common types of create operations:
Single Record Insertion:
This is the simplest form of the create operation, where a single record is inserted into a table.
Example INSERT INTO users (name, email) VALUES ('John Doe', 'john@example.com');
Bulk Insertion:
Bulk insertion involves inserting multiple records into a table in a single operation.
It is often more efficient than inserting records one by one, especially when dealing with large datasets.
Example INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com'), ('Bob', 'bob@example.com'), ('Charlie', 'charlie@example.com');
Insertion from Another Table:
Data can be inserted into a table by selecting records from another table and inserting them into the target table.
This is useful for copying data or transforming data from one table to another.
Example INSERT INTO new_users (name, email) SELECT name, email FROM old_users WHERE age > 18;
Insertion with Default Values:
If certain columns have default values defined, those values are automatically used during insertion if no explicit value is provided.
Example INSERT INTO users (name) VALUES ('Jane');
Assuming the 'email' column has a default value defined, it will be used during insertion (see the table-definition sketch after this list).
Insertion with Generated Values:
Some databases support generated values for columns, such as auto-incrementing primary keys or UUIDs.
These values are automatically generated by the database system during insertion.
Example (using SQL with auto-incrementing primary key): INSERT INTO users (name) VALUES ('Jane');
Insertion Using ORM (Object-Relational Mapping):
In object-oriented programming, ORM frameworks abstract away the database operations and allow developers to create and manipulate objects instead of writing SQL queries directly.
Example (using an ORM like SQLAlchemy in Python): user = User(name='John', email='john@example.com') db.session.add(user) db.session.commit()
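A minimal sketch of how such a default is declared at table-creation time (a hypothetical users table; MySQL-style syntax):
CREATE TABLE users (
    id int NOT NULL AUTO_INCREMENT PRIMARY KEY,
    name varchar(100) NOT NULL,
    email varchar(100) DEFAULT 'unknown@example.com' -- used when an INSERT omits email
);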
Read
For the “Read” operation, it involves retrieving data from the database. Here are common types of read operations:
Selecting All Records:
Retrieve all records from a table.
Example SELECT * FROM users;
Selecting Specific Columns:
Retrieve specific columns from a table.
Example SELECT name, email FROM users;
Filtering with WHERE Clause:
Retrieve records that meet specific conditions using the WHERE clause.
Example SELECT * FROM users WHERE age > 18;
Sorting with ORDER BY Clause:
Retrieve records sorted in ascending or descending order based on one or more columns.
Example SELECT * FROM users ORDER BY name ASC;
Limiting Results with LIMIT Clause:
Retrieve a limited number of records from the result set.
Example SELECT * FROM users LIMIT 10;
Joining Tables:
Retrieve data from multiple tables by joining them based on related columns.
Example SELECT users.name, orders.order_date FROM users INNER JOIN orders ON users.id = orders.user_id;
Aggregation Functions:
Retrieve aggregated data using functions like COUNT, SUM, AVG, MIN, MAX, etc.
Example SELECT COUNT(*) FROM users; SELECT AVG(age) FROM users WHERE country = 'USA';
Grouping Results with GROUP BY Clause:
Group rows that have the same values into summary rows.
Example SELECT country, COUNT(*) FROM users GROUP BY country;
Subqueries:
Retrieve data from a subquery and use it in the main query.
Example SELECT name, email FROM users WHERE age > (SELECT AVG(age) FROM users);
These are some common types of read operations used to retrieve data from a database using CRUD operations. The appropriate method to use depends on the specific requirements and constraints of the application.
Create & Delete Database
Creating and deleting databases can vary slightly depending on the specific database management system (DBMS) you are using. Let us see some examples for MySQL and PostgreSQL, two popular relational database management systems.
1. MySQL Example:
Creating a Database:
CREATE DATABASE my_database;
Deleting a Database:
DROP DATABASE my_database;
2. PostgreSQL Example:
Creating a Database:
CREATE DATABASE my_database;
Deleting a Database:
DROP DATABASE my_database;
In both examples:
my_database is the name of the database you want to create or delete.
CREATE DATABASE is the SQL command used to create a new database.
DROP DATABASE is the SQL command used to delete an existing database.
It’s important to exercise caution when using DROP DATABASE because it permanently deletes the database and all of its contents.
Always double-check before executing this command to avoid accidental data loss.
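Many systems, including MySQL and PostgreSQL, also accept an IF EXISTS guard so the command does not fail if the database is absent:
DROP DATABASE IF EXISTS my_database;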
Additionally, in practice, you may need to authenticate with appropriate privileges to create or delete databases, especially in production environments. Make sure you have the necessary permissions before attempting these operations.
CRUD (Create, Read, Update, Delete) operations in Spark SQL
In Spark SQL, CRUD (Create, Read, Update, Delete) operations allow you to manage data similarly to traditional SQL databases. Here’s how CRUD can be implemented in Spark SQL:
1. Create (INSERT)
To create data in Spark SQL, you can use the CREATE TABLE command to define the structure of a table and then insert records with INSERT INTO or INSERT OVERWRITE.
Operation
Command
Example
CREATE TABLE
Defines a new table schema.
CREATE TABLE employees (id INT, name STRING, age INT, department STRING);
INSERT INTO
Adds rows to an existing table without replacing existing data.
INSERT INTO employees VALUES (1, 'Alice', 30, 'Engineering');
INSERT OVERWRITE
Replaces all data in the table with new data.
INSERT OVERWRITE TABLE employees SELECT * FROM new_data;
2. Read (SELECT)
The SELECT command is used to query data from tables. Spark SQL supports filtering, aggregating, sorting, and joining data through the SELECT statement.
Operation
Command
Example
SELECT
Retrieves specific columns or rows based on conditions.
SELECT name, department FROM employees WHERE age > 25;
JOIN
Combines data from multiple tables based on a key.
SELECT e.name, d.name AS dept FROM employees e JOIN departments d ON e.department = d.id;
GROUP BY, ORDER BY
Groups and sorts data as part of the query.
SELECT department, AVG(age) FROM employees GROUP BY department ORDER BY AVG(age) DESC;
3. Update
Updating records directly is available in Spark SQL when using Delta Lake. Delta Lake provides ACID transactions, allowing you to modify rows in a table.
Operation
Command
Example
UPDATE
Modifies existing rows in a table (Delta tables).
UPDATE employees SET age = 31 WHERE id = 1;
Note: The UPDATE operation is only supported in Delta Lake tables. Standard Spark tables do not support direct row updates.
4. Delete
Similar to updates, deleting records is supported in Delta Lake for Spark SQL. The DELETE statement removes rows that match a specified condition.
Operation
Command
Example
DELETE
Deletes specific rows based on a condition (Delta tables).
DELETE FROM employees WHERE age < 25;
Note: Like UPDATE, DELETE operations require Delta Lake tables.
Additional Delta Lake Operations for CRUD
Delta Lake tables allow Spark SQL to perform row-level operations typically found in traditional SQL databases:
Command
Description
Example
MERGE INTO
Merges updates, inserts, and deletes based on a condition.
MERGE INTO employees AS e USING updates AS u ON e.id = u.id WHEN MATCHED THEN UPDATE SET e.age = u.age WHEN NOT MATCHED THEN INSERT (id, name, age, department) VALUES (u.id, u.name, u.age, u.department);
Upsert (Insert/Update)
Inserts new records and updates existing ones based on matching criteria (using MERGE).
Same as the above example using MERGE INTO.
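For readability, the MERGE INTO statement from the table above can be written across multiple lines; it updates matching rows and inserts the rest in a single atomic operation:
MERGE INTO employees AS e
USING updates AS u
ON e.id = u.id
WHEN MATCHED THEN UPDATE SET e.age = u.age
WHEN NOT MATCHED THEN INSERT (id, name, age, department) VALUES (u.id, u.name, u.age, u.department);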
Example: CRUD Workflow with Delta Lake
Here’s an example of how CRUD operations can be applied in a Delta Lake table:
-- Create Delta Lake table
CREATE TABLE delta.`/path/to/delta/employees` (id INT, name STRING, age INT, department STRING) USING delta;
-- Insert data
INSERT INTO delta.`/path/to/delta/employees` VALUES (1, 'Alice', 30, 'Engineering');
-- Read data
SELECT * FROM delta.`/path/to/delta/employees` WHERE department = 'Engineering';
-- Update data
UPDATE delta.`/path/to/delta/employees` SET age = 31 WHERE id = 1;
-- Delete data
DELETE FROM delta.`/path/to/delta/employees` WHERE age < 25;
These commands provide a comprehensive way to perform CRUD operations in Spark SQL using Delta Lake for full ACID compliance, enabling more robust data management within Spark.
Comparative overview of partitions, bucketing, segmentation, and broadcasting in PySpark, Spark SQL, and Hive QL in tabular form, along with examples
Here’s a comparative overview of partitions, bucketing, segmentation, and broadcasting in PySpark, Spark SQL, and Hive QL in tabular form, along with examples:
Concept
PySpark
Spark SQL
Hive QL
Partitions
df.repartition(numPartitions, "column") creates partitions based on specified column.
CREATE TABLE table_name PARTITIONED BY (col1 STRING) allows data to be organized by partition.
ALTER TABLE table_name ADD PARTITION (col1='value') adds a new partition to an existing table.
Bucketing
df.write.bucketBy(numBuckets, "column").saveAsTable("table_name") for distributing data.
CREATE TABLE table_name CLUSTERED BY (col1) INTO numBuckets BUCKETS for bucketing data in a table.
CREATE TABLE table_name CLUSTERED BY (col1) INTO numBuckets BUCKETS for bucketing strategy.
Segmentation
Segmentation done by filtering DataFrames based on specific criteria, e.g., df.filter(df.col > 1).
SELECT * FROM table WHERE col > value for segmenting data based on specific criteria.
SELECT * FROM table WHERE col > value for segmentation in queries.
Broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", size) for broadcast joins.
SELECT /*+ BROADCAST(t2) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key for broadcast hint.
SET hive.auto.convert.join = true allows automatic broadcasting of smaller tables during joins.
Examples:
Partitions: In PySpark, if you have a DataFrame df, you can repartition it as follows: df = df.repartition(4, "column_name")
Bucketing: When bucketing in PySpark: df.write.bucketBy(5, "column_name").saveAsTable("bucketed_table")
Broadcasting: Broadcasting a DataFrame for optimization: from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760) # 10MB
This table provides a concise yet comprehensive guide to partitioning, bucketing, segmentation, and broadcasting across different frameworks.
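As a small Spark SQL sketch (using a hypothetical sales table), a partitioned table can be created and loaded into a specific partition like this:
-- The partition column becomes part of the table's directory layout
CREATE TABLE sales (id INT, amount DOUBLE) USING parquet PARTITIONED BY (sale_date STRING);
-- Static partition insert: all rows go to the sale_date='2024-01-01' partition
INSERT INTO sales PARTITION (sale_date = '2024-01-01') VALUES (1, 99.50);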
We can use the NTILE window function to create buckets within a dataset in both PySpark and Spark SQL. The NTILE(n) function divides the result set into n buckets or groups, assigning a bucket number to each row. Here’s how you can apply it:
PySpark Example
from pyspark.sql import Window
from pyspark.sql import functions as F
windowSpec = Window.orderBy("column_name")
df_with_buckets = df.withColumn("bucket", F.ntile(4).over(windowSpec))
Spark SQL Example
SELECT *, NTILE(4) OVER (ORDER BY column_name) AS bucket
FROM table_name;
This creates four buckets based on the ordering of column_name. Each row is assigned a bucket number from 1 to 4.
Important concepts in the PySpark DataFrame API / Spark SQL
Here’s the updated comprehensive overview of important concepts in the PySpark DataFrame API, now including options for showing DataFrame content, schema, and columns:
Category
Function
Description
Example
Data Reading
spark.read.csv()
Read a CSV file into a DataFrame.
df = spark.read.csv("path/to/file.csv")
spark.read.json()
Read a JSON file into a DataFrame.
df = spark.read.json("path/to/file.json")
spark.read.parquet()
Read a Parquet file into a DataFrame.
df = spark.read.parquet("path/to/file.parquet")
DataFrame Creation
createDataFrame()
Create a DataFrame from an RDD or a list.
df = spark.createDataFrame(data, schema)
from_records()
Create a DataFrame from structured data such as a list of tuples (in PySpark this is handled by createDataFrame()).
df = spark.createDataFrame(list_of_tuples, ["col1", "col2"])
Data Display & Inspection
show(n)
Displays the first n rows of the DataFrame. Defaults to 20.
df.show(5)
show(truncate=False)
Displays full column values without truncating long strings (truncation is on by default).
df.show(truncate=False)
printSchema()
Prints the schema of the DataFrame, showing column names and types.
df.printSchema()
df.columns
Returns a list of column names in the DataFrame.
columns = df.columns
count()
Count the number of rows in the DataFrame.
row_count = df.count()
collect()
Retrieve all rows from the DataFrame as a list.
data = df.collect()
Optimization
persist()
Store DataFrame in memory/disk for re-use.
df.persist()
cache()
Cache the DataFrame in memory.
df.cache()
repartition()
Change the number of partitions of the DataFrame.
df.repartition(4)
coalesce()
Reduce the number of partitions without a full shuffle.
df.coalesce(2)
This table now includes options for showing DataFrame content, schema, and columns along with various other functionalities in the PySpark DataFrame API.
Here’s a cheat sheet for Spark SQL with common queries, functionalities, and examples:
Category
SQL Command
Description
Example
Data Reading
SELECT * FROM table
Retrieve all columns from a table.
SELECT * FROM employees
FROM
Specify the table from which to select data.
SELECT name, age FROM employees
Data Filtering
WHERE
Filter rows based on a condition.
SELECT * FROM employees WHERE age > 30
Data Transformation
SELECT ... AS
Rename selected columns.
SELECT name AS employee_name FROM employees
JOIN
Combine rows from two or more tables based on a related column.
SELECT * FROM employees JOIN departments ON employees.dep_id = departments.id
GROUP BY
Group rows that have the same values in specified columns.
SELECT department, COUNT(*) FROM employees GROUP BY department
HAVING
Filter groups based on a condition after grouping.
SELECT department, COUNT(*) FROM employees GROUP BY department HAVING COUNT(*) > 5
Aggregations
COUNT()
Count the number of rows.
SELECT COUNT(*) FROM employees
SUM(), AVG(), MIN(), MAX()
Perform summation, average, minimum, and maximum calculations.
SELECT AVG(salary) FROM employees
Sorting
ORDER BY
Sort the result set by one or more columns.
SELECT * FROM employees ORDER BY salary DESC
Window Functions
ROW_NUMBER() OVER
Assign a unique number to rows within a partition.
SELECT name, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS rank FROM employees
Set Operations
UNION
Combine the results of two queries, removing duplicates.
SELECT name FROM employees UNION SELECT name FROM contractors
UNION ALL
Combine results including duplicates.
SELECT name FROM employees UNION ALL SELECT name FROM contractors
Data Manipulation
INSERT INTO
Insert new records into a table.
INSERT INTO employees VALUES ('John Doe', 28, 'Sales')
UPDATE
Modify existing records in a table.
UPDATE employees SET age = age + 1 WHERE name = 'John Doe'
DELETE
Remove records from a table.
DELETE FROM employees WHERE name = 'John Doe'
Table Management
CREATE TABLE
Create a new table.
CREATE TABLE new_table (id INT, name STRING)
DROP TABLE
Delete a table and its data.
DROP TABLE new_table
ALTER TABLE
Modify an existing table (e.g., add a column).
ALTER TABLE employees ADD COLUMN hire_date DATE
String Functions
CONCAT()
Concatenate two or more strings.
SELECT CONCAT(first_name, ' ', last_name) AS full_name FROM employees
SUBSTRING()
Extract a substring from a string.
SELECT SUBSTRING(name, 1, 3) FROM employees
Date Functions
CURRENT_DATE
Retrieve the current date.
SELECT CURRENT_DATE
DATEDIFF()
Calculate the difference between two dates.
SELECT DATEDIFF('2024-01-01', '2023-01-01') AS days_difference
YEAR(), MONTH(), DAY()
Extract year, month, or day from a date.
SELECT YEAR(hire_date) FROM employees
This cheat sheet covers various aspects of Spark SQL, including data reading, transformation, filtering, and aggregations, along with examples for clarity. You can adapt these queries to your specific dataset and requirements.
Comparison cheat sheet for date manipulation in PySpark SQL vs Hive QL:
Function
PySpark SQL Syntax
HiveQL Syntax
Description
Current Date
current_date()
current_date()
Returns current date.
Add Days
date_add(date, days)
date_add(date, days)
Adds specified days to a date.
Subtract Days
date_sub(date, days)
date_sub(date, days)
Subtracts specified days from a date.
Add Months
add_months(date, months)
add_months(date, months)
Adds specified months to a date.
Truncate Date
date_trunc(format, date)
trunc(date, 'format')
Truncates date to specified unit.
Date Difference
datediff(end_date, start_date)
datediff(end_date, start_date)
Returns days between dates.
Year Extraction
year(date)
year(date)
Extracts the year from a date.
Month Extraction
month(date)
month(date)
Extracts the month from a date.
Day Extraction
day(date) or dayofmonth(date)
day(date) or dayofmonth(date)
Extracts the day from a date.
Format Date
date_format(date, 'format')
date_format(date, 'format')
Formats date to specified pattern.
Last Day of Month
last_day(date)
last_day(date)
Returns last day of the month.
Unix Timestamp
unix_timestamp(date, 'pattern')
unix_timestamp(date, 'pattern')
Converts date to Unix timestamp.
From Unix Timestamp
from_unixtime(unix_time, 'format')
from_unixtime(unix_time, 'format')
Converts Unix timestamp to date.
Next Day
next_day(date, 'day_of_week')
next_day(date, 'day_of_week')
Returns next specified day of the week.
Example
To add 5 days to the current date:
PySpark SQL: SELECT date_add(current_date(), 5)
Hive QL: SELECT date_add(current_date(), 5)
Both PySpark SQL and HiveQL share similar functions, but PySpark SQL has better integration with Python datetime types and additional flexibility.
PySpark SQL provides better integration with Python datetime types due to its compatibility with the pyspark.sql.functions library, which allows for seamless handling of Python datetime objects. This flexibility enables direct manipulation of dates and times, conversion between types, and usage of Python’s datetime features within PySpark’s DataFrames.
Additionally, PySpark SQL allows using user-defined functions (UDFs) that interact directly with Python datetime libraries, enhancing its adaptability for complex time-based transformations compared to HiveQL’s fixed SQL-like functions.
Here’s a table showing examples of how PySpark SQL integrates with Python datetime types via the pyspark.sql.functions library:
Function
Description
PySpark Example
Explanation
Current Date
Returns today’s date.
from pyspark.sql.functions import current_date df.select(current_date().alias("current_date"))
Directly fetches the current date using current_date() function.
Convert to DateType
Converts a Python datetime to Spark DateType.
from datetime import datetime from pyspark.sql.functions import lit, to_date df.select(to_date(lit(datetime.now())))
Allows converting Python datetime objects directly to Spark DateType using to_date() function.
Add Days
Adds days to a date.
from pyspark.sql.functions import date_add df.select(date_add(current_date(), 5))
Adds 5 days to the current date.
Date Formatting
Formats date to a specified pattern.
from pyspark.sql.functions import date_format df.select(date_format(current_date(), 'yyyy-MM-dd'))
Formats the date into yyyy-MM-dd format.
Extracting Parts of Date
Extracts year, month, day, etc., from a date.
from pyspark.sql.functions import year, month, dayofmonth df.select(year(current_date()), month(current_date()))
Extracts year and month directly from the current date.
Date Difference
Finds the difference between two dates.
from pyspark.sql.functions import datediff df.select(datediff(current_date(), lit('2022-01-01')))
Calculates the difference in days between today and a specified date.
Handling Null Dates
Checks and fills null date values.
df.fillna({'date_column': '1970-01-01'})
Replaces null date values with a default value (e.g., epoch start date).
String-to-Date Conversion
Converts a string to DateType using a specified format.
from pyspark.sql.functions import to_date, col df.select(to_date(col('date_string'), 'yyyy-MM-dd'))
Parses a string column into a DateType value according to the given format pattern.
Using Python UDFs with Dates
Applies Python functions directly to datetime columns.
from pyspark.sql.functions import udf, col from pyspark.sql.types import IntegerType df.withColumn('year', udf(lambda x: x.year, IntegerType())(col('date_column')))
Allows creating UDFs with Python’s datetime methods, e.g., extracting the year directly from date columns.
Explanation:
PySpark’s integration with Python datetime types simplifies date manipulations by allowing direct conversion and application of Python datetime methods.
Functions like to_date, date_format, and UDFs make PySpark SQL highly flexible for working with dates, giving more control over formatting, extraction, and handling nulls or custom date logic in transformations.
This compatibility makes it easier to write complex date manipulations within PySpark, directly utilizing Python’s rich datetime functionality.