PySpark provides a powerful API for data manipulation, similar to pandas, but optimized for big data processing. Below is a comprehensive overview of DataFrame operations, functions, and syntax in PySpark with examples.
Creating DataFrames
Creating DataFrames from various sources is a common task in PySpark. Below are examples for creating DataFrames from CSV files, Excel files, Python List, Python Tuple, Python Dictionary, Pandas DataFrames, Hive tables, values, RDDs, Oracle databases, and HBase tables.
1. Creating DataFrames from CSV Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from CSV") \
    .getOrCreate()
# Read CSV file
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df_csv.show()
2. Creating DataFrames from Excel Files
To read Excel files, you need to install the spark-excel library.
# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5")
df_excel = spark.read \
    .format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("path/to/file.xlsx")
df_excel.show()
3. Creating DataFrames from Python List
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from List") \
    .getOrCreate()
# Create DataFrame from list
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_list = spark.createDataFrame(data, columns)
df_list.show()
4. Creating DataFrames from Python Tuple
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Tuple") \
    .getOrCreate()
# Create DataFrame from tuple
data = (("Alice", 30), ("Bob", 25), ("Cathy", 28))
columns = ["name", "age"]
df_tuple = spark.createDataFrame(data, columns)
df_tuple.show()
5. Creating DataFrames from Python Dictionary
import pandas as pd
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Dictionary") \
    .getOrCreate()
# Create DataFrame from dictionary (via an intermediate pandas DataFrame)
data = {"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]}
df_dict = spark.createDataFrame(pd.DataFrame(data))
df_dict.show()
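If you prefer to skip the pandas round-trip, a minimal sketch (reusing the same Spark session) is to pass a list of dictionaries and let Spark infer the schema:
# Alternative sketch (no pandas): list of dicts, schema inferred by Spark.
# Column order in the result may differ from the dict key order.
rows = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
    {"name": "Cathy", "age": 28},
]
df_dict_alt = spark.createDataFrame(rows)
df_dict_alt.show()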
6. Creating DataFrames from Pandas DataFrame
import pandas as pd
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Pandas") \
    .getOrCreate()
# Create a Pandas DataFrame
pdf = pd.DataFrame({"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]})
# Convert Pandas DataFrame to Spark DataFrame
df_pandas = spark.createDataFrame(pdf)
df_pandas.show()
7. Creating DataFrames from Hive Tables
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Hive") \
    .enableHiveSupport() \
    .getOrCreate()
# Read Hive table
df_hive = spark.sql("SELECT * FROM database.table_name")
df_hive.show()
8. Creating DataFrames from Values
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Values") \
    .getOrCreate()
# Create DataFrame from values
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_values = spark.createDataFrame(data, columns)
df_values.show()
9. Creating DataFrames from RDDs
from pyspark.sql import Row, SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from RDD") \
    .getOrCreate()
# Create an RDD
rdd = spark.sparkContext.parallelize([
Row(name="Alice", age=30),
Row(name="Bob", age=25),
Row(name="Cathy", age=28)
])
# Convert RDD to DataFrame
df_rdd = spark.createDataFrame(rdd)
df_rdd.show()
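Since the RDD above already contains Row objects, an equivalent shortcut (assuming the same Spark session) is the RDD's toDF() method:
# Equivalent shortcut: convert an RDD of Row objects directly with toDF()
df_rdd_alt = rdd.toDF()
df_rdd_alt.show()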
10. Creating DataFrames from Oracle Database
To read from Oracle, you need to have the JDBC driver for Oracle in your classpath.
# Add the following dependency when initializing SparkSession
# .config("spark.jars", "path/to/ojdbc8.jar")
jdbc_url = "jdbc:oracle:thin:@hostname:port:SID"
connection_properties = {
"user": "username",
"password": "password",
"driver": "oracle.jdbc.driver.OracleDriver"
}
df_oracle = spark.read.jdbc(jdbc_url, "schema.table_name", properties=connection_properties)
df_oracle.show()
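An equivalent, reader-style way to express the same Oracle read (reusing the jdbc_url and credentials above; the table name remains a placeholder) is the generic "jdbc" data source:
# Equivalent syntax using the generic "jdbc" data source
df_oracle_alt = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "schema.table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()
df_oracle_alt.show()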
11. Creating DataFrames from HBase Tables
To read from HBase, you need the hbase-spark connector.
# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "org.apache.hbase.connectors.spark:hbase-spark:1.0.0")
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from HBase") \
    .getOrCreate()
# Define HBase catalog
catalog = """{
    "table":{"namespace":"default", "name":"tablename"},
    "rowkey":"key",
    "columns":{
        "col0":{"cf":"rowkey", "col":"key", "type":"string"},
        "col1":{"cf":"cf1", "col":"col1", "type":"string"},
        "col2":{"cf":"cf2", "col":"col2", "type":"string"}
    }
}"""
df_hbase = spark.read \
    .options(catalog=catalog) \
    .format("org.apache.spark.sql.execution.datasources.hbase") \
    .load()
df_hbase.show()
12. Creating DataFrames from JSON Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from JSON") \
    .getOrCreate()
# Read JSON file
df_json = spark.read.json("path/to/file.json")
df_json.show()
13. Creating DataFrames from Parquet Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Parquet") \
    .getOrCreate()
# Read Parquet file
df_parquet = spark.read.parquet("path/to/file.parquet")
df_parquet.show()
Viewing Data in PySpark
1. show(): View the first few rows
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
data = [('Alice', 25), ('Bob', 30), ('Charlie', 35)]
columns = ['Name', 'Age']
sdf = spark.createDataFrame(data, columns)
# View the first 20 rows (default)
sdf.show()
# View the first n rows
sdf.show(10)
2. show() with truncate: Control column width
# Truncate long strings to 20 characters (default)
sdf.show(truncate=True)
# Do not truncate strings
sdf.show(truncate=False)
# Truncate strings to a specific length
sdf.show(truncate=5)
3. show() with vertical: Vertical display of rows
# Vertical display of rows
sdf.show(vertical=True)
4. printSchema(): Print the schema of the DataFrame
# Print the schema
sdf.printSchema()
5. describe(): Summary statistics
# Summary statistics of DataFrame
sdf.describe().show()
6. head(): Retrieve the first row or n rows
# Retrieve the first row
print(sdf.head())
# Retrieve the first n rows
print(sdf.head(5))
7. take(): Retrieve the first n rows
# Retrieve the first n rows as a list of Row objects
print(sdf.take(5))
8. collect(): Retrieve all rows
# Retrieve all rows as a list of Row objects
all_rows = sdf.collect()
for row in all_rows:
    print(row)
- PySpark:
  - show(): flexible method for displaying rows, with options like truncate and vertical.
  - printSchema(): for printing the schema.
  - describe(): for summary statistics.
  - head(), take(): for retrieving a specific number of rows.
  - collect(): for retrieving all rows.
PySpark Operations on DataFrames
PySpark DataFrame operations can be broadly classified into two categories: Transformations and Actions.
Transformations
Transformations are operations that are applied on a DataFrame to produce a new DataFrame. Transformations are lazy, meaning they are not executed immediately. Instead, they are recorded as a lineage of transformations to be applied when an action is called.
Here are some common transformations:
select()
- Description: Selects a subset of columns.
- Syntax:
df.select("col1", "col2", ...)
- Example:
df.select("name", "age").show()
# The following are all equivalent ways to reference the same columns:
df_pyspark.select(['Vars', 'Month_Period']).show()
df_pyspark.select('Vars', 'Month_Period').show()
from pyspark.sql.functions import col
df_pyspark.select(col("Month_Period")).show()
# df_pyspark["Month_Period"] refers to the same column as col("Month_Period")
withColumn()
- Description: Adds or replaces a column.
- Syntax:
df.withColumn("new_col", df["existing_col"] + 1)
- Example:
df.withColumn("new_age", df["age"] + 1).show()
Common use cases for the withColumn() method in PySpark:
1. Conditional Column Creation
from pyspark.sql.functions import when, col
df = df.withColumn("is_adult", when(col("age") >= 18, "Yes").otherwise("No"))
2. Column Transformation using UDFs
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def convert_to_uppercase(name):
    return name.upper()
udf_convert_to_uppercase = udf(convert_to_uppercase, StringType())
df = df.withColumn("uppercase_name", udf_convert_to_uppercase(df["name"]))
3. Column Calculation using Multiple Columns
from pyspark.sql.functions import col
df = df.withColumn("total_amount", col("quantity") * col("price"))
4. Renaming and Dropping Columns
df = df.withColumnRenamed("old_name", "new_name")
df = df.drop("column_to_drop")
5. Creating a Column with a List or Array
from pyspark.sql.functions import array, lit
df = df.withColumn("colors", array(lit("red"), lit("green"), lit("blue")))
6. Creating a Column with a Struct
from pyspark.sql.functions import struct, lit
df = df.withColumn("address", struct(lit("123 Main St"), lit("Anytown"), lit("USA")))
7. Using Window Functions
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
window = Window.orderBy("date")
df = df.withColumn("row_num", row_number().over(window))
8. Creating a Column with a JSON Object
from pyspark.sql.functions import to_json, struct
df = df.withColumn("json_data", to_json(struct(df["name"], df["age"])))
9. Using Aggregate Functions
from pyspark.sql.functions import sum, avg
df = df.groupBy("group").agg(sum("value").alias("sum_value"), avg("value").alias("avg_value"))
df = df.withColumn("sum_value_squared", df["sum_value"] ** 2)
10. Using Pattern Matching with when and otherwise
from pyspark.sql.functions import when, col
df = df.withColumn("category",
    when(col("value") < 10, "low")
    .when(col("value") < 20, "medium")
    .otherwise("high"))
drop()
- Description: Drops specified columns.
- Syntax:
df.drop("col1", "col2")
- Example:
df.drop("age").show()
distinct()
- Description: Returns distinct rows.
- Syntax:
df.distinct()
- Example:
df.distinct().show()
filter() or where()
- Description: Filters rows based on a condition.
- Syntax:
df.filter(df["age"] > 30)
- Example:
df.filter(df["age"] > 30).show()
groupBy()
- Description: Groups rows by specified columns.
- Syntax:
df.groupBy("department").agg({"salary": "mean"})
- Example:
df.groupBy("department").mean("salary").show()
orderBy() or sort()
- Description: Sorts rows by specified columns.
- Syntax:
df.orderBy(df["age"].desc())
- Example:
df.orderBy(df["age"].desc()).show()
join()
- Description: Joins two DataFrames.
- Syntax:
df1.join(df2, df1["key"] == df2["key"], "inner")
- Example:
df1.join(df2, df1["id"] == df2["id"], "inner").show()
union()
- Description: Unions two DataFrames.
- Syntax:
df1.union(df2)
- Example:
df1.union(df2).show()
sample()
- Description: Returns a sampled subset of rows.
- Syntax:
df.sample(withReplacement=False, fraction=0.1)
- Example:
df.sample(withReplacement=False, fraction=0.1).show()
Actions
Actions trigger the execution of the transformations recorded in the DAG and return results.
Here are some common actions:
collect()
- Description: Returns all rows as a list of Row objects.
- Syntax:
df.collect()
- Example:
rows = df.collect()
for row in rows:
    print(row)
first()
- Description: Returns the first row.
- Syntax:
df.first()
- Example:
print(df.first())
take()
- Description: Returns the first n rows.
- Syntax:
df.take(n)
- Example:
print(df.take(5))
describe()
- Description: Computes basic statistics.
- Syntax:
df.describe()
- Example:
df.describe().show()
cache()
- Description: Caches the DataFrame in memory.
- Syntax:
df.cache()
- Example:
df.cache()
persist()
- Description: Persists the DF with specified storage level.
- Syntax:
df.persist(storageLevel)
- Example:
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
write()
- Description: Writes the DataFrame to a specified format.
- Syntax:
df.write.format("parquet").save("path")
- Example:
df.write.format("parquet").save("path/to/parquet")
foreach()
- Description: Applies a function to each row.
- Syntax:
df.foreach(f)
- Example:
def print_row(row):
    print(row)
df.foreach(print_row)
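To see the lazy/eager split in practice, here is a minimal sketch (assuming a df with name and age columns, as in the earlier examples):
# Transformations only build the logical plan; no Spark job runs yet
adults = df.filter(df["age"] > 30).select("name", "age")
# The action below triggers the actual computation on the cluster
print(adults.count())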
1. How do you use SQL expressions inside the filter or where functions? By using the expr function from the pyspark.sql.functions module, which allows you to pass SQL expressions as strings.
Here is an example demonstrating how to use SQL expressions inside the filter or where functions:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
# Initialize Spark session
spark = SparkSession.builder \
    .appName("SQL Expression Example") \
    .getOrCreate()
# Sample DataFrame
data = [
(1, "Alice", 25, 2000),
(2, "Bob", 30, 3000),
(3, "Charlie", 35, 4000)
]
columns = ["id", "name", "age", "salary"]
df = spark.createDataFrame(data, columns)
# Using SQL expression in filter
filtered_df = df.filter(expr("age > 30"))
filtered_df.show()
Detailed Example
Let’s go through a more detailed example with multiple conditions and transformations using SQL expressions.
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
# Initialize Spark session
spark = SparkSession.builder \
    .appName("SQL Expression Example") \
    .getOrCreate()
# Sample DataFrame
data = [
(1, "Alice", 25, 2000),
(2, "Bob", 30, 3000),
(3, "Charlie", 35, 4000),
(4, "David", 40, 5000)
]
columns = ["id", "name", "age", "salary"]
df = spark.createDataFrame(data, columns)
# Filter using SQL expression
filtered_df = df.filter(expr("age > 30 AND salary > 3000"))
# Select specific columns using SQL expression
selected_df = filtered_df.selectExpr("id", "name", "salary * 1.1 AS adjusted_salary")
# Adding a new column using SQL expression
with_new_column_df = selected_df.withColumn("salary_category", expr("CASE WHEN adjusted_salary > 4000 THEN 'High' ELSE 'Medium' END"))
with_new_column_df.show()
Output
+---+-------+---------------+---------------+
| id| name|adjusted_salary|salary_category|
+---+-------+---------------+---------------+
| 3|Charlie| 4400.0| High|
| 4| David| 5500.0| High|
+---+-------+---------------+---------------+
2. select function syntax variations and examples:
The select function in PySpark is used to select a subset of columns or expressions from a DataFrame. It can be used in several ways to achieve different results.
Basic Syntax
The basic syntax for the select function is:
DataFrame.select(*cols)
Here, *cols can be a list of column names, Column objects, or expressions.
Selecting Specific Columns
df.select('column1', 'column2').show()
This will select and display the 'column1' and 'column2' columns.
Selecting Columns with Expressions
from pyspark.sql.functions import col, expr
df.select(col('column1'), expr('column2 + 1').alias('column2_plus_one')).show()
This will select 'column1' and an expression 'column2 + 1' as 'column2_plus_one'.
Using Column Objects
Example:
from pyspark.sql.functions import col
df.select(col('column1'), col('column2')).show()
This will select 'column1' and 'column2' using Column objects.
Renaming Columns
df.select(col('column1').alias('new_column1')).show()
This will select 'column1' and rename it to 'new_column1'.
Selecting All Columns
df.select('*').show()
This will select all columns in the DataFrame.
Using SQL Expressions
df.selectExpr('column1', 'column2 + 1 as column2_plus_one').show()
This allows you to use SQL expressions directly.
3. PySpark DataFrame groupBy Operation
The groupBy operation in PySpark is used to group data based on one or more columns. Once grouped, you can apply various aggregation functions like count, sum, avg, max, min, etc.
Syntax
DataFrame.groupBy(*cols)
Here, *cols can be one or more column names or expressions.
Common Aggregation Functions
count()
sum(column)
avg(column)
max(column)
min(column)
agg(*exprs)
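Besides the functions listed above, agg() also accepts a plain dict of {column: aggregate-function-name}. A compact sketch, using the sample df defined in the Examples below:
# Dict-style aggregation: less flexible than the function form, but concise
df.groupBy("Department").agg({"Salary": "sum", "*": "count"}).show()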
Examples
Sample DataFrame
data = [
("Alice", "Sales", 3000),
("Bob", "Sales", 4000),
("Alice", "Sales", 4000),
("Catherine", "HR", 4000),
("David", "HR", 5000),
("Eve", "IT", 3000)
]
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, schema=columns)
df.show()
1. Group By Single Column and Count
df.groupBy("Department").count().show()
Output:
+----------+-----+
|Department|count|
+----------+-----+
| HR| 2|
| IT| 1|
| Sales| 3|
+----------+-----+
2. Group By Multiple Columns and Count
df.groupBy("Department", "Name").count().show()
Output:
+----------+-----+-----+
|Department| Name|count|
+----------+-----+-----+
| Sales|Alice| 2|
| HR|David| 1|
| HR|Catherine| 1|
| IT| Eve| 1|
| Sales| Bob| 1|
+----------+-----+-----+
3. Group By with Sum Aggregation
df.groupBy("Department").sum("Salary").show()
Output:
+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
| HR| 9000 |
| IT| 3000 |
| Sales| 11000 |
+----------+-----------+
4. Group By with Multiple Aggregations
df.groupBy("Department").agg(
count("*").alias("Total_Count"),
sum("Salary").alias("Total_Salary"),
avg("Salary").alias("Average_Salary"),
max("Salary").alias("Max_Salary"),
min("Salary").alias("Min_Salary")
).show()
Output:
+----------+-----------+-----------+--------------+-----------+-----------+
|Department|Total_Count|Total_Salary|Average_Salary|Max_Salary|Min_Salary|
+----------+-----------+-----------+--------------+-----------+-----------+
| HR| 2| 9000| 4500.0| 5000| 4000|
| IT| 1| 3000| 3000.0| 3000| 3000|
| Sales| 3| 11000| 3666.67| 4000| 3000|
+----------+-----------+-----------+--------------+-----------+-----------+
5. Using the agg Method with Expressions
from pyspark.sql.functions import expr
df.groupBy("Department").agg(
expr("count(*) as Total_Count"),
expr("sum(Salary) as Total_Salary"),
expr("avg(Salary) as Average_Salary"),
expr("max(Salary) as Max_Salary"),
expr("min(Salary) as Min_Salary")
).show()
Output is the same as above.
6. Group By with Having Clause (Filtering Groups)
df.groupBy("Department").agg(
sum("Salary").alias("Total_Salary")
).filter(col("Total_Salary") > 5000).show()
Output:
+----------+-----------+
|Department|Total_Salary|
+----------+-----------+
| HR| 9000|
| Sales| 11000|
+----------+-----------+
4. PySpark orderBy() and sort() Operations
In PySpark, both orderBy() and sort() are used to sort the rows of a DataFrame. They can be used interchangeably, as they provide the same functionality.
Syntax
DataFrame.orderBy(*cols, ascending=True)
DataFrame.sort(*cols, ascending=True)
- cols: List of column names or expressions to sort by.
- ascending: Boolean or list of booleans. If a single boolean is provided, it applies to all columns. If a list is provided, it specifies the sort order for each corresponding column.
data = [
("Alice", 34, "HR", 3000),
("Bob", 45, "IT", 4000),
("Catherine", 29, "HR", 5000),
("David", 36, "IT", 2500),
("Eve", 28, "Sales", 2800)
]
columns = ["Name", "Age", "Department", "Salary"]
df = spark.createDataFrame(data, schema=columns)
df.show()
1. Order By Single Column Ascending
df.orderBy("Age").show()
Equivalent:
df.sort("Age").show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Eve| 28| Sales| 2800|
|Catherine| 29| HR| 5000|
| Alice| 34| HR| 3000|
| David| 36| IT| 2500|
| Bob| 45| IT| 4000|
+---------+---+----------+------+
2. Order By Single Column Descending
df.orderBy(col("Age").desc()).show()
df.sort(col("Age").desc()).show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 5000|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+
3. Order By Multiple Columns
df.sort("col1", "col2")
df.sort(col("col1").asc(), col("col2").desc())
4. Order By Multiple Columns with Different Sort Orders
df.orderBy(["Department", "Age"], ascending=[True, False]).show()
df.sort(["Department", "Age"], ascending=[True, False]).show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 5000|
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+
Useful Examples
Sorting by Salary in Descending Order
df.orderBy(col("Salary").desc()).show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Bob| 45| IT| 4000|
| Alice| 34| HR| 3000|
| Eve| 28| Sales| 2800|
| David| 36| IT| 2500|
+---------+---+----------+------+
Sorting by Department and then by Salary within each Department
df.orderBy("Department", col("Salary").desc()).show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Alice| 34| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+
Sorting with Expression
from pyspark.sql.functions import expr
df.orderBy(expr("Salary + Age").desc()).show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Bob| 45| IT| 4000|
| Alice| 34| HR| 3000|
| Eve| 28| Sales| 2800|
| David| 36| IT| 2500|
+---------+---+----------+------+
5. PySpark Join Operations
In PySpark, the join operation is used to combine two DataFrames based on a condition. The join operation can be performed in several ways, such as inner join, left join, right join, outer join, semi join, and anti join.
Syntax
DataFrame.join(other, on=None, how=None)
- other: The DataFrame to join with.
- on: The column(s) to join on. This can be a string representing a single column name, a list of column names, or a condition (see the expression-based sketch after the join-types list below).
- how: The type of join to perform. Default is inner.
Join Types
- inner: Inner join.
- left or left_outer: Left outer join.
- right or right_outer: Right outer join.
- outer, full, or full_outer: Full outer join.
- left_semi: Left semi join.
- left_anti: Left anti join.
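When the key columns have different names on the two sides, on can be a boolean column expression instead of a column name. A small hypothetical sketch (emp_df, dept_df and their dept/dept_id columns are made up for illustration):
# Hypothetical example: expression-based join condition with differently named keys
joined = emp_df.join(dept_df, emp_df["dept"] == dept_df["dept_id"], "inner")
joined.show()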
Examples
data1 = [
("Alice", 34, "HR"),
("Bob", 45, "IT"),
("Catherine", 29, "HR"),
("David", 36, "IT")
]
columns1 = ["Name", "Age", "Department"]
data2 = [
("HR", 3000),
("IT", 4000),
("Sales", 2800)
]
columns2 = ["Department", "Salary"]
df1 = spark.createDataFrame(data1, schema=columns1)
df2 = spark.createDataFrame(data2, schema=columns2)
df1.show()
df2.show()
1. Inner Join
inner_join_df = df1.join(df2, on="Department", how="inner")
inner_join_df.show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
+---------+---+----------+------+
2. Left Outer Join
left_join_df = df1.join(df2, on="Department", how="left")
left_join_df.show()
Output:
+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
+---------+---+----------+------+
3. Right Outer Join
right_join_df = df1.join(df2, on="Department", how="right")
right_join_df.show()
Output:
+---------+----+----------+------+
| Name| Age|Department|Salary|
+---------+----+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
| null|null| Sales| 2800|
+---------+----+----------+------+
4. Full Outer Join
outer_join_df = df1.join(df2, on="Department", how="outer")
outer_join_df.show()
Output:
+---------+----+----------+------+
| Name| Age|Department|Salary|
+---------+----+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
| null|null| Sales| 2800|
+---------+----+----------+------+
5. Left Semi Join
semi_join_df = df1.join(df2, on="Department", how="left_semi")
semi_join_df.show()
Output:
+---------+---+----------+
| Name|Age|Department|
+---------+---+----------+
| Alice| 34| HR|
|Catherine| 29| HR|
| Bob| 45| IT|
| David| 36| IT|
+---------+---+----------+
6. Left Anti Join
anti_join_df = df1.join(df2, on="Department", how="left_anti")
anti_join_df.show()
Output:
+----+---+----------+
|Name|Age|Department|
+----+---+----------+
+----+---+----------+
7. Join on Multiple Columns
data3 = [
("Alice", 34, "HR", 3000),
("Bob", 45, "IT", 4000),
("Catherine", 29, "HR", 5000),
("David", 36, "IT", 2500),
("Eve", 28, "Sales", 2800)
]
columns3 = ["Name", "Age", "Department", "Salary"]
data4 = [
("HR", 34, 3000, "Manager"),
("IT", 45, 4000, "Developer"),
("Sales", 28, 2800, "Salesman")
]
columns4 = ["Department", "Age", "Salary", "Role"]
df3 = spark.createDataFrame(data3, schema=columns3)
df4 = spark.createDataFrame(data4, schema=columns4)
multi_col_join_df = df3.join(df4, on=["Department", "Age", "Salary"], how="inner")
multi_col_join_df.show()
Output:
+-----+---+----------+------+--------+
| Name|Age|Department|Salary| Role|
+-----+---+----------+------+--------+
|Alice| 34| HR| 3000| Manager|
| Bob| 45| IT| 4000|Developer|
| Eve| 28| Sales| 2800| Salesman|
+-----+---+----------+------+--------+
Example Project: Comprehensive ETL Pipeline in PySpark
Let’s create a comprehensive example project that demonstrates these operations.
- Setup Spark Session
- Load Data from CSV, Hive, Oracle, and create DataFrames
- Apply Transformations
- Perform Actions
- Persist DataFrames
- Convert to Pandas for further analysis and visualization
1. Setup Spark Session
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("ETL Pipeline Example") \
    .enableHiveSupport() \
    .getOrCreate()
2. Load Data
From CSV
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
From Hive
df_hive = spark.sql("SELECT * FROM database.table_name")
From Oracle
jdbc_url = "jdbc:oracle:thin:@hostname:port:SID"
connection_properties = {
"user": "username",
"password": "password",
"driver": "oracle.jdbc.driver.OracleDriver"
}
df_oracle = spark.read.jdbc(jdbc_url, "schema.table_name", properties=connection_properties)
3. Apply Transformations
# Select columns
df_selected = df_csv.select("name", "age", "salary")
# Filter rows
df_filtered = df_selected.filter(df_selected["age"] > 30)
# Group by and aggregate
df_grouped = df_filtered.groupBy("department").mean("salary")
# Join with another DataFrame
df_joined = df_grouped.join(df_hive, df_grouped["department"] == df_hive["dept_id"])
# Add new column
df_with_new_col = df_joined.withColumn("new_col", df_joined["salary"] + 1000)
# Drop column
df_final = df_with_new_col.drop("dept_id")
4. Perform Actions
# Show DataFrame
df_final.show()
# Collect DataFrame
rows = df_final.collect()
for row in rows:
    print(row)
# Count rows
print(df_final.count())
# Describe DataFrame
df_final.describe().show()
5. Persist DataFrames
# Cache DataFrame
df_final.cache()
# Persist DataFrame
from pyspark import StorageLevel
df_final.persist(StorageLevel.MEMORY_AND_DISK)
6. Convert to Pandas for Further Analysis and Visualization
import pandas as pd
# Convert to Pandas DataFrame
pdf = df_final.toPandas()
# Analyze with Pandas
print(pdf.describe())
# Visualize with Matplotlib
import matplotlib.pyplot as plt
pdf['salary'].hist()
plt.show()
Date and Time Functions- Pyspark Dataframes
String Manipulation on PySpark DataFrames
Window functions in PySpark on Dataframes
Some Common Questions:-
How do you handle missing data in PySpark?
Handling missing data is a common task in data processing workflows, and PySpark provides various methods to manage missing or null values in a DataFrame. You can choose to remove, replace, or impute missing data depending on the specific use case.
Here are some common techniques to handle missing data in PySpark:
1. Drop Rows with Missing Values (dropna)
The dropna() method allows you to drop rows that contain null or missing values.
Example: Drop Rows with Any Missing Values
This will remove any row that has at least one missing value in any column.
# Drop rows where any column has a null value
df_clean = df.dropna()
Example: Drop Rows Based on Specific Columns
You can specify which columns to check for missing values.
# Drop rows only if 'column1' or 'column2' has a null value
df_clean = df.dropna(subset=["column1", "column2"])
Example: Drop Rows with Threshold
You can also set a threshold, which means only rows with a certain number of non-null values will be retained.
# Drop rows that have less than 2 non-null values
df_clean = df.dropna(thresh=2)
2. Replace Missing Values (fillna)
The fillna() method replaces null or missing values with a specified value. You can fill nulls with a constant value or use specific values for different columns.
Example: Fill All Null Values with a Single Value
You can replace all null values in a DataFrame with a constant value like 0 or an empty string.
# Replace all null values with 0
df_filled = df.fillna(0)
Example: Fill Nulls in Specific Columns
You can fill nulls in specific columns with different values.
# Replace nulls in 'column1' with 0 and in 'column2' with 'unknown'
df_filled = df.fillna({"column1": 0, "column2": "unknown"})
3. Impute Missing Values with Mean, Median, or Mode
To fill missing values with statistical values like mean, median, or mode, you can use PySpark’s agg() function or the pyspark.ml.feature.Imputer.
Example: Fill Missing Values with Mean
You can calculate the mean of a column and then use fillna() to replace the missing values.
from pyspark.sql.functions import mean
# Calculate the mean of 'column1'
mean_value = df.select(mean(df['column1'])).collect()[0][0]
# Fill missing values in 'column1' with the mean
df_filled = df.fillna({"column1": mean_value})
Example: Use the Imputer from pyspark.ml
PySpark provides the Imputer class, which allows you to automatically fill missing values with the mean, median, or other strategies.
from pyspark.ml.feature import Imputer
# Create an Imputer object and set the strategy to 'mean'
imputer = Imputer(inputCols=["column1", "column2"], outputCols=["column1_imputed", "column2_imputed"])
# Fit the imputer model and transform the DataFrame
df_imputed = imputer.fit(df).transform(df)
- inputCols: Columns where the missing values are found.
- outputCols: Columns where the imputed values will be stored.
- You can change the strategy to "median" or "mode" using imputer.setStrategy("median").
4. Identifying Rows with Missing Data
Before handling missing data, you may want to identify rows or columns that contain missing values.
Example: Filter Rows with Null Values
You can use the filter() or where() methods to filter rows with null values.
# Filter rows where 'column1' has a null value
df_nulls = df.filter(df['column1'].isNull())
Example: Count Missing Values in Each Column
You can count the number of missing values in each column.
from pyspark.sql.functions import col, count, when
# Count the number of missing values in each column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
5. Handling Missing Data in Complex Data Types
If you are working with complex types like arrays, structs, or maps, you might need to handle missing data within these nested structures.
Example: Fill Missing Values in Struct Columns
You can use the withColumn() method and a combination of functions like when(), col(), and lit() to handle missing values within nested fields.
from pyspark.sql.functions import when, col, struct, lit
# Assuming you have a StructType column called 'address' with a 'city' field
df_filled = df.withColumn("address",
    when(col("address.city").isNull(),
         struct(lit("Unknown").alias("city"), col("address.state"))
    ).otherwise(col("address"))
)
6. Dropping or Filling Columns with High Missing Rates
Sometimes, you may want to drop or fill entire columns if they contain too many missing values. You can first check the percentage of missing values and then decide whether to drop or fill the column.
Example: Drop Columns with a High Percentage of Missing Values
from pyspark.sql.functions import col
# Calculate the percentage of missing values for each column
threshold = 0.5  # Set a threshold, e.g., drop columns with more than 50% missing values
total_rows = df.count()
# Identify columns with more than 50% missing values
cols_to_drop = [c for c in df.columns if df.filter(col(c).isNull()).count() / total_rows > threshold]
# Drop these columns
df_clean = df.drop(*cols_to_drop)
7. Handling Null Values in Joins
When performing joins, missing values can affect the results, especially if you’re using keys that contain nulls. You might want to handle missing values before or after the join operation.
Example: Fill Nulls Before Join
# Fill missing values in the join key column before performing the join
df = df.fillna({"join_key": "default_value"})
other_df = other_df.fillna({"join_key": "default_value"})
# Perform the join
joined_df = df.join(other_df, on="join_key")
Summary of Methods to Handle Missing Data in PySpark:
- Drop rows with missing values: dropna().
- Replace missing values with a constant value: fillna().
- Impute missing values with mean, median, or mode: use pyspark.ml.feature.Imputer.
- Filter or count missing values using filter(), where(), or count().
- Handle nested missing data in complex types like StructType, ArrayType, etc.
- Drop columns with too many missing values by calculating the percentage of nulls.
The approach you take depends on your specific requirements (e.g., whether the missing data can be safely removed, replaced with a default, or needs to be imputed).
NaN or None or Null
In PySpark, NaN (Not a Number), None, and null are commonly used to represent missing or undefined data in DataFrames, but they are not exactly the same. Let’s go over how these different types of missing data behave in PySpark.
Key Differences Between None, null, and NaN:
None in Python:
None is the Python equivalent of null in Spark and SQL. When you create a DataFrame with None in the data, Spark will treat it as a null value in the DataFrame.
data = [(1, "Alice"), (2, None)]
df = spark.createDataFrame(data, ["id", "name"])
df.show()
+---+-----+
| id| name|
+---+-----+
| 1|Alice|
| 2| null|
+---+-----+
null in PySpark:
null is how Spark represents a missing value in a DataFrame column; you can check for it with isNull() or isNotNull().
# Checking for null values
df.filter(df["name"].isNull()).show()
NaN in PySpark:
NaN is a special floating-point value that appears only in numeric columns; you can check for it with the isnan() function.
from pyspark.sql.functions import isnan
# Checking for NaN values in a column
df.filter(isnan(df["column"])).show()
How to Handle None, null, and NaN in PySpark
1. Handling None and null
- Replace null or None Values: You can use fillna() to replace None (or null) values in the DataFrame.
# Replace null values in the 'name' column with 'Unknown'
df_filled = df.fillna({"name": "Unknown"})
df_filled.show()
- Drop Rows with null Values: You can use dropna() to remove rows containing null values.
# Drop rows where any column contains null values
df_clean = df.dropna()
df_clean.show()
2. Handling NaN Values
- Replace NaN Values: You can use fillna() to replace NaN values in numeric columns.
# Replace NaN values in numeric columns with 0
df_filled = df.fillna(0)
- Filter Out NaN Values: You can use the isnan() function to filter rows containing NaN values.
from pyspark.sql.functions import isnan
# Filter rows where a column contains NaN
df_filtered = df.filter(~isnan(df["column"]))
df_filtered.show()
- Note: Unlike null or None, NaN is specific to numeric data types (e.g., FloatType and DoubleType). It does not apply to other data types like StringType.
3. Checking for None, null, and NaN
Check for null or None Values: Use isNull() or isNotNull() to check for null values in any column.
# Filter rows where the 'name' column is null
df.filter(df["name"].isNull()).show()
Check for NaN Values: Use isnan() to check for NaN values in numeric columns.
from pyspark.sql.functions import isnan
# Filter rows where the 'score' column is NaN
df.filter(isnan(df["score"])).show()
Example: Handling None, null, and NaN Together
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan
# Initialize Spark session
spark = SparkSession.builder.master("local").appName("null_nan_example").getOrCreate()
# Example DataFrame with None (null) and NaN
data = [
(1, "Alice", 50.0),
(2, None, float('nan')),
(3, "Bob", None),
(4, "Charlie", 70.0)
]
columns = ["id", "name", "score"]
df = spark.createDataFrame(data, columns)
# Show the original DataFrame
df.show()
# Filter rows where 'name' is null (None)
df.filter(df["name"].isNull()).show()
# Filter rows where 'score' is NaN
df.filter(isnan(df["score"])).show()
# Replace NaN and null values
df_filled = df.fillna({"name": "Unknown", "score": 0})
df_filled.show()
Output:
+---+-------+-----+
| id| name|score|
+---+-------+-----+
| 1| Alice| 50.0|
| 2| null| NaN|
| 3| Bob| null|
| 4|Charlie| 70.0|
+---+-------+-----+
# Rows where 'name' is null
+---+----+-----+
| id|name|score|
+---+----+-----+
| 2|null| NaN|
+---+----+-----+
# Rows where 'score' is NaN
+---+----+-----+
| id|name|score|
+---+----+-----+
| 2|null| NaN|
+---+----+-----+
# DataFrame after replacing NaN and null values
+---+-------+-----+
| id| name|score|
+---+-------+-----+
| 1| Alice| 50.0|
| 2|Unknown| 0.0|
| 3| Bob| 0.0|
| 4|Charlie| 70.0|
+---+-------+-----+
Summary of Key Points:
- None and null: Both represent missing or undefined values in PySpark and can occur in any column type. Use isNull() or fillna() to handle None/null values.
- NaN: Specific to numeric data types (FloatType, DoubleType), representing an invalid number. Use isnan() to filter or handle NaN values in numeric columns.
- Handling nulls and NaNs: You can fill, filter, or drop rows with null or NaN values using PySpark functions like fillna(), dropna(), isNull(), and isnan().
All three (None, null, NaN) are common representations of missing data in PySpark, but they behave differently based on their data type and how you handle them.
.na.drop() vs dropna() in PySpark
In PySpark, both .na.drop() and dropna() are used to drop rows containing null values from a DataFrame, and they are essentially the same in functionality. The only difference is the way they are called.
1. .na.drop() vs dropna() in PySpark
- .na.drop(): This is called through the na property of the DataFrame, which provides various functions to handle missing data.
- dropna(): This is a more direct method call on the DataFrame and achieves the same result as .na.drop().
Example of .na.drop()
# Using .na.drop() to drop rows with null values
df_clean = df.na.drop()
Example of dropna()
# Using dropna() to drop rows with null values
df_clean = df.dropna()
Both examples will drop rows that contain any null values in any column.
Options for .na.drop() and dropna()
Both .na.drop() and dropna() support the same arguments, such as subset, how, and thresh.
- subset: Specifies columns to check for null values.
- how: Specifies whether to drop rows with any nulls ('any') or all nulls ('all').
- thresh: Specifies a minimum number of non-null values required to keep a row.
Example with subset and how:
# Drop rows where any value in 'column1' or 'column2' is null
df_clean = df.na.drop(subset=["column1", "column2"])
# Drop rows only if all columns are null
df_clean = df.dropna(how='all')
2. Equivalent in Pandas
In Pandas, the equivalent function for dropping rows with missing values is dropna().
Example in Pandas:
import pandas as pd
# Create a Pandas DataFrame
data = {'col1': [1, None, 3], 'col2': [4, 5, None]}
df = pd.DataFrame(data)
# Drop rows with any missing values
df_clean = df.dropna()
Pandas dropna() offers similar options to PySpark’s dropna(), such as:
- subset: Specify which columns to consider for null values.
- how: Drop rows with any or all missing values.
- thresh: Keep rows that have at least a specified number of non-null values.
Summary of Equivalent Methods:
Operation | PySpark | Pandas |
---|---|---|
Drop rows with nulls | df.na.drop() / df.dropna() | df.dropna() |
Both PySpark and Pandas offer the flexibility to drop rows with missing data using their respective dropna() methods, with similar arguments to control behavior such as subset, how, and thresh.
What is the PySpark DataFrame API? How does it relate to PySpark SQL?
In PySpark, you can perform operations on DataFrames using two main APIs: the DataFrame API and the Spark SQL API. Both are powerful and can be used interchangeably to some extent.
Here’s a breakdown of key concepts and functionalities:
1. Creating DataFrames:
You can create a PySpark DataFrame from a list of Row objects:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
df = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
Create a PySpark DataFrame with an explicit schema.
df = spark.createDataFrame([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
From existing data sources:
Read data from various sources like CSV, Parquet, JSON, or Hive tables using spark.read.format("format").load("path").
Leverage JDBC to connect to external databases and read tables.
PySpark DataFrame from a pandas DataFrame
pandas_df = pd.DataFrame({
'a': [1, 2, 3],
'b': [2., 3., 4.],
'c': ['string1', 'string2', 'string3'],
'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df
We can also create DataFrames from lists, dictionaries, or other Python data structures using spark.createDataFrame(data).
PySpark DataFrame from an RDD consisting of a list of tuples.
rdd = spark.sparkContext.parallelize([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
The DataFrames created above all have the same results and schema.
# All the DataFrames above produce the same result.
df.show()
df.printSchema()
2. DataFrame Structure:
- DataFrames are distributed collections of rows, where each row represents a record.
- Each column has a name and a specific data type (e.g., integer, string, timestamp).
- You can access columns by name (e.g., df.column_name).
3. DataFrame Operations:
- Selection: Select specific columns using df.select("column1", "column2").
- Filtering: Filter rows based on certain conditions using df.where(condition).
- Sorting: Sort DataFrames using df.sort(col1.desc(), col2.asc()).
- Aggregation: Calculate summary statistics using functions like df.groupBy("column").agg(F.avg("value")).
- Joining: Combine DataFrames based on shared columns using df1.join(df2, on="column_name", how="inner").
- Transformations: Apply custom logic to each row using df.withColumn("new_column", F.col("existing_column") * 2).
- Action Operations: These operations return results or trigger computations, like df.show(), df.count(), df.write.parquet("path").
A small combined sketch of several of these operations follows.
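Putting several of the operations above together, a minimal sketch (the department and salary columns are illustrative only):
# Chained transformations followed by an action
from pyspark.sql import functions as F
result = (
    df.select("department", "salary")
      .where(F.col("salary") > 3000)
      .groupBy("department")
      .agg(F.avg("salary").alias("avg_salary"))
      .sort(F.col("avg_salary").desc())
)
result.show()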
4. Key Features:
- Laziness: DataFrame operations are not immediately executed but rather built into a logical plan. When an action operation is triggered (e.g., df.show()), the plan is optimized and executed on the cluster.
- Distributed Processing: Operations are spread across the Spark cluster for parallel processing, enabling efficient handling of large datasets.
- Data Types: DataFrames support various data types for flexibility in data representation.
5. Relationship with Spark SQL:
- PySpark DataFrame API provides a programmatic way to interact with DataFrames.
- Spark SQL offers a SQL-like syntax for querying DataFrames. You can seamlessly switch between these APIs for data manipulation.
Here are some resources for further exploration:
- PySpark DataFrames documentation: https://spark.apache.org/docs/latest/
The PySpark SQL API offers a powerful way to interact with structured data in PySpark using SQL-like syntax.
Here’s a breakdown of key concepts and functionalities:
1. SparkSession:
- The entry point for working with PySpark SQL is the SparkSession object.
- It provides methods for creating DataFrames, interacting with catalogs (like Hive tables), and configuring Spark SQL settings.
2. DataFrames:
- DataFrames are the fundamental data structures in PySpark SQL. They represent distributed collections of rows with named columns and specific data types.
- You can create DataFrames from various sources (CSV, Parquet, JSON, Hive tables) or using Python collections (lists, dictionaries).
3. SQL-like Operations:
- PySpark SQL allows you to write SQL-inspired queries on DataFrames.
- You can perform actions like:
- Selection: SELECT column1, column2 FROM df
- Filtering: SELECT * FROM df WHERE condition
- Aggregation: SELECT column, SUM(value) AS total FROM df GROUP BY column
- Joining: SELECT * FROM df1 JOIN df2 ON df1.column = df2.column
- Sorting: SELECT * FROM df ORDER BY column ASC
4. Key Features:
- Integration with DataFrame API: You can seamlessly switch between PySpark SQL and DataFrame API for data manipulation.
- SQL Functions: PySpark SQL provides a rich set of built-in functions for various operations (e.g., mathematical, string manipulation, date/time).
- User-Defined Functions (UDFs): You can create custom functions in Python and use them within your SQL queries.
- Catalog Integration: PySpark SQL interacts with catalogs like Hive, allowing you to manage and query external tables.
5. Benefits of PySpark SQL:
- Readability: SQL-like syntax often feels more natural for data analysts and SQL-familiar users.
- Conciseness: Complex data manipulations can be expressed concisely in SQL queries.
- Familiarity: Existing SQL knowledge can be leveraged for working with PySpark.
6. Resources:
- PySpark SQL documentation: https://spark.apache.org/docs/latest/api/python/index.html
- PySpark SQL tutorial: https://sparkbyexamples.com/pyspark-dataframe-tutorial-with-examples/
- While PySpark SQL provides SQL-like syntax, it’s not a full-fledged SQL implementation. Some functionalities might differ from traditional SQL engines.
- PySpark SQL leverages the power of PySpark’s distributed processing for efficient handling of large datasets.
Below is an explanation of each, along with examples.
DataFrame API
The DataFrame API provides a higher-level abstraction for structured data. It allows you to manipulate data in a more Pythonic way.
Example of DataFrame API
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("DataFrame API Example").getOrCreate()
# Create a DataFrame
data = [("James", "Smith", "M", 3000),
("Anna", "Rose", "F", 4100),
("Robert", "Williams", "M", 6200)]
columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)
# Show DataFrame
df.show()
# Select specific columns
df.select("FirstName", "Salary").show()
# Filter rows
df.filter(df["Salary"] > 4000).show()
# Group by and aggregation
df.groupBy("Gender").avg("Salary").show()
# Add new column
df.withColumn("Bonus", df["Salary"] * 0.10).show()
# Stop Spark session
spark.stop()
Spark SQL API
The Spark SQL API allows you to run SQL queries on DataFrames, providing a SQL-like interface for querying data. This can be particularly useful for users who are familiar with SQL.
Example of Spark SQL API
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("Spark SQL API Example").getOrCreate()
# Create a DataFrame
data = [("James", "Smith", "M", 3000),
("Anna", "Rose", "F", 4100),
("Robert", "Williams", "M", 6200)]
columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)
# Register DataFrame as a temporary SQL table
df.createOrReplaceTempView("employees")
# Show DataFrame using SQL
spark.sql("SELECT * FROM employees").show()
# Select specific columns using SQL
spark.sql("SELECT FirstName, Salary FROM employees").show()
# Filter rows using SQL
spark.sql("SELECT * FROM employees WHERE Salary > 4000").show()
# Group by and aggregation using SQL
spark.sql("SELECT Gender, AVG(Salary) AS AvgSalary FROM employees GROUP BY Gender").show()
# Add new column using SQL
spark.sql("SELECT *, Salary * 0.10 AS Bonus FROM employees").show()
# Stop Spark session
spark.stop()
When to Use Each API
- DataFrame API: Use this for more programmatic operations and when you need to chain multiple transformations together. It is more Pythonic and integrates seamlessly with other PySpark operations.
- Spark SQL API: Use this when you have complex queries that are easier to express in SQL or when working with people who are more familiar with SQL.
Combined Example
You can combine both APIs in a single application. For instance, you might load data and perform initial transformations with the DataFrame API, then register the DataFrame as a table and perform complex queries with Spark SQL.
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("Combined Example").getOrCreate()
# Create a DataFrame
data = [("James", "Smith", "M", 3000),
("Anna", "Rose", "F", 4100),
("Robert", "Williams", "M", 6200)]
columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)
# DataFrame API
df = df.withColumn("Bonus", df["Salary"] * 0.10)
# Register DataFrame as a temporary SQL table
df.createOrReplaceTempView("employees")
# Spark SQL API
result_df = spark.sql("SELECT FirstName, LastName, Salary + Bonus AS TotalCompensation FROM employees")
# Show result
result_df.show()
# Stop Spark session
spark.stop()
This flexibility allows you to leverage the strengths of both APIs and use the most appropriate tool for each task in your ETL pipeline.
Questions-
How to run SQL queries in PySpark SQL?
Running SQL Queries in PySpark
To run SQL queries in PySpark, you can use the SparkSession object, which provides a SQL context for executing SQL queries.
SparkSession in PySpark SQL
SparkSession is the entry point to programming Spark with the Dataset and DataFrame API. It provides a SQL context for executing SQL queries.
Here’s an example of creating a SparkSession object:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("My App").getOrCreate()
SQL Context
In PySpark, a SQL context refers to the environment or scope in which SQL queries are executed. It provides a way to interact with Spark’s SQL engine, allowing you to execute SQL queries, create tables, and perform other SQL-related operations.
Think of a SQL context as a container that holds the state and metadata necessary to execute SQL queries. It’s like a workspace where you can write and execute SQL code, and it keeps track of the tables, views, and other objects you create.
When you create a SparkSession object, it automatically creates a SQL context for you. This SQL context is tied to the SparkSession object and is used to execute SQL queries, create tables, and perform other SQL-related operations.
Key Components of a SQL Context
- Catalog: The catalog is the metadata store that keeps track of tables, views, and other objects.
- SQL Parser: The SQL parser is responsible for parsing SQL queries and converting them into an abstract syntax tree (AST).
- Optimizer: The optimizer is responsible for optimizing the execution plan of SQL queries.
- Executor: The executor is responsible for executing the optimized execution plan.
SQL Context Operations
- Executing SQL Queries: You can execute SQL queries using the sql method.
- Creating Tables: You can create tables using the spark.catalog.createTable method.
- Creating Views: You can create temporary views using the createTempView or createOrReplaceTempView methods on a DataFrame.
- Registering DataFrames as Tables: You can register DataFrames as tables using the createOrReplaceTempView method.
By understanding the SQL context, you can effectively use PySpark’s SQL features to analyze and manipulate data.
spark.sql(""" ... """)
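Triple-quoted strings are handy for longer, multi-line queries. A small sketch, assuming a temporary view named employees has been registered as in the earlier examples:
# Multi-line SQL against a previously registered temp view
result = spark.sql("""
    SELECT Gender,
           AVG(Salary) AS AvgSalary
    FROM employees
    GROUP BY Gender
    HAVING AVG(Salary) > 3000
""")
result.show()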
PySpark cheat sheet
Here’s a PySpark cheat sheet designed for a quick yet comprehensive revision of PySpark concepts, architecture, optimizations, commands, and common operations. This guide is structured to cover the essentials from architecture to data processing and Spark SQL.
1. PySpark Architecture Basics
Component | Description |
---|---|
Driver Program | Main program; responsible for creating SparkContext, defining transformations, and coordinating actions. |
SparkContext | Core connection to the Spark cluster, used to create RDDs and DataFrames. |
Cluster Manager | Manages resources across nodes (e.g., YARN, Mesos, or Standalone mode). |
Executors | Worker nodes that execute tasks, store data, and return results. |
Tasks | Individual units of work sent to executors by the driver program. |
Job | Each action in Spark triggers a job, consisting of multiple stages and tasks. |
Stage | Jobs are broken into stages at shuffle boundaries. |
DAG Scheduler | Transforms jobs into a DAG (Directed Acyclic Graph) of stages and tasks. |
2. Spark Submit Command
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 10 \
  --executor-cores 4 \
  --executor-memory 16g \
  --driver-memory 16g \
  --conf spark.sql.shuffle.partitions=200 \
  script.py
Parameter | Description |
---|---|
--master | Sets the cluster manager (e.g., yarn , local , mesos ). |
--deploy-mode | client or cluster (execution on client or cluster mode). |
--num-executors | Number of executors to use. |
--executor-cores | Number of CPU cores per executor. |
--executor-memory | Memory per executor. |
--driver-memory | Memory allocated for the driver. |
--conf | Additional Spark configurations (e.g., shuffle.partitions ). |
3. DataFrame Basics and Operations
Creating a DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()
data = [("Alice", 1), ("Bob", 2)]
columns = ["Name", "Id"]
df = spark.createDataFrame(data, schema=columns)
Viewing Data
Command | Description |
---|---|
df.show(n) | Display first n rows of DataFrame. |
df.printSchema() | Print schema of DataFrame. |
df.columns | List columns of DataFrame. |
df.describe().show() | Summary statistics for numerical columns. |
Selecting Columns
df.select("Name", "Id").show()
df.select(df.Name, df.Id).show()
Filtering Data
df.filter(df["Id"] > 1).show()
df.filter("Id > 1").show() # SQL-style string condition
Adding and Renaming Columns
from pyspark.sql.functions import lit
df = df.withColumn("Country", lit("USA"))
df = df.withColumnRenamed("Id", "Identifier")
Dropping Columns
df = df.drop("Country")
4. Data Transformations
Transformation | Example | Description |
---|---|---|
Map | rdd.map(lambda x: x * 2) | Applies a function to each element. |
Filter | rdd.filter(lambda x: x > 2) | Filters elements based on condition. |
FlatMap | rdd.flatMap(lambda x: x.split(" ")) | Applies function and flattens the result. |
Distinct | df.distinct() | Removes duplicates. |
Union | df.union(df2) | Combines two DataFrames with the same schema. |
GroupBy + Aggregation | df.groupBy("col").count() | Groups and aggregates data. |
Aggregations
from pyspark.sql.functions import avg, sum, max, min
df.groupBy("Name").agg(
sum("Id").alias("Total_Id"),
avg("Id").alias("Average_Id")
).show()
Sorting
df.orderBy("Id", ascending=False).show()
df.sort("Id", "Name").show()
5. Spark SQL
Registering a DataFrame as a SQL Table
df.createOrReplaceTempView("people")
Executing SQL Queries
sql_df = spark.sql("SELECT * FROM people WHERE Id > 1")
sql_df.show()
Useful SQL Commands
Command | Description |
---|---|
SELECT col FROM table | Select specific columns from a table. |
WHERE condition | Filter rows based on condition. |
GROUP BY col | Group rows based on a column. |
ORDER BY col | Sort rows by column. |
JOIN | Perform SQL joins (INNER, LEFT, RIGHT, FULL). |
LIMIT n | Limit the number of rows in the output. |
Pivoting and Melting (Unpivoting)
df.groupBy("Name").pivot("Year").sum("Sales").show()
6. PySpark Functions for Data Manipulation
Function | Description |
---|---|
col("col_name") | Access column in DataFrame. |
lit(value) | Creates a column with a literal value. |
when(condition, value) | Conditional statement similar to if in SQL. |
isNull() , isNotNull() | Check if column values are null/not null. |
alias("alias_name") | Rename column in query. |
concat(col1, col2) | Concatenate multiple columns. |
regexp_replace(col, pattern, replacement) | Replace regex matches. |
7. Optimization Techniques
Technique | Command | Description |
---|---|---|
Persist/Cache | df.cache() , df.persist() | Cache or persist DataFrame in memory for reuse. |
Repartitioning | df.repartition(num_partitions) | Increase or reduce number of partitions. |
Coalesce | df.coalesce(num_partitions) | Reduce number of partitions without shuffle. |
Broadcast Join | broadcast(df) | Optimize joins for small tables using a broadcast join (see the sketch after this table). |
Avoid Using Too Many Partitions | spark.sql.shuffle.partitions = 200 | Set reasonable number of partitions for shuffling. |
Partitioning on Keys | df.write.partitionBy("col").parquet("path") | Partition data by column to improve reading efficiency. |
Avoid Data Skew | Use salting or avoid uneven distribution in join columns. | Prevent performance issues caused by skewed data. |
Filter Early | Apply filters early to reduce data size for downstream ops. | Reduces data processing and memory usage. |
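As a concrete illustration of the broadcast-join row above, a minimal sketch (large_fact_df, small_dim_df, and the key column are made-up names):
from pyspark.sql.functions import broadcast
# Hint Spark to ship the small dimension table to every executor,
# avoiding a shuffle of the large fact table
joined = large_fact_df.join(broadcast(small_dim_df), on="key", how="inner")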
8. Handling Missing Data
Operation | Command | Description |
---|---|---|
Drop Missing Values | df.na.drop() | Drop rows with any NaN values. |
Fill Missing Values | df.na.fill(value) | Fill all NaN with a specific value. |
Fill Specific Columns | df.na.fill({'col1': 0, 'col2': 'unknown'}) | Fill NaN values for specific columns. |
9. Window Functions
Syntax for Window Functions
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank
window_spec = Window.partitionBy("col1").orderBy("col2")
# Using row_number
df.withColumn("row_num", row_number().over(window_spec)).show()
# Using rank
df.withColumn("rank", rank().over(window_spec)).show()
Function | Description |
---|---|
row_number() | Assigns a unique number to each row within a window. |
rank() | Ranks rows within a window, ties get same rank. |
dense_rank() | Ranks rows with no gaps in ranking. |
lag() | Accesses previous row’s value within a window. |
lead() | Accesses next row’s value within a window (see the sketch after this table). |
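A short sketch of lag() and lead(), reusing the window_spec defined above (col1/col2 remain the same placeholder columns):
from pyspark.sql.functions import lag, lead
# Previous and next value of col2 within each col1 partition
df.withColumn("prev_val", lag("col2").over(window_spec)) \
  .withColumn("next_val", lead("col2").over(window_spec)) \
  .show()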
10. Common PySpark Configuration Settings
Configuration | Description |
---|---|
spark.executor.memory | Allocates memory per executor (e.g., 2g ). |
spark.driver.memory | Allocates memory for the driver program. |
spark.sql.shuffle.partitions | Sets number of partitions for shuffling data. |
spark.executor.cores | Number of cores per executor. |
spark.default.parallelism | Default number of partitions in RDDs. |
spark.sql.autoBroadcastJoinThreshold | Sets size threshold for broadcast joins. |
spark.sql.cache.serializer | Serializer to use for cached data. |
spark.dynamicAllocation.enabled | Enables dynamic allocation of executors. |
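The settings in the table above are usually supplied when the session is built, or adjusted at runtime where supported; the values below are only illustrative:
from pyspark.sql import SparkSession
# Applying a couple of the settings above at session-build time
spark = SparkSession.builder \
    .appName("tuned-app") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()
# Some SQL settings can also be changed on a live session
spark.conf.set("spark.sql.shuffle.partitions", "100")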
This cheat sheet offers a high-level overview of PySpark essentials, covering core concepts, commands, and common operations. It serves as a quick reference for PySpark development and optimizations.
Let me know if you need more specific examples or deeper explanations on any section!