Comparative overview of partitions, bucketing, segmentation, and broadcasting in PySpark, Spark SQL, and Hive QL in tabular form, along with examples
Below is a comparison of these four concepts across PySpark, Spark SQL, and Hive QL in tabular form, followed by examples:
Concept | PySpark | Spark SQL | Hive QL |
---|---|---|---|
Partitions | df.repartition(numPartitions, "column") creates partitions based on specified column. | CREATE TABLE table_name PARTITIONED BY (col1 STRING) allows data to be organized by partition. | ALTER TABLE table_name ADD PARTITION (col1='value') adds a new partition to an existing table. |
Bucketing | df.write.bucketBy(numBuckets, "column").saveAsTable("table_name") for distributing data. | CREATE TABLE table_name CLUSTERED BY (col1) INTO numBuckets BUCKETS for bucketing data in a table. | CREATE TABLE table_name CLUSTERED BY (col1) INTO numBuckets BUCKETS for bucketing strategy. |
Segmentation | Segmentation done by filtering DataFrames based on specific criteria, e.g., df.filter(df.col > 1) . | SELECT * FROM table WHERE col > value for segmenting data based on specific criteria. | SELECT * FROM table WHERE col > value for segmentation in queries. |
Broadcasting | df1.join(broadcast(df2), "key") (broadcast() from pyspark.sql.functions) forces a broadcast join; spark.conf.set("spark.sql.autoBroadcastJoinThreshold", size) controls automatic broadcasting. | SELECT /*+ BROADCAST(t2) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key for a broadcast hint. | SET hive.auto.convert.join = true allows automatic broadcasting of smaller tables during joins. |
Examples:
- Partitions: In PySpark, if you have a DataFrame df, you can repartition it as follows:
df = df.repartition(4, "column_name")
- Bucketing: When bucketing in PySpark:
df.write.bucketBy(5, "column_name").saveAsTable("bucketed_table")
- Segmentation: Segmenting DataFrames:
df_segment = df.filter(df["column"] > 100)
- Broadcasting: Broadcasting a DataFrame for optimization:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)  # 10 MB
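A broadcast join can also be requested explicitly with the broadcast() function from pyspark.sql.functions, rather than relying on the threshold alone. A minimal sketch, assuming two illustrative DataFrames df_large and df_small that share a key column:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.getOrCreate()
# Hypothetical DataFrames; replace with your own sources.
df_large = spark.range(1000000).withColumnRenamed("id", "key")
df_small = spark.createDataFrame([(1, "a"), (2, "b")], ["key", "label"])
# Explicitly broadcast the smaller DataFrame so the join avoids a shuffle.
joined = df_large.join(broadcast(df_small), "key")
joined.explain()  # the physical plan should show a BroadcastHashJoin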
This table provides a concise yet comprehensive guide to partitioning, bucketing, segmentation, and broadcasting across different frameworks.
We can use the NTILE window function to create buckets within a dataset in both PySpark and Spark SQL. The NTILE(n) function divides the result set into n buckets or groups, assigning a bucket number to each row. Here’s how you can apply it:
PySpark Example
from pyspark.sql import Window
from pyspark.sql import functions as F
windowSpec = Window.orderBy("column_name")
df_with_buckets = df.withColumn("bucket", F.ntile(4).over(windowSpec))
Spark SQL Example
SELECT *, NTILE(4) OVER (ORDER BY column_name) AS bucket
FROM table_name;
This creates four buckets based on the ordering of column_name. Each row is assigned a bucket number from 1 to 4.
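For a runnable end-to-end illustration of the PySpark version, here is a minimal sketch assuming a local SparkSession and a small toy dataset (the column name amount is hypothetical):
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
# Toy data for illustration only.
df = spark.createDataFrame([(i, i * 10) for i in range(1, 9)], ["id", "amount"])
windowSpec = Window.orderBy("amount")
df.withColumn("bucket", F.ntile(4).over(windowSpec)).show()
# The 8 rows are split into 4 roughly equal buckets, numbered 1 to 4.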
Important concepts in the PySpark DataFrame API / Spark SQL
Here’s a comprehensive overview of important concepts in the PySpark DataFrame API, including options for showing DataFrame content, schema, and columns:
Category | Function | Description | Example |
---|---|---|---|
Data Reading | spark.read.csv() | Read a CSV file into a DataFrame. | df = spark.read.csv("path/to/file.csv") |
Data Reading | spark.read.json() | Read a JSON file into a DataFrame. | df = spark.read.json("path/to/file.json") |
Data Reading | spark.read.parquet() | Read a Parquet file into a DataFrame. | df = spark.read.parquet("path/to/file.parquet") |
DataFrame Creation | createDataFrame() | Create a DataFrame from an RDD or a list. | df = spark.createDataFrame(data, schema) |
DataFrame Creation | toDF() | Create a DataFrame from an RDD or structured data (like a list of tuples) with named columns. | df = rdd.toDF(["col1", "col2"]) |
Transformation | select() | Select specific columns from the DataFrame. | df.select("column1", "column2") |
Transformation | filter() | Filter rows based on a condition. | df.filter(df["column"] > 100) |
Transformation | groupBy() | Group rows by a specific column and perform aggregations. | df.groupBy("column").count() |
Transformation | withColumn() | Add a new column or replace an existing column. | df.withColumn("new_col", df["col1"] + 1) |
Transformation | join() | Join two DataFrames together. | df1.join(df2, "common_col") |
Transformation | union() | Combine two DataFrames with the same schema. | df1.union(df2) |
Transformation | drop() | Drop specified columns from the DataFrame. | df.drop("column1") |
Transformation | distinct() | Return a new DataFrame with distinct rows. | df.distinct() |
Transformation | orderBy() | Sort the DataFrame based on one or more columns. | df.orderBy("column1", ascending=False) |
Transformation | pivot() | Pivot a DataFrame to reshape it. | df.groupBy("column1").pivot("column2").agg(F.sum("value")) |
Transformation | transpose | Not directly available in the DataFrame API; flip rows and columns by converting to pandas or reshaping with pivot. | df.toPandas().T (small DataFrames only) |
Window Functions | Window.partitionBy() | Create a window specification for calculations over specified partitions. | windowSpec = Window.partitionBy("column").orderBy("value") |
Window Functions | row_number() | Assign a unique row number to rows within a partition. | df.withColumn("row_num", F.row_number().over(windowSpec)) |
UDFs (User Defined Functions) | spark.udf.register() | Register a Python function as a UDF for use in SQL. | spark.udf.register("my_udf", my_function) |
UDFs (User Defined Functions) | F.udf() | Wrap a Python function as a UDF and use it to create a new column. | my_udf = F.udf(my_function, StringType()); df.withColumn("new_col", my_udf(df["col"])) |
String Manipulation | F.concat() | Concatenate multiple strings into one. | df.withColumn("new_col", F.concat(df["col1"], df["col2"])) |
String Manipulation | F.substring() | Extract a substring from a string column. | df.withColumn("sub_col", F.substring("col", 1, 3)) |
String Manipulation | F.lower() | Convert a string column to lowercase. | df.withColumn("lower_col", F.lower("col")) |
Date Manipulation | F.current_date() | Get the current date. | df.withColumn("current_date", F.current_date()) |
Date Manipulation | F.date_add() | Add days to a date column. | df.withColumn("new_date", F.date_add("date_col", 5)) |
Date Manipulation | F.year() | Extract the year from a date column. | df.withColumn("year", F.year("date_col")) |
Schema and Data Types | StructType() | Define the schema of a DataFrame. | schema = StructType([...]) |
Schema and Data Types | DataType | Define data types for DataFrame columns. | IntegerType(), StringType(), ... |
Actions | show(n) | Display the first n rows of the DataFrame (defaults to 20). | df.show(5) |
Actions | show(truncate=False) | Display full cell contents instead of truncating long strings. | df.show(truncate=False) |
Actions | printSchema() | Print the schema of the DataFrame, showing column names and types. | df.printSchema() |
Actions | df.columns | Return a list of column names in the DataFrame. | columns = df.columns |
Actions | count() | Count the number of rows in the DataFrame. | row_count = df.count() |
Actions | collect() | Retrieve all rows from the DataFrame as a list. | data = df.collect() |
Optimization | persist() | Store the DataFrame in memory/disk for re-use. | df.persist() |
Optimization | cache() | Cache the DataFrame in memory. | df.cache() |
Optimization | repartition() | Change the number of partitions of the DataFrame. | df.repartition(4) |
Optimization | coalesce() | Reduce the number of partitions without a full shuffle. | df.coalesce(2) |
This table covers options for showing DataFrame content, schema, and columns, along with the other core functionality of the PySpark DataFrame API.
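To tie several of these operations together, here is a short sketch; the file path, column names, and aggregation are illustrative assumptions rather than a fixed recipe:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
# Hypothetical input file and columns.
df = spark.read.csv("path/to/sales.csv", header=True, inferSchema=True)
result = (
    df.filter(F.col("amount") > 100)                        # keep rows above a threshold
      .withColumn("amount_with_tax", F.col("amount") * 1.1)  # derive a new column
      .groupBy("region")                                     # aggregate per region
      .agg(F.sum("amount_with_tax").alias("total"))
      .orderBy("total", ascending=False)
)
result.show(5)
result.printSchema()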
Here’s a cheat sheet for Spark SQL with common queries, functionalities, and examples:
Category | SQL Command | Description | Example |
---|---|---|---|
Data Reading | SELECT * FROM table | Retrieve all columns from a table. | SELECT * FROM employees |
Data Reading | FROM | Specify the table from which to select data. | SELECT name, age FROM employees |
Data Filtering | WHERE | Filter rows based on a condition. | SELECT * FROM employees WHERE age > 30 |
Data Transformation | SELECT ... AS | Rename selected columns. | SELECT name AS employee_name FROM employees |
Data Transformation | JOIN | Combine rows from two or more tables based on a related column. | SELECT * FROM employees JOIN departments ON employees.dep_id = departments.id |
Data Transformation | GROUP BY | Group rows that have the same values in specified columns. | SELECT department, COUNT(*) FROM employees GROUP BY department |
Data Transformation | HAVING | Filter groups based on a condition after grouping. | SELECT department, COUNT(*) FROM employees GROUP BY department HAVING COUNT(*) > 5 |
Aggregations | COUNT() | Count the number of rows. | SELECT COUNT(*) FROM employees |
Aggregations | SUM(), AVG(), MIN(), MAX() | Perform summation, average, minimum, and maximum calculations. | SELECT AVG(salary) FROM employees |
Sorting | ORDER BY | Sort the result set by one or more columns. | SELECT * FROM employees ORDER BY salary DESC |
Window Functions | ROW_NUMBER() OVER | Assign a unique number to rows within a partition. | SELECT name, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num FROM employees |
Set Operations | UNION | Combine the results of two queries, removing duplicates. | SELECT name FROM employees UNION SELECT name FROM contractors |
Set Operations | UNION ALL | Combine results including duplicates. | SELECT name FROM employees UNION ALL SELECT name FROM contractors |
Data Manipulation | INSERT INTO | Insert new records into a table. | INSERT INTO employees VALUES ('John Doe', 28, 'Sales') |
Data Manipulation | UPDATE | Modify existing records in a table (requires a table format that supports updates, e.g., Delta Lake). | UPDATE employees SET age = age + 1 WHERE name = 'John Doe' |
Data Manipulation | DELETE | Remove records from a table (requires a table format that supports deletes, e.g., Delta Lake). | DELETE FROM employees WHERE name = 'John Doe' |
Table Management | CREATE TABLE | Create a new table. | CREATE TABLE new_table (id INT, name STRING) |
Table Management | DROP TABLE | Delete a table and its data. | DROP TABLE new_table |
Table Management | ALTER TABLE | Modify an existing table (e.g., add a column). | ALTER TABLE employees ADD COLUMNS (hire_date DATE) |
String Functions | CONCAT() | Concatenate two or more strings. | SELECT CONCAT(first_name, ' ', last_name) AS full_name FROM employees |
String Functions | SUBSTRING() | Extract a substring from a string. | SELECT SUBSTRING(name, 1, 3) FROM employees |
Date Functions | CURRENT_DATE | Retrieve the current date. | SELECT CURRENT_DATE |
Date Functions | DATEDIFF() | Calculate the difference between two dates. | SELECT DATEDIFF('2024-01-01', '2023-01-01') AS days_difference |
Date Functions | YEAR(), MONTH(), DAY() | Extract the year, month, or day from a date. | SELECT YEAR(hire_date) FROM employees |
This cheat sheet covers various aspects of Spark SQL, including data reading, transformation, filtering, and aggregations, along with examples for clarity. You can adapt these queries to your specific dataset and requirements.
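These statements can be run from PySpark by registering a DataFrame as a temporary view. A minimal sketch, assuming a small hypothetical employees dataset with the columns used above:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Toy data matching the cheat sheet examples.
employees = spark.createDataFrame(
    [("John Doe", 28, "Sales"), ("Jane Roe", 35, "Engineering")],
    ["name", "age", "department"],
)
employees.createOrReplaceTempView("employees")
spark.sql("SELECT department, COUNT(*) AS cnt FROM employees GROUP BY department").show()
spark.sql("SELECT * FROM employees WHERE age > 30 ORDER BY age DESC").show()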
Here’s a comparison cheat sheet for date manipulation in PySpark SQL vs. HiveQL:
Function | PySpark SQL Syntax | HiveQL Syntax | Description |
---|---|---|---|
Current Date | current_date() | current_date() | Returns current date. |
Add Days | date_add(date, days) | date_add(date, days) | Adds specified days to a date. |
Subtract Days | date_sub(date, days) | date_sub(date, days) | Subtracts specified days from a date. |
Add Months | add_months(date, months) | add_months(date, months) | Adds specified months to a date. |
Truncate Date | date_trunc(format, date) | trunc(date, 'format') | Truncates date to specified unit. |
Date Difference | datediff(end_date, start_date) | datediff(end_date, start_date) | Returns days between dates. |
Year Extraction | year(date) | year(date) | Extracts the year from a date. |
Month Extraction | month(date) | month(date) | Extracts the month from a date. |
Day Extraction | day(date) or dayofmonth(date) | day(date) or dayofmonth(date) | Extracts the day from a date. |
Format Date | date_format(date, 'format') | date_format(date, 'format') | Formats date to specified pattern. |
Last Day of Month | last_day(date) | last_day(date) | Returns last day of the month. |
Unix Timestamp | unix_timestamp(date, 'pattern') | unix_timestamp(date, 'pattern') | Converts date to Unix timestamp. |
From Unix Timestamp | from_unixtime(unix_time, 'format') | from_unixtime(unix_time, 'format') | Converts Unix timestamp to date. |
Next Day | next_day(date, 'day_of_week') | next_day(date, 'day_of_week') | Returns next specified day of the week. |
Example
To add 5 days to the current date:
- PySpark SQL:
SELECT date_add(current_date(), 5)
- Hive QL:
SELECT date_add(current_date(), 5)
Both PySpark SQL and HiveQL share similar functions, but PySpark SQL has better integration with Python datetime types and additional flexibility.
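For example, the same date arithmetic can be run from PySpark either through spark.sql or through the DataFrame API; a small sketch:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
# SQL form.
spark.sql("SELECT date_add(current_date(), 5) AS plus_five").show()
# Equivalent DataFrame API form.
spark.range(1).select(F.date_add(F.current_date(), 5).alias("plus_five")).show()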
PySpark SQL provides better integration with Python datetime types due to its compatibility with the pyspark.sql.functions library, which allows for seamless handling of Python datetime objects. This flexibility enables direct manipulation of dates and times, conversion between types, and usage of Python’s datetime features within PySpark’s DataFrames.
Additionally, PySpark SQL allows using user-defined functions (UDFs) that interact directly with Python datetime libraries, enhancing its adaptability for complex time-based transformations compared to HiveQL’s fixed SQL-like functions.
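As an illustration of that flexibility, a UDF can call ordinary Python datetime methods on date columns. A hedged sketch, where the column name event_date and the ISO-week logic are assumptions for the example:
from datetime import date
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
spark = SparkSession.builder.getOrCreate()
# Hypothetical data with a DateType column.
df = spark.createDataFrame([(date(2024, 3, 15),), (date(2024, 12, 31),)], ["event_date"])
# UDF using Python's datetime API to compute the ISO week number.
iso_week = F.udf(lambda d: d.isocalendar()[1] if d is not None else None, IntegerType())
df.withColumn("iso_week", iso_week("event_date")).show()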
Here’s a table showing examples of how PySpark SQL integrates with Python datetime types via the pyspark.sql.functions library:
Function | Description | PySpark Example | Explanation |
---|---|---|---|
Current Date | Returns today’s date. | from pyspark.sql.functions import current_date df.select(current_date().alias("current_date")) | Directly fetches the current date using current_date() function. |
Convert to DateType | Converts a Python datetime to Spark DateType. | from datetime import datetime from pyspark.sql.functions import lit, to_date df.select(to_date(lit(datetime.now()))) | Allows converting Python datetime objects directly to Spark DateType using to_date() function. |
Add Days | Adds days to a date. | from pyspark.sql.functions import date_add df.select(date_add(current_date(), 5)) | Adds 5 days to the current date. |
Date Formatting | Formats date to a specified pattern. | from pyspark.sql.functions import date_format df.select(date_format(current_date(), 'yyyy-MM-dd')) | Formats the date into yyyy-MM-dd format. |
Extracting Parts of Date | Extracts year, month, day, etc., from a date. | from pyspark.sql.functions import year, month, dayofmonth df.select(year(current_date()), month(current_date())) | Extracts year and month directly from the current date. |
Date Difference | Finds the difference between two dates. | from pyspark.sql.functions import datediff df.select(datediff(current_date(), lit('2022-01-01'))) | Calculates the difference in days between today and a specified date. |
Handling Null Dates | Fills null date values with a default. | df.withColumn('date_column', F.coalesce('date_column', F.to_date(F.lit('1970-01-01')))) | Replaces null dates with a default value (epoch start); fillna() ignores DateType columns, so coalesce() is used instead. |
Convert String to Date | Converts a string to DateType. | df.select(to_date(lit("2023-10-31"), "yyyy-MM-dd")) | Converts a string to DateType using a specified format. |
Using Python UDFs with Dates | Applies Python functions directly to datetime columns. | from pyspark.sql.functions import udf, col from pyspark.sql.types import IntegerType df.withColumn('year', udf(lambda x: x.year, IntegerType())(col('date_column'))) | Allows creating UDFs with Python’s datetime methods, e.g., extracting the year directly from date columns. |
Explanation:
- PySpark’s integration with Python datetime types simplifies date manipulations by allowing direct conversion and application of Python datetime methods.
- Functions like to_date, date_format, and UDFs make PySpark SQL highly flexible for working with dates, giving more control over formatting, extraction, and handling nulls or custom date logic in transformations.
This compatibility makes it easier to write complex date manipulations within PySpark, directly utilizing Python’s rich datetime functionality.
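Putting these pieces together, here is a short sketch that parses string dates, fills nulls, and reformats them; the raw_date column and its format are assumptions for illustration:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
# Hypothetical string dates, including a null.
df = spark.createDataFrame([("2023-10-31",), (None,)], ["raw_date"])
result = (
    df.withColumn("parsed", F.to_date("raw_date", "yyyy-MM-dd"))                   # string -> DateType
      .withColumn("parsed", F.coalesce("parsed", F.to_date(F.lit("1970-01-01"))))  # default for nulls
      .withColumn("pretty", F.date_format("parsed", "dd MMM yyyy"))                # reformat for display
)
result.show()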