In PySpark, you can perform operations on DataFrames using two main APIs: the DataFrame API and the Spark SQL API. Both are powerful and can be used interchangeably to some extent.
Here’s a breakdown of key concepts and functionalities:
1. Creating DataFrames:
You can create a PySpark DataFrame from a list of Row objects:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
df = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=3, b=4., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
Create a PySpark DataFrame with an explicit schema.
df = spark.createDataFrame([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
From existing data sources:
Read data from various sources like CSV, Parquet, JSON, or Hive tables using spark.read.format("format").load("path").
Leverage JDBC to connect to external databases and read tables.
Create a PySpark DataFrame from a pandas DataFrame:
pandas_df = pd.DataFrame({
'a': [1, 2, 3],
'b': [2., 3., 4.],
'c': ['string1', 'string2', 'string3'],
'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df
You can also create DataFrames from lists, dictionaries, or other Python data structures using spark.createDataFrame(data).
Create a PySpark DataFrame from an RDD consisting of a list of tuples:
rdd = spark.sparkContext.parallelize([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
The DataFrames created above all have the same result and schema.
# All DataFrames above produce the same result.
df.show()
df.printSchema()
2. DataFrame Structure:
- DataFrames are distributed collections of rows, where each row represents a record.
- Each column has a name and a specific data type (e.g., integer, string, timestamp).
- You can access columns by name (e.g., df.column_name).
3. DataFrame Operations:
- Selection: Select specific columns using df.select("column1", "column2").
- Filtering: Filter rows based on certain conditions using df.where(condition).
- Sorting: Sort DataFrames using df.sort(col1.desc(), col2.asc()).
- Aggregation: Calculate summary statistics using functions like df.groupBy("column").agg(F.avg("value")).
- Joining: Combine DataFrames based on shared columns using df1.join(df2, on="column_name", how="inner").
- Transformations: Apply custom logic using df.withColumn("new_column", F.col("existing_column") * 2).
- Actions: These operations return results or trigger computation, e.g., df.show(), df.count(), df.write.parquet("path").
4. Key Features:
- Laziness: DataFrame operations are not immediately executed but rather built into a logical plan. When an action operation is triggered (e.g., df.show()), the plan is optimized and executed on the cluster.
- Distributed Processing: Operations are spread across the Spark cluster for parallel processing, enabling efficient handling of large datasets.
- Data Types: DataFrames support various data types for flexibility in data representation.
5. Relationship with Spark SQL:
- PySpark DataFrame API provides a programmatic way to interact with DataFrames.
- Spark SQL offers a SQL-like syntax for querying DataFrames. You can seamlessly switch between these APIs for data manipulation.
Here are some resources for further exploration:
- PySpark DataFrames documentation: https://spark.apache.org/docs/latest/
The PySpark SQL API offers a powerful way to interact with structured data in PySpark using SQL-like syntax. Here’s a breakdown of key concepts and functionalities:
1. SparkSession:
- The entry point for working with PySpark SQL is the SparkSession object.
- It provides methods for creating DataFrames, interacting with catalogs (like Hive tables), and configuring Spark SQL settings.
2. DataFrames:
- DataFrames are the fundamental data structures in PySpark SQL. They represent distributed collections of rows with named columns and specific data types.
- You can create DataFrames from various sources (CSV, Parquet, JSON, Hive tables) or using Python collections (lists, dictionaries).
3. SQL-like Operations:
- PySpark SQL allows you to write SQL-inspired queries on DataFrames.
- You can perform operations like the following (assuming df has been registered as a temporary view):
- Selection: SELECT column1, column2 FROM df
- Filtering: SELECT * FROM df WHERE condition
- Aggregation: SELECT column, SUM(value) AS total FROM df GROUP BY column
- Joining: SELECT * FROM df1 JOIN df2 ON df1.column = df2.column
- Sorting: SELECT * FROM df ORDER BY column ASC
4. Key Features:
- Integration with DataFrame API: You can seamlessly switch between PySpark SQL and DataFrame API for data manipulation.
- SQL Functions: PySpark SQL provides a rich set of built-in functions for various operations (e.g., mathematical, string manipulation, date/time).
- User-Defined Functions (UDFs): You can create custom functions in Python and use them within your SQL queries.
- Catalog Integration: PySpark SQL interacts with catalogs like Hive, allowing you to manage and query external tables.
5. Benefits of PySpark SQL:
- Readability: SQL-like syntax often feels more natural for data analysts and SQL-familiar users.
- Conciseness: Complex data manipulations can be expressed concisely in SQL queries.
- Familiarity: Existing SQL knowledge can be leveraged for working with PySpark.
6. Resources:
- PySpark SQL documentation: https://spark.apache.org/docs/latest/api/python/index.html
- PySpark SQL tutorial: https://sparkbyexamples.com/pyspark-dataframe-tutorial-with-examples/
- While PySpark SQL provides SQL-like syntax, it’s not a full-fledged SQL implementation. Some functionalities might differ from traditional SQL engines.
- PySpark SQL leverages the power of PySpark’s distributed processing for efficient handling of large datasets.
Below is a closer look at each of the two APIs, along with examples.
DataFrame API
The DataFrame API provides a higher-level abstraction for structured data. It allows you to manipulate data in a more Pythonic way.
Example of DataFrame API
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("DataFrame API Example").getOrCreate()
# Create a DataFrame
data = [("James", "Smith", "M", 3000),
("Anna", "Rose", "F", 4100),
("Robert", "Williams", "M", 6200)]
columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)
# Show DataFrame
df.show()
# Select specific columns
df.select("FirstName", "Salary").show()
# Filter rows
df.filter(df["Salary"] > 4000).show()
# Group by and aggregation
df.groupBy("Gender").avg("Salary").show()
# Add new column
df.withColumn("Bonus", df["Salary"] * 0.10).show()
# Stop Spark session
spark.stop()
Spark SQL API
The Spark SQL API allows you to run SQL queries on DataFrames, providing a SQL-like interface for querying data. This can be particularly useful for users who are familiar with SQL.
Example of Spark SQL API
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("Spark SQL API Example").getOrCreate()
# Create a DataFrame
data = [("James", "Smith", "M", 3000),
("Anna", "Rose", "F", 4100),
("Robert", "Williams", "M", 6200)]
columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)
# Register DataFrame as a temporary SQL table
df.createOrReplaceTempView("employees")
# Show DataFrame using SQL
spark.sql("SELECT * FROM employees").show()
# Select specific columns using SQL
spark.sql("SELECT FirstName, Salary FROM employees").show()
# Filter rows using SQL
spark.sql("SELECT * FROM employees WHERE Salary > 4000").show()
# Group by and aggregation using SQL
spark.sql("SELECT Gender, AVG(Salary) AS AvgSalary FROM employees GROUP BY Gender").show()
# Add new column using SQL
spark.sql("SELECT *, Salary * 0.10 AS Bonus FROM employees").show()
# Stop Spark session
spark.stop()
When to Use Each API
- DataFrame API: Use this for more programmatic operations and when you need to chain multiple transformations together. It is more Pythonic and integrates seamlessly with other PySpark operations.
- Spark SQL API: Use this when you have complex queries that are easier to express in SQL or when working with people who are more familiar with SQL.
Combined Example
You can combine both APIs in a single application. For instance, you might load data and perform initial transformations with the DataFrame API, then register the DataFrame as a table and perform complex queries with Spark SQL.
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("Combined Example").getOrCreate()
# Create a DataFrame
data = [("James", "Smith", "M", 3000),
("Anna", "Rose", "F", 4100),
("Robert", "Williams", "M", 6200)]
columns = ["FirstName", "LastName", "Gender", "Salary"]
df = spark.createDataFrame(data, schema=columns)
# DataFrame API
df = df.withColumn("Bonus", df["Salary"] * 0.10)
# Register DataFrame as a temporary SQL table
df.createOrReplaceTempView("employees")
# Spark SQL API
result_df = spark.sql("SELECT FirstName, LastName, Salary + Bonus AS TotalCompensation FROM employees")
# Show result
result_df.show()
# Stop Spark session
spark.stop()
This flexibility allows you to leverage the strengths of both APIs and use the most appropriate tool for each task in your ETL pipeline.