PySpark Operations on DataFrames
PySpark DataFrames provide various operations to manipulate and analyze data. These operations can be classified into two categories: Transformations and Actions.
Transformations
Transformations create a new DataFrame by applying operations on an existing one. They are lazy, meaning they don’t execute immediately but wait for an action to trigger execution.
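All of the snippets below operate on a DataFrame called df, which is never defined in this post. A minimal, hypothetical setup such as the following (the column names and values are only illustrative, chosen to match the later examples) is enough to run them, and it also shows laziness in action: nothing executes until show() is called.
from pyspark.sql import SparkSession

# Hypothetical setup: a local SparkSession and a small illustrative DataFrame
spark = SparkSession.builder.appName("dataframe-operations-demo").getOrCreate()

data = [
    ("Alice", 29, 4000.0, "HR", 2, 10.0),
    ("Bob", 35, 5200.0, "IT", 1, 20.0),
    ("Cara", 17, 1500.0, "IT", 3, 5.0),
]
columns = ["name", "age", "salary", "department", "quantity", "price"]
df = spark.createDataFrame(data, columns)

# Transformations are lazy: this line only builds a plan
adults = df.filter(df["age"] >= 18).select("name", "age")

# The action show() triggers the actual computation
adults.show()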
1. select(): Selecting Columns
This operation is used to select a subset of columns or expressions from a DataFrame.
Syntax
df.select(*cols)
Examples
- Selecting Specific Columns:
df.select("name", "age").show()
- Selecting Columns Using col:
from pyspark.sql.functions import col
df.select(col("name"), col("age")).show()
- Renaming Columns:
df.select(col("name").alias("full_name")).show()
- Using SQL Expressions:
df.selectExpr("name", "age + 1 AS age_plus_one").show()
- Selecting All Columns:
df.select("*").show()
2. withColumn(): Adding or Modifying Columns
The withColumn() function adds or replaces a column in a DataFrame.
Syntax
df.withColumn("new_col", expression)
Use Cases and Examples
- Adding a New Column:
df.withColumn("new_age", df["age"] + 1).show()
- Conditional Column Creation:
from pyspark.sql.functions import when, col
df.withColumn("is_adult", when(col("age") >= 18, "Yes").otherwise("No")).show()
- Using UDFs to Transform Data (a built-in alternative is sketched after this list):
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def to_upper(name):
    return name.upper()

udf_upper = udf(to_upper, StringType())
df.withColumn("uppercase_name", udf_upper(df["name"])).show()
- Column Calculation Using Multiple Columns:
df.withColumn("total_price", col("quantity") * col("price")).show()
- Creating Structs or Arrays:
from pyspark.sql.functions import struct, array, lit
df.withColumn("address", struct(lit("123 St"), lit("City"), lit("Country"))).show()
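A note on the UDF example above: Python UDFs move data between the JVM and the Python interpreter, so when an equivalent built-in function exists (as it does for upper-casing), the built-in is usually faster. A small sketch, assuming the same df:
from pyspark.sql.functions import upper, col

# Same result as the UDF example, using the built-in upper() function
df.withColumn("uppercase_name", upper(col("name"))).show()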
3. drop(): Removing Columns
Drops specified columns from the DataFrame.
Syntax
df.drop(*cols)
Example
df.drop("age").show()
4. distinct(): Removing Duplicate Rows
Returns distinct rows from the DataFrame.
Syntax
df.distinct()
Example
df.distinct().show()
5. filter() or where(): Filtering Rows
Filters rows based on a condition.
Syntax
df.filter(condition)
Examples
- Basic Condition:
df.filter(df["age"] > 30).show()
- Using SQL Expressions:
from pyspark.sql.functions import expr
df.filter(expr("age > 30 AND salary > 3000")).show()
6. groupBy(): Grouping Data
Groups rows based on specified columns and allows aggregations.
Syntax
df.groupBy("col").agg({"col2": "agg_func"})
Examples
- Basic Aggregation:
df.groupBy("department").mean("salary").show()
- Multiple Aggregations:
from pyspark.sql.functions import sum, avg
df.groupBy("department").agg(
    sum("salary").alias("total_salary"),
    avg("age").alias("average_age")
).show()
7. orderBy() or sort(): Sorting Data
Sorts rows based on specified columns.
Syntax
df.orderBy(df["col"].asc())
Example
df.orderBy(df["age"].desc()).show()
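Several columns with mixed sort directions can be combined in one call; a short sketch, assuming the same df:
from pyspark.sql.functions import col

# Ascending by department, then descending by salary within each department
df.orderBy(col("department").asc(), col("salary").desc()).show()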
8. join(): Joining DataFrames
Joins two DataFrames on a condition.
Syntax
df1.join(df2, df1["key"] == df2["key"], "join_type")
Example
df1.join(df2, df1["id"] == df2["id"], "inner").show()
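Since df1 and df2 are not defined in this post, here is a self-contained sketch with two hypothetical DataFrames (reusing the SparkSession from the setup above). It also shows that joining on the column name itself avoids the duplicate key column that an equality condition leaves in the result.
# Hypothetical inputs for the join example
employees = spark.createDataFrame(
    [(1, "Alice"), (2, "Bob"), (3, "Cara")], ["id", "name"])
departments = spark.createDataFrame(
    [(1, "HR"), (2, "IT")], ["id", "department"])

# Inner join on an equality condition; id=3 has no match and is dropped,
# and the result keeps two id columns (one from each side)
employees.join(departments, employees["id"] == departments["id"], "inner").show()

# Joining on the column name keeps a single id column in the result
employees.join(departments, on="id", how="inner").show()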
9. union(): Combining DataFrames
Combines two DataFrames with the same schema.
Syntax
df1.union(df2)
Example
df1.union(df2).show()
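One caveat: union() matches columns by position, not by name, so two DataFrames with the same columns in a different order are combined silently but incorrectly. If the column order may differ, unionByName() (available since Spark 2.3) is safer; a short sketch with hypothetical inputs:
# Hypothetical DataFrames with the same columns in a different order
a = spark.createDataFrame([("Alice", 29)], ["name", "age"])
b = spark.createDataFrame([(35, "Bob")], ["age", "name"])

# Matches columns by name rather than by position
a.unionByName(b).show()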
10. sample(): Sampling Data
Returns a random sample of rows.
Syntax
df.sample(withReplacement=False, fraction=0.1)
Example
df.sample(withReplacement=False, fraction=0.2).show()
Actions
Actions trigger the execution of the transformations recorded in the DAG (Directed Acyclic Graph).
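Two of the most frequently used actions, count() and show(), appear throughout this post. A short sketch, assuming the sample df from the setup above, of how an action triggers the transformations queued before it:
# Nothing runs here; this only extends the logical plan
adults = df.filter(df["age"] >= 18)

# count() is an action: Spark executes the filter and returns a number
print(adults.count())

# show() is also an action: it runs the plan and prints up to 20 rows
adults.show()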
1. collect(): Fetching All Rows
Returns all rows to the driver as a list of Row objects, so use it with care on large DataFrames.
Example
rows = df.collect()
for row in rows:
    print(row)
2. first() and take(): Fetching Specific Rows
- First Row:
print(df.first())
- First n Rows:
print(df.take(5))
3. cache() and persist(): Caching Data
Strictly speaking, these are lazy rather than actions: they only mark the DataFrame for storage, and the data is materialized the first time an action runs on it.
- Caching:
df.cache()
- Persisting:
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
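A typical caching workflow pairs persist() with an action to materialize the data, and unpersist() to release the storage afterwards; a minimal sketch, assuming the sample df:
from pyspark import StorageLevel

cached = df.persist(StorageLevel.MEMORY_AND_DISK)

# The first action materializes the cache; later actions reuse it
cached.count()
cached.groupBy("department").count().show()

# Release the storage once the cached data is no longer needed
cached.unpersist()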
4. write: Writing Data
The write attribute returns a DataFrameWriter, which saves the DataFrame in the specified format.
Example
df.write.format("parquet").save("/path/to/output")
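The DataFrameWriter also supports save modes and partitioning; a short sketch (the output path is a placeholder):
# Overwrite any existing output and partition the files by department
(df.write
   .mode("overwrite")
   .partitionBy("department")
   .format("parquet")
   .save("/path/to/output"))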
Using SQL Expressions with expr()
The expr() function allows SQL-like expressions inside PySpark operations.
Example with filter()
from pyspark.sql.functions import expr
df.filter(expr("age > 30 AND salary > 3000")).show()
Example with selectExpr()
df.selectExpr("name", "age + 1 AS age_plus_one").show()
These operations cover most use cases for working with PySpark DataFrames, enabling efficient data transformation and analysis.