PySpark Operations on DataFrames

PySpark DataFrames provide various operations to manipulate and analyze data. These operations can be classified into two categories: Transformations and Actions.
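The snippets on this page are fragments. As a minimal setup they can be run against a small sample DataFrame such as the one below; the SparkSession, column names, and values are assumptions made for illustration, not part of any real dataset.

from pyspark.sql import SparkSession

# Start (or reuse) a local SparkSession; the application name is arbitrary.
spark = SparkSession.builder.appName("dataframe-operations-demo").getOrCreate()

# A small sample DataFrame reused by the sketches that follow.
data = [
    ("Alice", 34, "Sales", 4000),
    ("Bob", 28, "Engineering", 5000),
    ("Cara", 45, "Sales", 4500),
]
df = spark.createDataFrame(data, ["name", "age", "department", "salary"])
df.show()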


Transformations

Transformations create a new DataFrame by applying operations on an existing one. They are lazy, meaning they don’t execute immediately but wait for an action to trigger execution.
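The sketch below illustrates this with the sample df defined above: the filter() call returns instantly because nothing is computed, and work only happens when the count() action runs.

# filter() only records the operation in the logical plan; no Spark job runs yet.
adults = df.filter(df["age"] >= 30)

# The action triggers execution of the recorded transformations.
print(adults.count())   # 2 for the sample data above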


1. select(): Selecting Columns

This operation is used to select a subset of columns or expressions from a DataFrame.

Syntax
df.select(*cols)
Examples
  1. Selecting Specific Columns: df.select("name", "age").show()
  2. Selecting Columns Using col:
     from pyspark.sql.functions import col
     df.select(col("name"), col("age")).show()
  3. Renaming Columns: df.select(col("name").alias("full_name")).show()
  4. Using SQL Expressions: df.selectExpr("name", "age + 1 AS age_plus_one").show()
  5. Selecting All Columns: df.select("*").show()
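Putting a couple of these together against the sample df defined at the top of the page (the alias name is arbitrary); select() always returns a new DataFrame and leaves the original unchanged:

from pyspark.sql.functions import col

# Pick two columns and rename one; df itself is not modified.
names_and_ages = df.select(col("name").alias("full_name"), col("age"))
names_and_ages.show()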

2. withColumn(): Adding or Modifying Columns

The withColumn() function adds or replaces a column in a DataFrame.

Syntax
df.withColumn("new_col", expression)
Use Cases and Examples
  1. Adding a New Column: df.withColumn("new_age", df["age"] + 1).show()
  2. Conditional Column Creation:
     from pyspark.sql.functions import when, col
     df.withColumn("is_adult", when(col("age") >= 18, "Yes").otherwise("No")).show()
  3. Using UDFs to Transform Data:
     from pyspark.sql.functions import udf
     from pyspark.sql.types import StringType

     def to_upper(name):
         return name.upper()

     udf_upper = udf(to_upper, StringType())
     df.withColumn("uppercase_name", udf_upper(df["name"])).show()
  4. Column Calculation Using Multiple Columns: df.withColumn("total_price", col("quantity") * col("price")).show()
  5. Creating Structs or Arrays:
     from pyspark.sql.functions import struct, array, lit
     df.withColumn("address", struct(lit("123 St"), lit("City"), lit("Country"))).show()
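A self-contained sketch combining a computed column and a conditional column; the order data is made up for illustration:

from pyspark.sql.functions import col, when

orders = spark.createDataFrame(
    [("pen", 10, 1.5), ("book", 2, 12.0)],
    ["item", "quantity", "price"],
)

orders = (
    orders
    .withColumn("total_price", col("quantity") * col("price"))                 # derived column
    .withColumn("bulk_order", when(col("quantity") >= 5, "Yes").otherwise("No"))  # conditional column
)
orders.show()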

3. drop(): Removing Columns

Drops specified columns from the DataFrame.

Syntax
df.drop(*cols)
Example
df.drop("age").show()

4. distinct(): Removing Duplicate Rows

Returns distinct rows from the DataFrame.

Syntax
df.distinct()
Example
df.distinct().show()
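distinct() compares entire rows. To deduplicate on a subset of columns, dropDuplicates() takes a list of column names; a sketch against the sample df:

# Whole-row deduplication.
df.distinct().show()

# Keep one row per department (which row is kept is not deterministic).
df.dropDuplicates(["department"]).show()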

5. filter() or where(): Filtering Rows

Filters rows based on a condition.

Syntax
df.filter(condition)
Examples
  1. Basic Condition: df.filter(df["age"] > 30).show()
  2. Using SQL Expressions:
     from pyspark.sql.functions import expr
     df.filter(expr("age > 30 AND salary > 3000")).show()
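When combining Column conditions directly rather than through expr(), PySpark uses & and |, and each condition needs its own parentheses because Python's & binds tighter than comparison operators. A sketch against the sample df:

from pyspark.sql.functions import col

# Parentheses are required around each comparison.
df.filter((col("age") > 30) & (col("salary") > 3000)).show()

# where() is an alias for filter().
df.where(col("department") == "Sales").show()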

6. groupBy(): Grouping Data

Groups rows based on specified columns and allows aggregations.

Syntax
df.groupBy("col").agg({"col2": "agg_func"})
Examples
  1. Basic Aggregation: df.groupBy("department").mean("salary").show()
  2. Multiple Aggregations:
     from pyspark.sql.functions import sum, avg
     df.groupBy("department").agg(sum("salary").alias("total_salary"), avg("age").alias("average_age")).show()
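Both forms in the syntax line work; the sketch below shows the dictionary form alongside the functions form against the sample df. Importing pyspark.sql.functions as F (a convention assumed here, not required by the API) avoids shadowing Python's built-in sum().

from pyspark.sql import functions as F

# Dictionary form: map column name -> aggregate function name.
df.groupBy("department").agg({"salary": "avg", "age": "max"}).show()

# Functions form, with aliases for readable output column names.
df.groupBy("department").agg(
    F.sum("salary").alias("total_salary"),
    F.avg("age").alias("average_age"),
).show()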

7. orderBy() or sort(): Sorting Data

Sorts rows based on specified columns.

Syntax
df.orderBy(df["col"].asc())
Example
df.orderBy(df["age"].desc()).show()

8. join(): Joining DataFrames

Joins two DataFrames on a condition.

Syntax
df1.join(df2, df1["key"] == df2["key"], "join_type")
Example
df1.join(df2, df1["id"] == df2["id"], "inner").show()

9. union(): Combining DataFrames

Combines two DataFrames with the same schema.

Syntax
df1.union(df2)
Example
df1.union(df2).show()
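Note that union() matches columns by position, not by name; when two DataFrames share column names but not ordering, unionByName() is the safer choice. A sketch with made-up data:

df_a = spark.createDataFrame([(1, "x")], ["id", "label"])
df_b = spark.createDataFrame([("y", 2)], ["label", "id"])

# A positional union would misalign these columns; unionByName matches on names.
df_a.unionByName(df_b).show()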

10. sample(): Sampling Data

Returns a random sample of rows.

Syntax
df.sample(withReplacement=False, fraction=0.1)
Example
df.sample(withReplacement=False, fraction=0.2).show()
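Passing a seed makes the sample reproducible across runs; note that fraction is a probability per row, not an exact row count. A sketch:

# Roughly 20% of the rows, deterministic for a fixed seed.
df.sample(withReplacement=False, fraction=0.2, seed=42).show()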

Actions

Actions trigger the execution of the transformations recorded in the DAG (Directed Acyclic Graph).


1. collect(): Fetching All Rows

Returns all rows as a list of Row objects.

Example
rows = df.collect()
for row in rows:
    print(row)

2. first() and take(): Fetching Specific Rows

  1. First Row: print(df.first())
  2. First n Rows: print(df.take(5))
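A sketch contrasting these with limit(): first() and take() are actions that return Row objects to the driver, while limit() is a transformation that stays lazy and returns a DataFrame.

print(df.first())   # a single Row
print(df.take(5))   # a list of up to 5 Row objects on the driver

# limit() returns a DataFrame and does not execute until an action runs.
df.limit(5).show()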

3. cache() and persist(): Caching Data

Both mark a DataFrame for caching; the data is only materialized the next time an action runs on it.

  1. Caching: df.cache()
  2. Persisting:
     from pyspark import StorageLevel
     df.persist(StorageLevel.MEMORY_AND_DISK)
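A sketch of the typical pattern, assuming a DataFrame that is reused by several actions (which is when caching pays off):

from pyspark import StorageLevel

expensive = df.filter(df["salary"] > 3000)

expensive.persist(StorageLevel.MEMORY_AND_DISK)  # marks for caching; nothing runs yet
expensive.count()                                # first action materializes the cache
expensive.show()                                 # reuses the cached data

expensive.unpersist()                            # release memory/disk when done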

4. write(): Writing Data

Writes the DataFrame to the specified format.

Example
df.write.format("parquet").save("/path/to/output")

Using SQL Expressions with expr()

The expr() function allows SQL-like expressions inside PySpark operations.

Example with filter()
from pyspark.sql.functions import expr
df.filter(expr("age > 30 AND salary > 3000")).show()
Example with selectExpr()
df.selectExpr("name", "age + 1 AS age_plus_one").show()

These operations cover most use cases for working with PySpark DataFrames, enabling efficient data transformation and analysis.
