PySpark DataFrame Programming – Operations, Functions, Statements, and Syntax with Examples

PySpark provides a powerful API for data manipulation, similar to pandas, but optimized for big data processing. Below is a comprehensive overview of DataFrame operations, functions, and syntax in PySpark with examples.

Creating DataFrames

Creating DataFrames from various sources is a common task in PySpark. Below are examples for creating DataFrames from CSV files, Excel files, Python lists, tuples, dictionaries, pandas DataFrames, Hive tables, values, RDDs, Oracle databases, HBase tables, JSON files, and Parquet files.

1. Creating DataFrames from CSV Files

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from CSV") \
.getOrCreate()

# Read CSV file
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df_csv.show()
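
The CSV reader also accepts options for the delimiter, null handling, and malformed-record behaviour. A minimal sketch with placeholder option values:

# Read CSV with explicit reader options
df_csv_opts = spark.read \
    .option("header", "true") \
    .option("sep", ",") \
    .option("nullValue", "NA") \
    .option("mode", "DROPMALFORMED") \
    .option("inferSchema", "true") \
    .csv("path/to/file.csv")
df_csv_opts.show()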

2. Creating DataFrames from Excel Files

To read Excel files, you need to install the spark-excel library.

# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5")

df_excel = spark.read \
.format("com.crealytics.spark.excel") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("path/to/file.xlsx")
df_excel.show()
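
For reference, a sketch of a SparkSession builder that pulls in the spark-excel package mentioned in the comment above (adjust the coordinates to match your Spark and Scala versions):

spark = SparkSession.builder \
    .appName("Create DataFrames from Excel") \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5") \
    .getOrCreate()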

3. Creating DataFrames from Python List

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from List") \
.getOrCreate()

# Create DataFrame from list
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_list = spark.createDataFrame(data, columns)
df_list.show()

4. Creating DataFrames from Python Tuple

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from Tuple") \
.getOrCreate()

# Create DataFrame from tuple
data = (("Alice", 30), ("Bob", 25), ("Cathy", 28))
columns = ["name", "age"]
df_tuple = spark.createDataFrame(data, columns)
df_tuple.show()

5. Creating DataFrames from Python Dictionary

import pandas as pd
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Dictionary") \
    .getOrCreate()

# Create DataFrame from dictionary
data = {"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]}
df_dict = spark.createDataFrame(pd.DataFrame(data))
df_dict.show()

6. Creating DataFrames from Pandas DataFrame

import pandas as pd
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from Pandas") \
.getOrCreate()

# Create a Pandas DataFrame
pdf = pd.DataFrame({"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]})

# Convert Pandas DataFrame to Spark DataFrame
df_pandas = spark.createDataFrame(pdf)
df_pandas.show()
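
In Spark 3.x, enabling Apache Arrow can speed up the pandas-to-Spark conversion. A minimal sketch (the configuration only needs to be set once per session):

# Optional: use Arrow to accelerate pandas <-> Spark conversions
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df_pandas_arrow = spark.createDataFrame(pdf)
df_pandas_arrow.show()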

7. Creating DataFrames from Hive Tables

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from Hive") \
.enableHiveSupport() \
.getOrCreate()

# Read Hive table
df_hive = spark.sql("SELECT * FROM database.table_name")
df_hive.show()

8. Creating DataFrames from Values

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from Values") \
.getOrCreate()

# Create DataFrame from values
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_values = spark.createDataFrame(data, columns)
df_values.show()
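
If you need explicit control over column names and types instead of relying on inference, a schema can be supplied. A minimal sketch reusing the same data:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema explicitly
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df_typed = spark.createDataFrame(data, schema=schema)
df_typed.printSchema()
df_typed.show()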

9. Creating DataFrames from RDDs

from pyspark.sql import Row, SparkSession

# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from RDD") \
.getOrCreate()

# Create an RDD
rdd = spark.sparkContext.parallelize([
Row(name="Alice", age=30),
Row(name="Bob", age=25),
Row(name="Cathy", age=28)
])

# Convert RDD to DataFrame
df_rdd = spark.createDataFrame(rdd)
df_rdd.show()
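
An RDD of plain tuples can also be converted by passing column names to toDF(). A minimal sketch:

# Alternative: convert an RDD of tuples by supplying column names
rdd_tuples = spark.sparkContext.parallelize([("Alice", 30), ("Bob", 25)])
df_from_tuples = rdd_tuples.toDF(["name", "age"])
df_from_tuples.show()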

10. Creating DataFrames from Oracle Database

To read from Oracle, you need to have the JDBC driver for Oracle in your classpath.

# Add the following dependency when initializing SparkSession
# .config("spark.jars", "path/to/ojdbc8.jar")

jdbc_url = "jdbc:oracle:thin:@hostname:port:SID"
connection_properties = {
    "user": "username",
    "password": "password",
    "driver": "oracle.jdbc.driver.OracleDriver"
}

df_oracle = spark.read.jdbc(jdbc_url, "schema.table_name", properties=connection_properties)
df_oracle.show()
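
Equivalently, the generic format("jdbc") reader can be used, with the driver jar supplied through spark.jars when the session is built. A sketch with placeholder paths and credentials:

spark = SparkSession.builder \
    .appName("Create DataFrames from Oracle") \
    .config("spark.jars", "path/to/ojdbc8.jar") \
    .getOrCreate()

df_oracle_alt = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "schema.table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()
df_oracle_alt.show()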

11. Creating DataFrames from HBase Tables

To read from HBase, you need an HBase connector for Spark, such as the hbase-spark connector. Note that the data source format string and catalog options differ between connectors; the catalog-based example below follows the Spark HBase Connector (SHC) style.

# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "org.apache.hbase.connectors.spark:hbase-spark:1.0.0")

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame

# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from HBase") \
.getOrCreate()

# Define HBase catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"tablename"},
    "rowkey":"key",
    "columns":{
        "col0":{"cf":"rowkey", "col":"key", "type":"string"},
        "col1":{"cf":"cf1", "col":"col1", "type":"string"},
        "col2":{"cf":"cf2", "col":"col2", "type":"string"}
    }
}""".split())

df_hbase = spark.read \
    .options(catalog=catalog) \
    .format("org.apache.spark.sql.execution.datasources.hbase") \
    .load()

df_hbase.show()

12. Creating DataFrames from JSON Files

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from JSON") \
.getOrCreate()

# Read JSON file
df_json = spark.read.json("path/to/file.json")
df_json.show()
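
By default, spark.read.json expects one JSON object per line (JSON Lines format). For a file containing a single multi-line JSON document or array, the multiLine option can be enabled. A minimal sketch with the same placeholder path:

# Read a multi-line (pretty-printed) JSON file
df_json_ml = spark.read \
    .option("multiLine", "true") \
    .json("path/to/file.json")
df_json_ml.show()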

13. Creating DataFrames from Parquet Files

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
.appName("Create DataFrames from Parquet") \
.getOrCreate()

# Read Parquet file
df_parquet = spark.read.parquet("path/to/file.parquet")
df_parquet.show()

PySpark Operations on DataFrames

PySpark DataFrame operations can be broadly classified into two categories: Transformations and Actions.

Transformations

Transformations are operations that are applied on a DataFrame to produce a new DataFrame. Transformations are lazy, meaning they are not executed immediately. Instead, they are recorded as a lineage of transformations to be applied when an action is called.
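
For example, in the minimal sketch below (assuming a DataFrame df with name and age columns), the filter and select calls only build up the execution plan; nothing is computed until the show() action runs:

# Transformations: recorded in the lineage, not executed yet
df_adults = df.filter(df["age"] > 30).select("name")

# Action: triggers execution of the whole recorded plan
df_adults.show()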

Here are some common transformations:

select()

  • Description: Selects a subset of columns.
  • Syntax: df.select("col1", "col2", ...)
  • Example: df.select("name", "age").show()

filter() or where()

  • Description: Filters rows based on a condition.
  • Syntax: df.filter(df["age"] > 30)
  • Example: df.filter(df["age"] > 30).show()

groupBy()

  • Description: Groups rows by specified columns.
  • Syntax: df.groupBy("department").agg({"salary": "mean"})
  • Example: df.groupBy("department").mean("salary").show()

orderBy() or sort()

  • Description: Sorts rows by specified columns.
  • Syntax: df.orderBy(df["age"].desc())
  • Example: df.orderBy(df["age"].desc()).show()

withColumn()

  • Description: Adds or replaces a column.
  • Syntax: df.withColumn("new_col", df["existing_col"] + 1)
  • Example: df.withColumn("new_age", df["age"] + 1).show()

drop()

  • Description: Drops specified columns.
  • Syntax: df.drop("col1", "col2")
  • Example: df.drop("age").show()

distinct()

  • Description: Returns distinct rows.
  • Syntax: df.distinct()
  • Example: df.distinct().show()

join()

  • Description: Joins two DataFrames.
  • Syntax: df1.join(df2, df1["key"] == df2["key"], "inner")
  • Example: df1.join(df2, df1["id"] == df2["id"], "inner").show()

union()

  • Description: Unions two DataFrames.
  • Syntax: df1.union(df2)
  • Example: df1.union(df2).show()

sample()

  • Description: Returns a sampled subset of rows.
  • Syntax: df.sample(withReplacement=False, fraction=0.1)
  • Example: df.sample(withReplacement=False, fraction=0.1).show()

Actions

Actions trigger the execution of the transformations recorded in the DAG and return results.

Here are some common actions:

show()

  • Description: Displays the top rows of the DataFrame.
  • Syntax: df.show(n)
  • Example: df.show(5)

collect()

  • Description: Returns all rows as a list of Row objects.
  • Syntax: df.collect()
  • Example:
rows = df.collect()
for row in rows:
    print(row)

count()

  • Description: Returns the number of rows in the DataFrame.
  • Syntax: df.count()
  • Example: print(df.count())

first()

  • Description: Returns the first row.
  • Syntax: df.first()
  • Example: print(df.first())

take()

  • Description: Returns the first n rows.
  • Syntax: df.take(n)
  • Example: print(df.take(5))

describe()

  • Description: Computes basic statistics.
  • Syntax: df.describe()
  • Example: df.describe().show()

cache()

  • Description: Caches the DataFrame in memory.
  • Syntax: df.cache()
  • Example: df.cache()

persist()

  • Description: Persists the DataFrame with specified storage level.
  • Syntax: df.persist(storageLevel)
  • Example:
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
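
A persisted DataFrame is only materialized the first time an action runs against it. A small sketch of the typical lifecycle:

from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()      # first action materializes the persisted data
df.unpersist()  # release the storage once it is no longer needed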

write()

  • Description: Writes the DataFrame to a specified format.
  • Syntax: df.write.format("parquet").save("path")
  • Example: df.write.format("parquet").save("path/to/parquet")
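
The writer also supports a save mode and output partitioning. A minimal sketch, assuming the DataFrame has a department column:

# Overwrite existing output and partition the files by department
df.write \
    .mode("overwrite") \
    .partitionBy("department") \
    .parquet("path/to/parquet")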

foreach()

  • Description: Applies a function to each row.
  • Syntax: df.foreach(f)
  • Example:
def print_row(row):
    print(row)

df.foreach(print_row)

Example Project: Comprehensive ETL Pipeline in PySpark

Let’s create a comprehensive example project that demonstrates these operations.

  1. Setup Spark Session
  2. Load Data from CSV, Hive, Oracle, and create DataFrames
  3. Apply Transformations
  4. Perform Actions
  5. Persist DataFrames
  6. Convert to Pandas for further analysis and visualization

1. Setup Spark Session

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
.appName("ETL Pipeline Example") \
.enableHiveSupport() \
.getOrCreate()

2. Load Data

From CSV
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
From Hive
df_hive = spark.sql("SELECT * FROM database.table_name")
From Oracle
jdbc_url = "jdbc:oracle:thin:@hostname:port:SID"
connection_properties = {
    "user": "username",
    "password": "password",
    "driver": "oracle.jdbc.driver.OracleDriver"
}

df_oracle = spark.read.jdbc(jdbc_url, "schema.table_name", properties=connection_properties)

3. Apply Transformations

# Select columns (keep department for the later grouping and join)
df_selected = df_csv.select("name", "age", "salary", "department")

# Filter rows
df_filtered = df_selected.filter(df_selected["age"] > 30)

# Group by and aggregate
df_grouped = df_filtered.groupBy("department").mean("salary") \
    .withColumnRenamed("avg(salary)", "avg_salary")

# Join with another DataFrame
df_joined = df_grouped.join(df_hive, df_grouped["department"] == df_hive["dept_id"])

# Add new column
df_with_new_col = df_joined.withColumn("new_col", df_joined["avg_salary"] + 1000)

# Drop column
df_final = df_with_new_col.drop("dept_id")

4. Perform Actions

# Show DataFrame
df_final.show()

# Collect DataFrame
rows = df_final.collect()
for row in rows:
    print(row)

# Count rows
print(df_final.count())

# Describe DataFrame
df_final.describe().show()

5. Persist DataFrames

# Cache DataFrame
df_final.cache()

# Persist DataFrame
from pyspark import StorageLevel
df_final.persist(StorageLevel.MEMORY_AND_DISK)

6. Convert to Pandas for Further Analysis and Visualization

import pandas as pd

# Convert to Pandas DataFrame
pdf = df_final.toPandas()

# Analyze with Pandas
print(pdf.describe())

# Visualize with Matplotlib
import matplotlib.pyplot as plt

pdf['avg_salary'].hist()
plt.show()

You can use SQL expressions inside DataFrame operations in PySpark, including the filter and where functions. This is done with the expr function from the pyspark.sql.functions module, which lets you pass SQL expressions as strings.

Example Usage

1. Using SQL Expressions Inside filter or where

Here is an example demonstrating how to use SQL expressions inside the filter or where functions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Initialize Spark session
spark = SparkSession.builder \
.appName("SQL Expression Example") \
.getOrCreate()

# Sample DataFrame
data = [
(1, "Alice", 25, 2000),
(2, "Bob", 30, 3000),
(3, "Charlie", 35, 4000)
]

columns = ["id", "name", "age", "salary"]
df = spark.createDataFrame(data, columns)

# Using SQL expression in filter
filtered_df = df.filter(expr("age > 30"))

filtered_df.show()

Detailed Example

Let’s go through a more detailed example with multiple conditions and transformations using SQL expressions.

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Initialize Spark session
spark = SparkSession.builder \
.appName("SQL Expression Example") \
.getOrCreate()

# Sample DataFrame
data = [
(1, "Alice", 25, 2000),
(2, "Bob", 30, 3000),
(3, "Charlie", 35, 4000),
(4, "David", 40, 5000)
]

columns = ["id", "name", "age", "salary"]
df = spark.createDataFrame(data, columns)

# Filter using SQL expression
filtered_df = df.filter(expr("age > 30 AND salary > 3000"))

# Select specific columns using SQL expression
selected_df = filtered_df.selectExpr("id", "name", "salary * 1.1 AS adjusted_salary")

# Adding a new column using SQL expression
with_new_column_df = selected_df.withColumn("salary_category", expr("CASE WHEN adjusted_salary > 4000 THEN 'High' ELSE 'Medium' END"))

with_new_column_df.show()

Output

+---+-------+---------------+---------------+
| id| name|adjusted_salary|salary_category|
+---+-------+---------------+---------------+
| 3|Charlie| 4400.0| High|
| 4| David| 5500.0| High|
+---+-------+---------------+---------------+

2. The select Function: Syntax Variations and Examples

The select function in PySpark is used to select a subset of columns or expressions from a DataFrame. It can be used in several ways to achieve different results.

Basic Syntax

The basic syntax for the select function is:

DataFrame.select(*cols)
Here, *cols can be a list of column names, Column objects, or expressions.

Selecting Specific Columns

df.select('column1', 'column2').show()
This will select and display the ‘column1’ and ‘column2’ columns.

Selecting Columns with Expressions

from pyspark.sql.functions import col, expr
df.select(col('column1'), expr('column2 + 1').alias('column2_plus_one')).show()

This will select ‘column1’ and an expression ‘column2 + 1’ as ‘column2_plus_one’.

Using Column Objects

Example:

from pyspark.sql.functions import col
df.select(col('column1'), col('column2')).show()
This will select 'column1' and 'column2' using Column objects.

Renaming Columns

df.select(col('column1').alias('new_column1')).show()
This will select ‘column1’ and rename it to ‘new_column1’.

Selecting All Columns

df.select('*').show()
This will select all columns in the DataFrame.

Using SQL Expressions

df.selectExpr('column1', 'column2 + 1 as column2_plus_one').show()

This allows you to use SQL expressions directly.

 

3. PySpark DataFrame groupBy Operation

The groupBy operation in PySpark is used to group data based on one or more columns. Once grouped, you can apply various aggregation functions like count, sum, avg, max, min, etc.

Syntax

DataFrame.groupBy(*cols)

Here, *cols can be one or more column names or expressions.

Common Aggregation Functions

  • count()
  • sum(column)
  • avg(column)
  • max(column)
  • min(column)
  • agg(*exprs)

Examples

Import Required Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, max, min, count
Create Spark Session
spark = SparkSession.builder \
.appName("GroupBy Examples") \
.getOrCreate()
Sample DataFrame
data = [
("Alice", "Sales", 3000),
("Bob", "Sales", 4000),
("Alice", "Sales", 4000),
("Catherine", "HR", 4000),
("David", "HR", 5000),
("Eve", "IT", 3000)
]
columns = ["Name", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)
df.show()
1. Group By Single Column and Count
df.groupBy("Department").count().show()

Output:

+----------+-----+
|Department|count|
+----------+-----+
| HR| 2|
| IT| 1|
| Sales| 3|
+----------+-----+
2. Group By Multiple Columns and Count
df.groupBy("Department", "Name").count().show()

Output:

+----------+---------+-----+
|Department|     Name|count|
+----------+---------+-----+
|     Sales|    Alice|    2|
|        HR|    David|    1|
|        HR|Catherine|    1|
|        IT|      Eve|    1|
|     Sales|      Bob|    1|
+----------+---------+-----+
3. Group By with Sum Aggregation
df.groupBy("Department").sum("Salary").show()

Output:

+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
| HR| 9000 |
| IT| 3000 |
| Sales| 11000 |
+----------+-----------+
4. Group By with Multiple Aggregations
df.groupBy("Department").agg(
count("*").alias("Total_Count"),
sum("Salary").alias("Total_Salary"),
avg("Salary").alias("Average_Salary"),
max("Salary").alias("Max_Salary"),
min("Salary").alias("Min_Salary")
).show()

Output:

+----------+-----------+------------+--------------+----------+----------+
|Department|Total_Count|Total_Salary|Average_Salary|Max_Salary|Min_Salary|
+----------+-----------+------------+--------------+----------+----------+
|        HR|          2|        9000|        4500.0|      5000|      4000|
|        IT|          1|        3000|        3000.0|      3000|      3000|
|     Sales|          3|       11000|       3666.67|      4000|      3000|
+----------+-----------+------------+--------------+----------+----------+
5. Using agg Method with Expressions
from pyspark.sql.functions import expr

df.groupBy("Department").agg(
expr("count(*) as Total_Count"),
expr("sum(Salary) as Total_Salary"),
expr("avg(Salary) as Average_Salary"),
expr("max(Salary) as Max_Salary"),
expr("min(Salary) as Min_Salary")
).show()

Output is the same as above.

6. Group By with Having Clause (Filtering Groups)
df.groupBy("Department").agg(
sum("Salary").alias("Total_Salary")
).filter(col("Total_Salary") > 5000).show()

Output:

+----------+------------+
|Department|Total_Salary|
+----------+------------+
|        HR|        9000|
|     Sales|       11000|
+----------+------------+

4. PySpark orderBy() and sort() Operations

In PySpark, both orderBy() and sort() are used to sort the rows of a DataFrame. They can be used interchangeably, as they provide the same functionality.

Syntax

DataFrame.orderBy(*cols, ascending=True)
DataFrame.sort(*cols, ascending=True)
  • cols: List of column names or expressions to sort by.
  • ascending: Boolean or list of booleans. If a single boolean is provided, it applies to all columns. If a list is provided, it specifies the sort order for each corresponding column.

 

data = [
("Alice", 34, "HR", 3000),
("Bob", 45, "IT", 4000),
("Catherine", 29, "HR", 5000),
("David", 36, "IT", 2500),
("Eve", 28, "Sales", 2800)
]
columns = ["Name", "Age", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)
df.show()
1. Order By Single Column Ascending
df.orderBy("Age").show()

Equivalent:

df.sort("Age").show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Eve| 28| Sales| 2800|
|Catherine| 29| HR| 5000|
| Alice| 34| HR| 3000|
| David| 36| IT| 2500|
| Bob| 45| IT| 4000|
+---------+---+----------+------+
2. Order By Single Column Descending
df.orderBy(col("Age").desc()).show()

Equivalent:

df.sort(col("Age").desc()).show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 5000|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+
3. Order By Multiple Columns
df.orderBy("Department", "Age").show()

Equivalent:

df.sort("Department", "Age").show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 5000|
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+
4. Order By Multiple Columns with Different Sort Orders
df.orderBy(["Department", "Age"], ascending=[True, False]).show()

Equivalent:

df.sort(["Department", "Age"], ascending=[True, False]).show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Alice| 34| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+

Useful Examples

Sorting by Salary in Descending Order
df.orderBy(col("Salary").desc()).show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Bob| 45| IT| 4000|
| Alice| 34| HR| 3000|
| Eve| 28| Sales| 2800|
| David| 36| IT| 2500|
+---------+---+----------+------+
Sorting by Department and then by Salary within each Department
df.orderBy("Department", col("Salary").desc()).show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29| HR| 5000|
| Alice| 34| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 2500|
| Eve| 28| Sales| 2800|
+---------+---+----------+------+
Sorting with Expression
df.orderBy(expr("Salary + Age").desc()).show()

Output:

+---------+---+----------+------+
|     Name|Age|Department|Salary|
+---------+---+----------+------+
|Catherine| 29|        HR|  5000|
|      Bob| 45|        IT|  4000|
|    Alice| 34|        HR|  3000|
|      Eve| 28|     Sales|  2800|
|    David| 36|        IT|  2500|
+---------+---+----------+------+

5. PySpark Join Operations

In PySpark, the join operation is used to combine two DataFrames based on a condition. The join operation can be performed in several ways, such as inner join, left join, right join, outer join, semi join, and anti join.

Syntax

DataFrame.join(other, on=None, how=None)
  • other: The DataFrame to join with.
  • on: The column(s) to join on. This can be a string representing a single column name, a list of column names, or a condition.
  • how: The type of join to perform. Default is inner.
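
When the join keys have different names in the two DataFrames, a join condition can be passed instead of column names. A minimal sketch with hypothetical DataFrames emp_df (containing dept_id) and dept_df (containing id):

# Join on an explicit condition instead of a shared column name
joined_df = emp_df.join(dept_df, emp_df["dept_id"] == dept_df["id"], "left")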

Join Types

  • inner: Inner join.
  • left or left_outer: Left outer join.
  • right or right_outer: Right outer join.
  • outer or full or full_outer: Full outer join.
  • left_semi: Left semi join.
  • left_anti: Left anti join.
See also: https://www.hintstoday.com/spark-sql-join-types-syntax-examples-comparision

Examples

data1 = [
("Alice", 34, "HR"),
("Bob", 45, "IT"),
("Catherine", 29, "HR"),
("David", 36, "IT")
]
columns1 = ["Name", "Age", "Department"]

data2 = [
("HR", 3000),
("IT", 4000),
("Sales", 2800)
]
columns2 = ["Department", "Salary"]

df1 = spark.createDataFrame(data1, schema=columns1)
df2 = spark.createDataFrame(data2, schema=columns2)

df1.show()
df2.show()
1. Inner Join
inner_join_df = df1.join(df2, on="Department", how="inner")
inner_join_df.show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
+---------+---+----------+------+
2. Left Outer Join
left_join_df = df1.join(df2, on="Department", how="left")
left_join_df.show()

Output:

+---------+---+----------+------+
| Name|Age|Department|Salary|
+---------+---+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
+---------+---+----------+------+
3. Right Outer Join
right_join_df = df1.join(df2, on="Department", how="right")
right_join_df.show()

Output:

+---------+----+----------+------+
| Name| Age|Department|Salary|
+---------+----+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
| null|null| Sales| 2800|
+---------+----+----------+------+
4. Full Outer Join
outer_join_df = df1.join(df2, on="Department", how="outer")
outer_join_df.show()

Output:

+---------+----+----------+------+
| Name| Age|Department|Salary|
+---------+----+----------+------+
| Alice| 34| HR| 3000|
|Catherine| 29| HR| 3000|
| Bob| 45| IT| 4000|
| David| 36| IT| 4000|
| null|null| Sales| 2800|
+---------+----+----------+------+
5. Left Semi Join
semi_join_df = df1.join(df2, on="Department", how="left_semi")
semi_join_df.show()

Output:

+---------+---+----------+
| Name|Age|Department|
+---------+---+----------+
| Alice| 34| HR|
|Catherine| 29| HR|
| Bob| 45| IT|
| David| 36| IT|
+---------+---+----------+
6. Left Anti Join
anti_join_df = df1.join(df2, on="Department", how="left_anti")
anti_join_df.show()

Output:

+----+---+----------+
|Name|Age|Department|
+----+---+----------+
+----+---+----------+
7. Join on Multiple Columns
data3 = [
("Alice", 34, "HR", 3000),
("Bob", 45, "IT", 4000),
("Catherine", 29, "HR", 5000),
("David", 36, "IT", 2500),
("Eve", 28, "Sales", 2800)
]
columns3 = ["Name", "Age", "Department", "Salary"]

data4 = [
("HR", 34, 3000, "Manager"),
("IT", 45, 4000, "Developer"),
("Sales", 28, 2800, "Salesman")
]
columns4 = ["Department", "Age", "Salary", "Role"]

df3 = spark.createDataFrame(data3, schema=columns3)
df4 = spark.createDataFrame(data4, schema=columns4)

multi_col_join_df = df3.join(df4, on=["Department", "Age", "Salary"], how="inner")
multi_col_join_df.show()

Output:

+-----+---+----------+------+---------+
| Name|Age|Department|Salary|     Role|
+-----+---+----------+------+---------+
|Alice| 34|        HR|  3000|  Manager|
|  Bob| 45|        IT|  4000|Developer|
|  Eve| 28|     Sales|  2800| Salesman|
+-----+---+----------+------+---------+
