PySpark DataFrame Programming – Operations, Functions, Statements, and Syntax with Examples

by lochan2014 | Jul 2, 2024 | Pyspark

Creating DataFrames in PySpark

Creating DataFrames in PySpark is essential for processing large-scale data efficiently. PySpark allows DataFrames to be created from various sources, ranging from manual data entry to structured storage systems. Below are different ways to create PySpark DataFrames, along with interesting examples.


1. Creating DataFrames from List of Tuples (Manual Entry)

This is one of the simplest ways to create a PySpark DataFrame manually.

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

# List of tuples
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]

# Define column names
columns = ["ID", "Name", "Age"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()

Use Case: Best for small, manually defined datasets.


2. Creating DataFrames from CSV Files

PySpark can load structured data from CSV files efficiently.

df = spark.read.csv("people.csv", header=True, inferSchema=True)
df.show()

Use Case: Useful when working with structured tabular data stored in CSV format.


3. Creating DataFrames from JSON Files

JSON files are widely used for semi-structured data.

df = spark.read.json("data.json")
df.show()

Use Case: Best for APIs, logs, or nested data.
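If the JSON is pretty-printed or contains nested objects, the options below may help; the multiLine flag and the nested address.city field are assumptions for illustration, not part of the example file above.

from pyspark.sql.functions import col

# multiLine=True lets Spark parse pretty-printed JSON; the default expects one JSON object per line
df = spark.read.option("multiLine", True).json("data.json")

# Dot notation reaches into a nested struct (assumes the file has an "address" object with a "city" field)
df.select(col("name"), col("address.city").alias("city")).show()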


4. Creating DataFrames from Parquet Files

Parquet is a columnar storage format optimized for big data processing.

df = spark.read.parquet("data.parquet")
df.show()

Use Case: Recommended for fast data processing in Spark.


5. Creating DataFrames from Databases (JDBC Connection)

Connecting to external databases is common for real-world ETL tasks.

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/testdb") \
    .option("dbtable", "users") \
    .option("user", "root") \
    .option("password", "password") \
    .load()
df.show()

Use Case: Best for integrating with external SQL databases.
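Two options that often come up in practice are shown in this sketch; it assumes the MySQL JDBC driver jar is on the Spark classpath, and the query shown is only an illustration (the query option replaces dbtable when used).

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/testdb") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("query", "SELECT id, name FROM users WHERE active = 1") \
    .option("user", "root") \
    .option("password", "password") \
    .load()
df.show()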


6. Creating DataFrames from RDDs

Sometimes, raw RDDs need to be converted into DataFrames.

from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
    Row(ID=1, Name="Alice", Age=25),
    Row(ID=2, Name="Bob", Age=30),
    Row(ID=3, Name="Charlie", Age=35)
])

df = spark.createDataFrame(rdd)
df.show()

Use Case: Useful for transitioning from RDD-based transformations to DataFrames.


7. Creating DataFrames from Pandas DataFrames

Converting a Pandas DataFrame to PySpark is helpful when scaling operations.

import pandas as pd

# Create Pandas DataFrame
pdf = pd.DataFrame({"ID": [1, 2, 3], "Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]})

# Convert to PySpark DataFrame
df = spark.createDataFrame(pdf)
df.show()

Use Case: Best for transitioning from local Pandas to distributed PySpark.
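For larger Pandas DataFrames, enabling Apache Arrow usually speeds up the conversion. A small sketch, assuming Spark 3.x and that pyarrow is installed:

# Enable Arrow-based conversion (Spark 3.x config name; requires pyarrow)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame(pdf)   # conversion now goes through Arrow when possible
df.show()

pdf_back = df.toPandas()          # the reverse direction benefits as well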


8. Creating DataFrames from API Response (Using JSON Parsing)

For web scraping or API data processing, JSON responses can be converted into DataFrames.

import requests
import json

response = requests.get("https://api.example.com/users")
data = json.loads(response.text)

df = spark.createDataFrame(data)
df.show()

Use Case: Useful for processing real-time API data.


9. Creating DataFrames from XML Data

Spark supports XML parsing through third-party libraries.

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("ID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", StringType(), True)
])

df = spark.read.format("com.databricks.spark.xml") \
    .option("rowTag", "person") \
    .schema(schema) \
    .load("people.xml")
df.show()

Use Case: Useful for handling structured XML-based datasets.


10. Creating DataFrames Using Range for Auto-Generated Data

If you need a sequence of numbers, range() can quickly create a DataFrame.

df = spark.range(1, 6).toDF("ID")
df.show()

Use Case: Useful for generating test sequences or dummy IDs.


Summary of Methods

Method | Use Case
List of Tuples | Simple and widely used
CSV Files | Best for tabular structured data
JSON Files | Ideal for nested and semi-structured data
Parquet Files | Best for big data performance
JDBC Databases | Useful for ETL and database integration
RDD Conversion | Transitioning from RDDs to DataFrames
Pandas Conversion | Best for scaling Pandas workloads
API Response (JSON) | Real-time API data processing
XML Parsing | Handling structured XML data
Auto-Generated Range | Generating test data quickly

Creating Dummy DataFrames in PySpark

Creating dummy DataFrames in PySpark is useful for testing, prototyping, and learning. PySpark provides multiple ways to create DataFrames manually, each suited to different scenarios. Below are various methods to create dummy DataFrames with examples.


1. Using List of Tuples (Most Common Method)

This is one of the most common ways to create a PySpark DataFrame.

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("DummyDataFrame").getOrCreate()

# List of tuples
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]

# Define column names
columns = ["ID", "Name", "Age"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()

Use Case: Best for small, manually defined datasets.


2. Using List of Lists

This method is similar to the list of tuples but uses lists instead.

data = [[1, "Alice", 25], [2, "Bob", 30], [3, "Charlie", 35]]
df = spark.createDataFrame(data, columns)
df.show()

Use Case: When working with mutable lists instead of immutable tuples.


3. Using Dictionary with Row Objects

Using Row objects allows for named attributes, making it easy to access values.

from pyspark.sql import Row

data = [Row(ID=1, Name="Alice", Age=25),
        Row(ID=2, Name="Bob", Age=30),
        Row(ID=3, Name="Charlie", Age=35)]

df = spark.createDataFrame(data)
df.show()

Use Case: When you need named fields and structured data representation.


4. Using Dictionary with Explicit Schema

When you want stricter control over column types, defining a schema is a good approach.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define schema
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])

# Create DataFrame from a list of dictionaries
data = [{"ID": 1, "Name": "Alice", "Age": 25},
        {"ID": 2, "Name": "Bob", "Age": 30},
        {"ID": 3, "Name": "Charlie", "Age": 35}]

df = spark.createDataFrame(data, schema=schema)
df.show()

Use Case: Ensures correct data types and improves performance.


5. Using RDD with Row Objects

If you are working with distributed data, creating an RDD first can be beneficial.

rdd = spark.sparkContext.parallelize([
    Row(ID=1, Name="Alice", Age=25),
    Row(ID=2, Name="Bob", Age=30),
    Row(ID=3, Name="Charlie", Age=35)
])

df = spark.createDataFrame(rdd)
df.show()

Use Case: Best when working with large distributed datasets.


6. Using Pandas DataFrame Conversion

If you already have a Pandas DataFrame, you can convert it to a PySpark DataFrame.

import pandas as pd

# Create Pandas DataFrame
pdf = pd.DataFrame({"ID": [1, 2, 3], "Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]})

# Convert to PySpark DataFrame
df = spark.createDataFrame(pdf)
df.show()

Use Case: When transitioning from Pandas to PySpark.


7. Using range() for Auto-Generated Data

If you need a sequence of numbers, range() can quickly create a DataFrame.

df = spark.range(1, 6).toDF("ID")
df.show()

Use Case: When you need an auto-incrementing column.
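range() also combines nicely with column functions to generate slightly richer dummy data; the Score column and the rand() seed below are arbitrary choices for illustration.

from pyspark.sql.functions import rand

df_dummy = (spark.range(1, 6)
            .withColumnRenamed("id", "ID")
            .withColumn("Score", (rand(seed=42) * 100).cast("int")))  # random score per row
df_dummy.show()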


Summary of Methods

Method | Use Case
List of Tuples | Simple and widely used
List of Lists | Similar to tuples but mutable
Dictionary with Row | Allows named attributes
Dictionary with Schema | Ensures correct data types
RDD with Row | Works well for distributed data
Pandas Conversion | Best for small datasets
range() for Auto-Generated Data | When you need incremental values

A hands-on scratch example that generates random dummy data (usernames, emails, names) and builds a few DataFrames from it:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count
from datetime import datetime
import random
import string

spark = SparkSession.builder.appName("MySparkLearning1").getOrCreate()

random_numbers = [random.randint(1, 100) for _ in range(100)]
domain_names = ['gmail.com', 'xyz.com', 'hotmail.com', 'wikipedia.org', 'hintstoday.com']

def generate_random_username(length=8):
    # Random username made of letters and digits
    letters_and_digits = string.ascii_letters + string.digits
    return ''.join(random.choice(letters_and_digits) for _ in range(length))

def get_username():
    # Lowercase letters and digits, fixed length of 10
    return ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))

def get_first_name():
    length = random.randint(5, 10)
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(length))

email_addresses = []
for n in range(5):
    user_name = generate_random_username()
    chosen_domain = random.choice(domain_names)
    email_addresses.append(f"{user_name}@{chosen_domain}")

print(email_addresses)

# 100 rows of (serial number, date string, random id, email, full name)
data = [(i + 1,
         datetime.now().strftime('%y-%m-%d'),
         random.randint(1000, 100000),
         get_username() + "@" + random.choice(domain_names),
         get_first_name() + ' ' + get_first_name()) for i in range(100)]
columns = ['Sl_No', "Date_Val", "Ran_id", "email", "Name"]
df = spark.createDataFrame(data, columns)
df.show()

# Small single-column DataFrames
data = [(1,), (2,), (3,)]
columns = ['No']
df = spark.createDataFrame(data, columns)
df.show()

rows1 = [(1,)]
cols1 = ['r1']   # avoid naming this variable "col": it would shadow pyspark.sql.functions.col
df2 = spark.createDataFrame(rows1, cols1)
df2.show()

# A few expressions for playing with column names
print(str(df.columns).split())
print(''.join(str(df.columns).split()))
print(' '.join(df.columns))
df.toDF(*[column.replace('_', 'X') + '_v1' for column in df.columns]).show()

# Filter the even values of the numeric column and join back on it
df1 = df.select('No').filter(col('No') % 2 == 0)
df2 = df.join(df1, 'No')
df2.show()




import random
import string
from pyspark.sql import Row

def get_address():
    # Build a fake address out of random tokens
    flat_name = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
    username = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))
    street_name = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
    city = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(7))
    area_name = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
    pincode = random.randint(800000, 900000)
    country = 'India'
    return f"{username} {flat_name} , {street_name} , {area_name} , {city} , {pincode} , {country}"

print(get_address())

rdd = spark.sparkContext.parallelize([
    Row(Sl_No=i + 1,
        nickname=''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10)),
        Address=get_address())
    for i in range(100)
])
df_data = spark.createDataFrame(rdd)
df_data.show()

RDD (Resilient Distributed Dataset) is the fundamental data structure in Apache Spark. It provides an abstraction for distributed data and allows parallel processing. Below is an overview of RDD-based programming in PySpark.


RDD-Based Programming in PySpark

1. What is an RDD?

An RDD (Resilient Distributed Dataset) is an immutable, distributed collection of objects that can be processed in parallel across a Spark cluster. It supports fault tolerance, lazy evaluation, and partitioning for optimized processing.

Key Features of RDDs:

  • Immutable: Once created, RDDs cannot be changed.
  • Distributed: RDDs are stored across multiple nodes in a cluster.
  • Fault-Tolerant: Spark can automatically recover lost data.
  • Lazy Evaluation: Transformations are not executed immediately but only when an action is triggered (see the sketch after this list).
  • Partitioned: Data is divided across nodes for parallel execution.
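A minimal sketch of lazy evaluation, assuming a SparkSession named spark is already available: map() only records the transformation, and nothing executes until collect() is called.

sc = spark.sparkContext

rdd = sc.parallelize([1, 2, 3, 4, 5])
squared = rdd.map(lambda x: x * x)   # transformation only: recorded, no job runs yet

print(squared.collect())             # action: triggers the computation -> [1, 4, 9, 16, 25]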

2. Creating RDDs in PySpark

There are two primary ways to create RDDs:

A. Creating RDDs from a List

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("RDDExample").getOrCreate()
sc = spark.sparkContext  # SparkContext

# Creating RDD from List
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Show RDD Elements
print(rdd.collect())

Use Case: Useful for quick testing with small datasets.


B. Creating RDDs from an External File

rdd = sc.textFile("sample.txt")  # Reads a text file
print(rdd.collect())

Use Case: When loading data from external storage such as HDFS, S3, or local files.


3. RDD Transformations

Transformations return new RDDs without modifying the existing ones. They follow lazy evaluation.

Common Transformations:

A. map()

Applies a function to each element.

rdd_squared = rdd.map(lambda x: x * x)
print(rdd_squared.collect())  # Example output: [1, 4, 9, 16, 25]

B. filter()

Filters elements based on a condition.

rdd_even = rdd.filter(lambda x: x % 2 == 0)
print(rdd_even.collect())  # Example output: [2, 4]

C. flatMap()

Flattens nested structures.

rdd_words = sc.parallelize(["hello world", "hi there"])
rdd_split = rdd_words.flatMap(lambda line: line.split(" "))
print(rdd_split.collect())  # Example output: ['hello', 'world', 'hi', 'there']

D. distinct()

Removes duplicate elements.

rdd_duplicate = sc.parallelize([1, 2, 2, 3, 4, 4, 5])
rdd_distinct = rdd_duplicate.distinct()
print(rdd_distinct.collect())  # Example output: [1, 2, 3, 4, 5]

E. union()

Combines two RDDs.

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd_union = rdd1.union(rdd2)
print(rdd_union.collect())  # Example output: [1, 2, 3, 4, 5, 6]

4. RDD Actions

Actions trigger execution and return results.

Common Actions:

A. collect()

Retrieves all elements from the RDD.

print(rdd.collect())

B. count()

Counts the number of elements.

print(rdd.count())

C. reduce()

Aggregates elements using a function.

sum_rdd = rdd.reduce(lambda x, y: x + y)
print(sum_rdd)  # Example output: 15

D. take(n)

Returns the first n elements.

print(rdd.take(3))  # Example output: [1, 2, 3]

E. first()

Returns the first element.

print(rdd.first())  # Example output: 1

5. RDD Persistence (Caching)

RDDs can be cached to optimize performance when reused.

Using cache()

rdd_cached = rdd.cache()
rdd_cached.count()  # Forces caching

Using persist()

from pyspark import StorageLevel

rdd_persist = rdd.persist(StorageLevel.MEMORY_AND_DISK)

Use Case: Useful when an RDD is used multiple times to avoid recomputation.
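A small sketch of the reuse pattern on the rdd from the examples above: the first action materializes the cache, the second action is served from it, and unpersist() releases the memory afterwards.

rdd_times_ten = rdd.map(lambda x: x * 10).cache()

print(rdd_times_ten.count())   # first action: computes the RDD and fills the cache
print(rdd_times_ten.sum())     # second action: reuses the cached partitions, no recomputation

rdd_times_ten.unpersist()      # free the cached partitions when finished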


6. Key-Value RDD Operations (Pair RDDs)

Pair RDDs store data in (key, value) format, enabling grouping, sorting, and aggregations.

A. Creating a Pair RDD

pair_rdd = sc.parallelize([("Alice", 25), ("Bob", 30), ("Alice", 28)])

B. reduceByKey()

Aggregates values by key.

rdd_age_sum = pair_rdd.reduceByKey(lambda x, y: x + y)
print(rdd_age_sum.collect())  # Example output: [('Alice', 53), ('Bob', 30)]

C. groupByKey()

Groups values by key.

rdd_group = pair_rdd.groupByKey()
print([(k, list(v)) for k, v in rdd_group.collect()])
# Example output: [('Alice', [25, 28]), ('Bob', [30])]

D. sortByKey()

Sorts data based on keys.

rdd_sorted = pair_rdd.sortByKey()
print(rdd_sorted.collect())  # Example output: [('Alice', 25), ('Alice', 28), ('Bob', 30)]

7. Converting RDDs to DataFrames

PySpark supports easy conversion from RDDs to DataFrames.

from pyspark.sql import Row

# Convert RDD to DataFrame
df = rdd.map(lambda x: Row(number=x)).toDF()
df.show()

Use Case: When transitioning from RDDs to the more optimized DataFrame API.


8. When to Use RDDs Over DataFrames

Feature | RDD | DataFrame
Ease of Use | More complex | Simpler
Performance | Slower | Faster (optimized with Catalyst)
Schema | Not enforced | Enforced
Operations | Low-level control | High-level SQL-like operations

Use RDDs when:

  • You need fine-grained control over data transformations.
  • The data is unstructured, and schema enforcement is not needed.
  • Custom low-level optimizations are required.

Use DataFrames when:

  • Performance is critical (DataFrames are optimized).
  • SQL-like operations are needed (see the sketch below).
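For example, a DataFrame can be registered as a temporary view and queried with plain SQL; the view name people and the sample rows are only an illustration.

df = spark.createDataFrame([(1, "Alice", 25), (2, "Bob", 30)], ["ID", "Name", "Age"])
df.createOrReplaceTempView("people")          # expose the DataFrame to the SQL engine

spark.sql("SELECT Name, Age FROM people WHERE Age > 28").show()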

Conclusion

RDD-based programming in PySpark provides low-level control and distributed data processing capabilities. While DataFrames are preferred for most workloads due to optimizations, RDDs are still useful for specific scenarios where fine-tuned operations are needed.

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Initialize Spark Session (SparkSession internally manages SparkContext)
spark = SparkSession.builder.appName("RDD_Based_Processing").getOrCreate()

# Access SparkContext from SparkSession
sc = spark.sparkContext  # Still available if needed

# Explanation:
# - RDDs require SparkContext (sc.parallelize()) because they are a low-level API.
# - DataFrames use SparkSession (spark.createDataFrame()), which manages SparkContext internally.

# Step 1: Creating an RDD from a list of tuples
data = [(1, "Alice", 25, "Engineer"), (2, "Bob", 30, "Doctor"), (3, "Charlie", 35, "Teacher")]
rdd = sc.parallelize(data)  # Using SparkContext explicitly

# Step 2: Transforming RDD (Mapping to Rows)
row_rdd = rdd.map(lambda x: Row(ID=x[0], Name=x[1], Age=x[2], Occupation=x[3]))

# Step 3: Converting RDD to DataFrame
df = spark.createDataFrame(row_rdd)  # Using SparkSession (No need for sc)
df.show()

# Step 4: Filtering Data
filtered_rdd = rdd.filter(lambda x: x[2] > 28)  # Keep people older than 28

# Step 5: Applying Transformation (Mapping)
mapped_rdd = filtered_rdd.map(lambda x: (x[0], x[1].upper(), x[2] + 5, x[3]))

# Step 6: Reducing Data (Counting occupations)
occu_rdd = rdd.map(lambda x: (x[3], 1)).reduceByKey(lambda x, y: x + y)

# Step 7: Sorting Data
sorted_rdd = mapped_rdd.sortBy(lambda x: x[2], ascending=False)

# Step 8: Collecting and Printing Data
print("Filtered Data:", filtered_rdd.collect())
print("Mapped Data:", mapped_rdd.collect())
print("Occupation Count:", occu_rdd.collect())
print("Sorted Data:", sorted_rdd.collect())

# Stopping Spark Session
spark.stop()

RDDs in PySpark do not inherently have a schema or column names. Unlike DataFrames, RDDs are just distributed collections of objects (tuples, lists, dictionaries, etc.), and their structure is determined by how they are used.

Example of an RDD Without Schema:

# Creating an RDD from a list
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Printing the RDD
print(rdd.collect())  # Output: [1, 2, 3, 4, 5]

Here, the RDD is just a collection of numbers without any predefined schema.


Adding Schema to an RDD (Converting to DataFrame)

If you want to assign column names or a schema, you need to convert the RDD into a DataFrame using Row or StructType.

Example 1: Using Row for Named Columns

from pyspark.sql import Row

# Creating an RDD with structured data
data = [(1, "Alice", 25), (2, "Bob", 30)]
rdd = sc.parallelize(data)

# Converting RDD to DataFrame with column names
df = spark.createDataFrame(rdd.map(lambda x: Row(ID=x[0], Name=x[1], Age=x[2])))

df.show()

Output:

+---+-----+---+
| ID| Name|Age|
+---+-----+---+
|  1|Alice| 25|
|  2|  Bob| 30|
+---+-----+---+

Here, we used Row to provide column names.


Example 2: Using StructType for an Explicit Schema

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])

df = spark.createDataFrame(rdd, schema)
df.show()

This method is preferred when defining explicit schemas.


Summary

  • RDDs do NOT have column names or schemas by default.
  • If you need named columns, convert the RDD to a DataFrame using Row or StructType.
  • RDDs are useful for low-level transformations, while DataFrames provide structured data handling.

Note that code like the following fails, because PySpark cannot directly convert an RDD of primitive types (like integers) into a DataFrame. PySpark expects a structured format (such as a list of tuples or Row objects) when creating a DataFrame.

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
df = spark.createDataFrame(rdd)  # fails: Spark cannot infer a schema for plain integers
df.show()

How to Fix It?

You need to convert the RDD into a structured format by specifying column names or using Row objects.

Solution 1: Convert to a List of Tuples (Recommended)

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("RDD_to_DF").getOrCreate()
sc = spark.sparkContext  # Get SparkContext

# Create RDD from a list
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize([(x,) for x in data])  # Convert to a list of tuples

# Create DataFrame with a column name
df = spark.createDataFrame(rdd, ["Numbers"])
df.show()

Output:

+-------+
|Numbers|
+-------+
|      1|
|      2|
|      3|
|      4|
|      5|
+-------+

Solution 2: Use Row Objects

from pyspark.sql import Row

rdd = sc.parallelize([Row(Numbers=x) for x in data])
df = spark.createDataFrame(rdd)
df.show()

Both methods ensure the data is structured properly before creating a DataFrame.

Here’s a detailed guide on PySpark DataFrame column and row manipulation with useful implementations:


1. Column Manipulation in PySpark DataFrames

1.1 Renaming Columns

Rename a Single Column

df = df.withColumnRenamed("old_column", "new_column")

Rename Multiple Columns

new_column_names = {"old1": "new1", "old2": "new2"}
for old, new in new_column_names.items():
    df = df.withColumnRenamed(old, new)

Add a Suffix to All Column Names

df = df.toDF(*[col + "_v1" for col in df.columns])

1.2 Checking Data Types

Check Data Type of a Specific Column

print(df.schema["column_name"].dataType)

Get Data Types of All Columns

df.dtypes  # Returns a list of (column_name, data_type)

Check Schema of DataFrame

df.printSchema()

1.3 Apply Dynamic Logic to All Columns

Example: Trim All String Columns

from pyspark.sql.functions import col, trim

df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])

Example: Convert All Integer Columns to Double

from pyspark.sql.functions import col

df = df.select([col(c).cast("double") if dtype == "int" else col(c) for c, dtype in df.dtypes])

Example: Replace Nulls in All String Columns with “Unknown”

df = df.fillna("Unknown")

2. Row-Based DataFrame Manipulation

2.1 Collecting Rows One by One

Convert DataFrame to a List of Rows

rows = df.collect()
for row in rows:
    print(row)

Using toLocalIterator() for Large DataFrames (Efficient)

for row in df.toLocalIterator():
    print(row)

2.2 Filtering Rows

Filter Rows Based on a Condition

df_filtered = df.filter(df["Age"] > 30)

Filter Multiple Conditions

df_filtered = df.filter((df["Age"] > 30) & (df["Gender"] == "Male"))

2.3 Sorting Rows

df_sorted = df.orderBy("Age", ascending=False)

2.4 Adding a New Row (Using Union)

from pyspark.sql import Row

new_row = Row(ID=100, Name="John Doe", Age=40)
df_new = df.union(spark.createDataFrame([new_row], df.schema))

3. Useful Implementations

3.1 Finding Duplicate Rows

df.groupBy(df.columns).count().filter("count > 1").show()

3.2 Removing Duplicate Rows

df = df.dropDuplicates()

3.3 Adding a New Column Dynamically

from pyspark.sql.functions import lit

df = df.withColumn("NewColumn", lit("DefaultValue"))

Conclusion

  • PySpark allows flexible column manipulations like renaming, checking types, and applying transformations.
  • Row operations like filtering, sorting, and iterating can be done efficiently.
  • Collecting data should be handled carefully to avoid memory overload.
  • Dynamic transformations make it easy to process large datasets.

A consolidated script covering these column and row operations:

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, trim, lit

# Initialize Spark Session
spark = SparkSession.builder.appName("DataFrame_Manipulation").getOrCreate()

# Sample Data
data = [(1, "Alice", 25, "Engineer"), (2, "Bob", 30, "Doctor"), (3, "Charlie", 35, "Teacher")]
df = spark.createDataFrame(data, ["ID", "Name", "Age", "Occupation"])

# 1. Column Manipulation

# Rename a Single Column
df = df.withColumnRenamed("Occupation", "Job")

# Rename Multiple Columns
column_rename_map = {"ID": "UserID", "Name": "FullName"}
for old, new in column_rename_map.items():
    df = df.withColumnRenamed(old, new)

# Add a Suffix to All Columns
df = df.toDF(*[col + "_v1" for col in df.columns])

# Check Data Types
df.printSchema()

# Apply Dynamic Logic to All Columns: Trim All String Columns
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])

# Convert All Integer Columns to Double
df = df.select([col(c).cast("double") if dtype == "int" else col(c) for c, dtype in df.dtypes])

# 2. Row-Based Manipulation

# Collect Rows One by One
for row in df.collect():
    print(row)

# Efficient Row Iteration
for row in df.toLocalIterator():
    print(row)

# Filtering Rows
df_filtered = df.filter((df["Age_v1"] > 28.0) & (df["FullName_v1"] != "Bob"))
df_filtered.show()

# Sorting Rows
df_sorted = df.orderBy("Age_v1", ascending=False)
df_sorted.show()

# Adding a New Row
df_new = df.union(spark.createDataFrame([Row(UserID_v1=4.0, FullName_v1="David", Age_v1=40.0, Job_v1="Scientist")], df.schema))
df_new.show()

# Finding Duplicate Rows
df.groupBy(df.columns).count().filter("count > 1").show()

# Removing Duplicate Rows
df = df.dropDuplicates()
df.show()

# Adding a New Column Dynamically
df = df.withColumn("NewColumn_v1", lit("DefaultValue"))
df.show()

# Stop Spark Session
spark.stop()

Most Important PySpark DataFrame Transformation Operations 🚀

PySpark transformations are lazy operations that create a new DataFrame without modifying the original one. Here are the most essential transformation operations in PySpark:


1. Selecting and Renaming Columns

Select Specific Columns

df_selected = df.select("column1", "column2")

Rename Columns

df_renamed = df.withColumnRenamed("old_col", "new_col")

2. Filtering Data

Filter Based on Condition

df_filtered = df.filter(df["age"] > 25)

Multiple Conditions

df_filtered = df.filter((df["age"] > 25) & (df["gender"] == "Male"))

3. Adding or Modifying Columns

Create a New Column

from pyspark.sql.functions import lit

df_new = df.withColumn("new_column", lit("default_value"))

Modify Existing Column

df_modified = df.withColumn("salary_incremented", df["salary"] * 1.10)

4. Dropping Columns

df_dropped = df.drop("column_to_remove")

5. Handling Missing Data

Fill Missing Values

df_filled = df.fillna({"age": 0, "name": "Unknown"})

Drop Rows with Nulls

df_cleaned = df.dropna()

6. Aggregations & Grouping

Group By and Aggregate

from pyspark.sql.functions import sum, avg, count

df_grouped = df.groupBy("department").agg(sum("salary").alias("total_salary"), avg("age"))

Count Distinct Values

df.select("department").distinct().count()

7. Sorting Data

df_sorted = df.orderBy("age", ascending=False)

8. Joining DataFrames

Inner Join

df_joined = df1.join(df2, df1["id"] == df2["id"], "inner")

Left Join

df_left = df1.join(df2, df1["id"] == df2["id"], "left")

9. Union (Appending DataFrames)

df_combined = df1.union(df2)

10. Exploding Nested Data

from pyspark.sql.functions import explode

df_exploded = df.withColumn("exploded_column", explode(df["nested_column"]))

Conclusion

  • These transformations do not modify the original DataFrame but return a new one.
  • PySpark applies lazy evaluation, meaning transformations are only executed when an action is performed (see the sketch below).
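A quick way to observe this, assuming a df with name and age columns as in the examples above: the chained transformations return immediately, explain() prints the plan Spark has built, and only the final action runs a job.

df_lazy = df.filter(df["age"] > 25).select("name", "age")   # returns immediately, nothing computed

df_lazy.explain()   # prints the query plan Spark has built so far

df_lazy.show()      # the action: only now do the filter and select actually execute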

Most Useful PySpark DataFrame Functions

PySpark provides many built-in functions for column aliasing, distinct values, transformations, and aggregations. Here’s a collection of the most useful ones:


1. Column Aliasing (alias())

  • Used to rename a column temporarily within a query.
from pyspark.sql.functions import col

df_alias = df.select(col("name").alias("full_name"), col("age"))
df_alias.show()

2. Removing Duplicates (distinct())

  • Removes duplicate rows from the DataFrame.
df_distinct = df.distinct()
df_distinct.show()
  • Count distinct values in a column:
df.select("department").distinct().count()

3. Filtering Data (filter() & where())

  • Using .filter():
df_filtered = df.filter(df["age"] > 25)
  • Using .where() (same as filter but SQL-like syntax):
df_filtered = df.where("age > 25")

4. Column Operations

withColumn() – Create or Modify Columns

from pyspark.sql.functions import lit

df_new = df.withColumn("new_column", lit("default_value"))

cast() – Change Data Type

df_casted = df.withColumn("salary", df["salary"].cast("double"))

5. Aggregations

groupBy() with Aggregations

from pyspark.sql.functions import sum, avg, count

df_grouped = df.groupBy("department").agg(
    sum("salary").alias("total_salary"),
    avg("age").alias("average_age")
)
df_grouped.show()

6. Sorting (orderBy())

df_sorted = df.orderBy("age", ascending=False)
df_sorted.show()

7. Joins

df_joined = df1.join(df2, df1["id"] == df2["id"], "inner")

8. Exploding Nested Data (explode())

from pyspark.sql.functions import explode

df_exploded = df.withColumn("exploded_column", explode(df["nested_column"]))
df_exploded.show()

9. Collecting Rows

rows = df.collect()
for row in rows:
    print(row)

10. Row Numbering & Ranking

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

windowSpec = Window.partitionBy("department").orderBy("salary")

df_ranked = df.withColumn("rank", row_number().over(windowSpec))
df_ranked.show()

Conclusion

  • .alias() is useful for renaming columns temporarily.
  • .distinct() removes duplicates.
  • .filter() and .where() allow conditional selection.
  • .groupBy() and .orderBy() are useful for aggregations and sorting.
A consolidated script demonstrating these functions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum, avg, count, explode, row_number, array
from pyspark.sql.window import Window

# Initialize Spark Session
spark = SparkSession.builder.appName("PySpark_Useful_Functions").getOrCreate()

# Sample Data
data = [(1, "Alice", 25, "HR", 50000),
        (2, "Bob", 30, "IT", 60000),
        (3, "Charlie", 35, "IT", 70000),
        (4, "David", 40, "Finance", 80000),
        (5, "Eve", 45, "Finance", 90000)]
columns = ["ID", "Name", "Age", "Department", "Salary"]

df = spark.createDataFrame(data, columns)

# 1. Alias (Renaming Columns Temporarily)
df_alias = df.select(col("Name").alias("Full_Name"), col("Age"))
df_alias.show()

# 2. Distinct (Remove Duplicates)
df_distinct = df.select("Department").distinct()
df_distinct.show()

# 3. Filtering Data
df_filtered = df.filter((df["Age"] > 30) & (df["Department"] == "IT"))
df_filtered.show()

# 4. Adding & Modifying Columns
df_new = df.withColumn("New_Column", lit("DefaultValue"))
df_casted = df.withColumn("Salary", df["Salary"].cast("double"))
df_new.show()

df_casted.printSchema()

# 5. Aggregations (Sum, Average, Count)
df_grouped = df.groupBy("Department").agg(
    sum("Salary").alias("Total_Salary"),
    avg("Age").alias("Average_Age")
)
df_grouped.show()

# 6. Sorting
df_sorted = df.orderBy("Age", ascending=False)
df_sorted.show()

# 7. Joining DataFrames
extra_data = [(1, "US"), (2, "Canada"), (3, "UK"), (4, "Germany"), (5, "India")]
columns_extra = ["ID", "Country"]
df_extra = spark.createDataFrame(extra_data, columns_extra)
df_joined = df.join(df_extra, "ID", "inner")
df_joined.show()

# 8. Exploding Nested Data
df_nested = df.withColumn("Hobbies", array(lit("Reading"), lit("Sports")))  # build a real array column
df_exploded = df_nested.withColumn("Hobby", explode(col("Hobbies")))        # one row per array element
df_exploded.show()

# 9. Collecting Rows
rows = df.collect()
for row in rows:
    print(row)

# 10. Row Numbering & Ranking
windowSpec = Window.partitionBy("Department").orderBy("Salary")
df_ranked = df.withColumn("Rank", row_number().over(windowSpec))
df_ranked.show()

# Stop Spark Session
spark.stop()

Checking Schema, Data Types & Displaying Records with Conditions in PySpark 🚀


1. Checking Schema & Data Types

Method 1: Using .printSchema()

Prints the schema of the DataFrame in a tree format.

df.printSchema()

Example Output:

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: double (nullable = true)

You can also iterate over a schema's fields programmatically:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
for field in schema.fields:
    print(f"Column: {field.name}, DataType: {field.dataType.simpleString()}")

#df.schema.fields: A list of StructField objects, each representing a column in your DataFrame.
#field.dataType: Retrieves the data type (like StringType(), IntegerType()) for that column.
#simpleString(): Converts the data type object to a simple, readable string (e.g., "string", "int").

Method 2: Using .dtypes

Returns a list of tuples containing column names and their data types.

df.dtypes

df.dtypes is an attribute of a PySpark DataFrame that returns a list of tuples, where each tuple contains the column name and its data type as a string.

Alternative Using df.schema.fields
You can also get a similar output to df.dtypes using:
[(field.name, field.dataType.simpleString()) for field in df.schema.fields]

Example Output:

[('ID', 'int'), ('Name', 'string'), ('Age', 'int'), ('Department', 'string'), ('Salary', 'double')]

Key Differences Between df.dtypes and df.schema.fields

Feature | df.dtypes | df.schema.fields
Type of Output | List of tuples ([(col_name, dtype)]) | List of StructField objects
Data Type Format | String representation (e.g., "int", "string") | Full DataType object (e.g., IntegerType(), StringType())
Use Case | Quick lookup of column names & types | More detailed schema operations

Method 3: Using .schema

Returns a StructType object describing the schema.

df.schema

Example Output:

StructType([StructField('ID', IntegerType(), True),
            StructField('Name', StringType(), True),
            StructField('Age', IntegerType(), True),
            StructField('Department', StringType(), True),
            StructField('Salary', DoubleType(), True)])

2. Displaying Records

Method 1: .show()

Displays default 20 rows.

df.show()

Method 2: Show Specific Number of Rows

df.show(5)

Method 3: Show with Truncated Columns

df.show(truncate=False)

3. Filtering & Displaying Records with Multiple Conditions

Using .filter() with Multiple Conditions

df_filtered = df.filter((df["Age"] > 30) & (df["Salary"] > 50000))
df_filtered.show()

Using .where() (Same as .filter())

df_filtered = df.where((df["Age"] > 30) & (df["Department"] == "IT"))
df_filtered.show()

Using SQL Expressions (expr())

from pyspark.sql.functions import expr

df_filtered = df.filter(expr("Age > 30 AND Salary > 50000"))
df_filtered.show()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum, avg, count, explode, row_number
from pyspark.sql.window import Window

# Initialize Spark Session
spark = SparkSession.builder.appName("PySpark_Useful_Functions").getOrCreate()

# Sample Data
data = [(1, "Alice", 25, "HR", 50000),
        (2, "Bob", 30, "IT", 60000),
        (3, "Charlie", 35, "IT", 70000),
        (4, "David", 40, "Finance", 80000),
        (5, "Eve", 45, "Finance", 90000)]
columns = ["ID", "Name", "Age", "Department", "Salary"]

df = spark.createDataFrame(data, columns)

# (Steps 1-10 are identical to the consolidated script shown earlier.)

# 11. Checking Schema & Data Types
df.printSchema()
print(df.dtypes)
print(df.schema)

# 12. Displaying Data with .show() Options
# Default show (20 rows, truncated at 20 characters)
df.show()

# Show a specific number of rows
df.show(5)

# Prevent truncation (full column display)
df.show(truncate=False)

# Adjust truncation length
df.show(truncate=50)

# Show data in vertical format
df.show(n=5, vertical=True)

# Combine options
df.show(n=10, truncate=40, vertical=True)

# Stop Spark Session
spark.stop()

The .show() method in PySpark has multiple options to customize how the DataFrame is displayed. Here’s a breakdown:


1. Basic Usage

df.show()
  • Displays the first 20 rows by default.
  • Truncates long strings to 20 characters.

2. Show a Specific Number of Rows

df.show(5)  # Show first 5 rows
  • Limits the output to 5 rows.

3. Prevent Truncation (Full Column Display)

df.show(truncate=False)
  • By default, columns are truncated to 20 characters.
  • Setting truncate=False displays the full content of each column.

Example:

data = [(1, "A very long text that should not be truncated"),
        (2, "Short text")]
df = spark.createDataFrame(data, ["ID", "Description"])

df.show(truncate=False)

Output:

+---+---------------------------------------------+
|ID |Description                                  |
+---+---------------------------------------------+
|1  |A very long text that should not be truncated|
|2  |Short text                                   |
+---+---------------------------------------------+

4. Adjust Column Truncation Length

df.show(truncate=50)
  • Instead of truncating at 20 characters, this truncates at 50 characters.

5. Display Rows Vertically (vertical=True)

df.show(n=5, vertical=True)
  • Displays each row vertically, which is useful when there are too many columns.

Example Output:

-RECORD 0------------------------
 ID          | 1
 Name        | Alice
 Age         | 25
 Department  | HR
 Salary      | 50000.0
-RECORD 1------------------------
 ID          | 2
 Name        | Bob
 Age         | 30
 Department  | IT
 Salary      | 60000.0

6. Combine Options

df.show(n=10, truncate=40, vertical=True)
  • Shows 10 rows
  • Truncates column data at 40 characters
  • Displays data in vertical format

Summary of .show() Options

Option | Description | Example
df.show() | Default (20 rows, truncates long values at 20 chars) | df.show()
df.show(n) | Displays first n rows | df.show(5)
df.show(truncate=False) | Displays full column values without truncation | df.show(truncate=False)
df.show(truncate=n) | Truncates column values at n characters | df.show(truncate=50)
df.show(vertical=True) | Displays rows vertically instead of in tabular format | df.show(vertical=True)

# Apply Dynamic Logic to All Columns: Trim All String Columns
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])

Doesn't Python already have a built-in function for trimming strings? Then why do we import trim from pyspark.sql.functions?

You’re right to ask about this! In Python, we do have a built-in strip() function for trimming strings. However, when working with PySpark DataFrames, we need to use PySpark functions instead of native Python functions. Here’s why:

1. PySpark Works on Distributed Data

PySpark operates on distributed data across multiple machines using the Spark engine. Standard Python functions like strip() only work on local Python objects, not on distributed DataFrame columns.

2. Need for PySpark SQL Functions

PySpark provides its own set of SQL functions (pyspark.sql.functions), which are optimized for distributed processing. The function trim() from pyspark.sql.functions works on entire DataFrame columns efficiently.

3. Why Not Use strip() Instead?

If you tried something like:

df = df.withColumn("col1", df["col1"].strip())  # ❌ This won't work!

It would throw an error, because strip() is a method of Python strings, not a PySpark column.

Instead, you must use:

from pyspark.sql.functions import trim

df = df.withColumn("col1", trim(df["col1"]))  # ✅ This works in PySpark

4. Applying to All String Columns

The logic I suggested:

from pyspark.sql.functions import trim, col

df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
  • Uses trim(col(c)) to apply trimming only to string-type columns.
  • Keeps other column types unchanged.

Final Thought

If you’re working outside PySpark, use Python’s strip(). But inside PySpark, always use pyspark.sql.functions.trim() for better performance and distributed execution.

Yes! PySpark provides a rich set of string functions in the pyspark.sql.functions module. Here’s a list of commonly used string functions along with their descriptions:


🔹 1. String Manipulation Functions

Function | Description | Example
trim(col) | Removes leading and trailing spaces from a string. | trim(col("name"))
ltrim(col) | Removes leading spaces from a string. | ltrim(col("name"))
rtrim(col) | Removes trailing spaces from a string. | rtrim(col("name"))
lower(col) | Converts a string to lowercase. | lower(col("name")) → "hello"
upper(col) | Converts a string to uppercase. | upper(col("name")) → "HELLO"
initcap(col) | Capitalizes the first letter of each word. | initcap(col("name")) → "Hello World"
reverse(col) | Reverses the string. | reverse(col("name")) → "olleH"
concat(col1, col2, …) | Concatenates multiple string columns. | concat(col("first"), col("last"))
concat_ws(sep, col1, col2, …) | Concatenates strings with a separator. | concat_ws("-", col("first"), col("last")) → "John-Doe"
format_string(fmt, col1, col2, …) | Formats strings using C-style format. | format_string("Name: %s", col("name"))
repeat(col, n) | Repeats a string n times. | repeat(col("name"), 3) → "HelloHelloHello"

🔹 2. Substring & Splitting Functions

Function | Description | Example
substring(col, pos, len) | Extracts a substring starting at pos (1-based index) with length len. | substring(col("name"), 2, 3) → "ell"
split(col, pattern) | Splits a string using a regex pattern and returns an array. | split(col("email"), "@")
regexp_extract(col, pattern, group) | Extracts a regex pattern from a string. | regexp_extract(col("email"), "(\\w+)@(\\w+)", 1)
regexp_replace(col, pattern, replacement) | Replaces occurrences of a regex pattern in a string. | regexp_replace(col("name"), "a", "o")

🔹 3. Padding Functions

Function | Description | Example
lpad(col, len, pad) | Left-pads a string with pad to a total length of len. | lpad(col("name"), 10, "*") → "*****Hello"
rpad(col, len, pad) | Right-pads a string with pad to a total length of len. | rpad(col("name"), 10, "*") → "Hello*****"

🔹 4. Finding & Replacing Functions

Function | Description | Example
locate(substr, col, pos=1) | Finds the position of substring substr (1-based index). Returns 0 if not found. | locate("l", col("name")) → 3
instr(col, substr) | Finds the position of a substring (same as locate). | instr(col("name"), "l")
translate(col, from_str, to_str) | Replaces characters in a string (like tr in Unix). | translate(col("name"), "aeiou", "12345")
overlay(col, replace, pos, len) | Replaces part of a string with another string. | overlay(col("name"), "XYZ", 2, 3)

🔹 5. Length & Case Checking Functions

Function | Description | Example
length(col) | Returns the length of a string. | length(col("name"))
char_length(col) | Alias for length(). | char_length(col("name"))
octet_length(col) | Returns the number of bytes in a string. | octet_length(col("name"))
bit_length(col) | Returns the bit length of a string. | bit_length(col("name"))

🔹 6. Checking String Patterns

Function | Description | Example
like(col, pattern) | Checks if the column matches a SQL LIKE pattern (% = any characters, _ = single character). | col("name").like("J%")
rlike(col, pattern) | Checks if the column matches a regex pattern. | col("name").rlike("^J.*")
contains(col, substr) | Checks if the column contains a substring. | col("name").contains("John")
startswith(col, substr) | Checks if the column starts with a substring. | col("name").startswith("J")
endswith(col, substr) | Checks if the column ends with a substring. | col("name").endswith("n")
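These pattern checks are Column methods, so they drop straight into filter(); a small sketch, assuming a DataFrame with name and email columns like the one in the next example:

from pyspark.sql.functions import col

df_j_names = df.filter(col("name").startswith("J"))        # names beginning with "J"
df_contains = df.filter(col("email").contains("example"))  # emails containing "example"
df_regex = df.filter(col("name").rlike("^[A-Z][a-z]+$"))   # full-value regex match

df_j_names.show()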

Example Usage

Here’s how you can use these functions in a PySpark DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, upper, length, split, concat_ws

# Initialize Spark
spark = SparkSession.builder.appName("StringFunctions").getOrCreate()

# Sample DataFrame
data = [(" John  ", "john.doe@example.com"),
        ("Alice", "alice@example.com"),
        ("Bob   ", "bob@example.com")]

df = spark.createDataFrame(data, ["name", "email"])

# Apply string functions
df = df.select(
    trim(col("name")).alias("trimmed_name"),  # Remove spaces
    upper(col("name")).alias("uppercase_name"),  # Convert to uppercase
    length(col("name")).alias("name_length"),  # Get string length
    split(col("email"), "@").alias("email_parts"),  # Split email
    concat_ws("-", col("name"), col("email")).alias("full_info")  # Concatenate columns
)

df.show(truncate=False)

Conclusion

PySpark has many built-in string functions that help with cleaning, formatting, and processing text data efficiently. Using native PySpark functions ensures performance optimization in a distributed environment.

So for string functions we use the built-in PySpark functions. But what about list and dictionary objects, which are used all the time in PySpark scripts? Which functions do we use for them, and where do they get executed, given Spark's distributed nature?

Great question! In PySpark, handling lists and dictionaries depends on whether they are used inside:

  1. PySpark DataFrame operations (distributed execution)
  2. Python driver code (local execution)

1. List & Dictionary in PySpark (Distributed Execution)

When working with DataFrame columns, PySpark has built-in functions to manipulate lists and dictionaries in a distributed manner. These functions run on Spark workers across the cluster.

Handling Lists (ArrayType in PySpark)

PySpark provides SQL functions for handling list (array) columns inside DataFrames.

  • Create a list (array) column:

from pyspark.sql.functions import array, col

df = df.withColumn("new_col", array(col("col1"), col("col2")))  # Creates a list [col1, col2]

  • Extract elements from a list column:

df = df.withColumn("first_element", col("list_col")[0])  # Get the first element of the array column

  • Check if a value exists in a list column:

from pyspark.sql.functions import array_contains

df = df.withColumn("has_value", array_contains(col("list_col"), "target_value"))

Handling Dictionaries (MapType in PySpark)

PySpark has a MapType for handling dictionary-like data.

  • Create a dictionary (map) column:

from pyspark.sql.functions import create_map, lit

df = df.withColumn("dict_col", create_map(lit("key1"), col("value1"), lit("key2"), col("value2")))

  • Extract a value from a dictionary column:

df = df.withColumn("value1", col("dict_col")["key1"])  # Accessing a dictionary key in PySpark

  • Check if a key exists in a dictionary column:

from pyspark.sql.functions import map_keys, array_contains

df = df.withColumn("has_key", array_contains(map_keys(col("dict_col")), "key1"))
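Putting the pieces together, a self-contained sketch (the column names are made up for illustration) that builds an array column and a map column and reads values back out of them:

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, array_contains, create_map, lit, col, map_keys

spark = SparkSession.builder.appName("ListDictColumns").getOrCreate()

df = spark.createDataFrame([("Alice", "HR", 50000), ("Bob", "IT", 60000)],
                           ["name", "dept", "salary"])

df = (df
      .withColumn("tags", array(col("name"), col("dept")))               # ArrayType column
      .withColumn("props", create_map(lit("dept"), col("dept"),
                                      lit("name"), col("name"))))        # MapType column

df.select(
    col("tags")[0].alias("first_tag"),                  # index into the array
    array_contains(col("tags"), "IT").alias("has_it"),  # membership test on the array
    col("props")["dept"].alias("dept_from_map"),        # key lookup in the map
    map_keys(col("props")).alias("keys")                # list of map keys
).show(truncate=False)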

2. List & Dictionary in Python Driver Code (Local Execution)

If you’re working outside the DataFrame (in normal Python code), you use Python’s built-in list and dictionary functions. These operations run on the driver node, not on Spark workers.

Example:

# Local execution (not distributed)
my_list = [1, 2, 3]
my_dict = {"key1": "value1", "key2": "value2"}

print(my_list[0])         # Output: 1
print(my_dict["key1"])    # Output: "value1"

3. Where Do These Executions Happen?

Operation | Execution Location | Example
Python lists/dicts | Local (driver node) | my_list[0], my_dict["key"]
PySpark array (list) | Distributed (workers) | col("array_col")[0]
PySpark map (dict) | Distributed (workers) | col("map_col")["key"]

4. Converting Between PySpark & Python Objects

If you need to process data using Python’s list/dict functions, you must collect the data to the driver (use with caution for large data!).

list_from_spark = df.select("array_col").rdd.map(lambda row: row[0]).collect()  # Collect list column
dict_from_spark = df.select("map_col").rdd.map(lambda row: row[0]).collect()  # Collect dict column

Final Takeaway

  • Use PySpark functions (array, create_map, etc.) for distributed list/dict processing.
  • Use Python’s built-in list/dict methods only in driver code.
  • Avoid collect() on large DataFrames—it pulls data into the driver, defeating Spark’s distributed nature.

Different ways to collect a PySpark DataFrame as rows and use them in different scenarios:


1. Collecting DataFrame as a List of Rows

df_list = df.collect()
for row in df_list:
    print(row)  # Each row is a Row object

🔹 Use Case: Useful when working with small DataFrames and when processing needs to be done in Python.


2. Collecting DataFrame as a List of Lists

df_list_of_lists = [list(row) for row in df.collect()]
print(df_list_of_lists)

🔹 Use Case: If you need a list of values for each row instead of Row objects.


3. Collecting DataFrame as a Dictionary (Column-wise)

df_dict = {col: df.select(col).rdd.flatMap(lambda x: x).collect() for col in df.columns}
print(df_dict)

🔹 Use Case: When we want to access values based on column names.


4. Counting Number of Nulls in Each Column

from pyspark.sql.functions import col, sum

null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()

🔹 Use Case: Checking missing values for data quality analysis.


5. Collecting Data Types of Columns

df_schema = {col_name: dtype for col_name, dtype in df.dtypes}
print(df_schema)  # Output: {'col1': 'int', 'col2': 'string', ...}

🔹 Use Case: Understanding the schema of a DataFrame dynamically.


6. Collecting Only Column Names in a List

columns_list = df.columns
print(columns_list)

🔹 Use Case: If you want to loop through columns or dynamically select columns.


from pyspark.sql.functions import col, isnan, count, when

# Create a dictionary to hold the null counts
null_counts = {}

for column in df.columns:
    count_val = df.select(
        count(when(isnan(col(column)) | col(column).isNull(), 1)).alias("null_count")
    ).collect()[0]["null_count"]
    null_counts[column] = count_val

# Now you can print out the results
for column, count_val in null_counts.items():
    print(f"{column} has {count_val} null values")
print(null_counts)

An alternative (generally discouraged) variant that creates one Python variable per column using exec()/eval(); it only works when the column names are valid Python identifiers:

from pyspark.sql.functions import isnan, when, count, col

for column in df.columns:
    # Dynamically create a variable named "<column>_nc" holding that column's null count
    exec(f"{column}_nc = df.select(count(when(isnan(col(column)) | col(column).isNull(), 1))).collect()[0][0]")
for column in df.columns:
    print(f"{column} has {eval(f'{column}_nc')} null values")

A fuller data-quality (DQ) report script that profiles tables and CSV-based DataFrames, producing one row per column per source:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, current_timestamp, lit
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DoubleType, TimestampType

# Create your SparkSession if not already created
spark = SparkSession.builder.getOrCreate()

def get_table_create_time(table_name):
    """
    Extracts the table creation time from the formatted description.
    Note: The exact extraction might depend on your Spark/Metastore.
    """
    desc_df = spark.sql(f"DESCRIBE FORMATTED {table_name}")
    # Filter rows that might contain 'CreateTime' info (case may vary).
    create_time_row = desc_df.filter(desc_df.col_name.contains("CreateTime")).head()
    if create_time_row:
        # Assume the formatted output has the create time in the 'data_type' field.
        # You might need to adjust the parsing depending on your metastore's format.
        return create_time_row.data_type.strip()
    return None

def compute_dq_for_df(df, source_name, primary_key, created_time):
    """
    For each column in df, compute:
      - Data type (from schema)
      - Null count
      - Non-null count
      - Non-null ratio (non-null count / total rows)

    Returns a DataFrame with the following columns:
      Source, CreatedTime, ColumnName, DataType, NullCount, NonNullCount, TotalCount, NonNullRatio
    """
    total_count = df.count()
    results = []

    for field in df.schema.fields:
        col_name = field.name
        data_type = field.dataType.simpleString()

        # Compute null count: covers both None and NaN (for numeric columns)
        null_count = df.filter( col(col_name).isNull() | isnan(col(col_name)) ).count()
        non_null_count = total_count - null_count
        ratio = non_null_count / total_count if total_count > 0 else None

        results.append((source_name, created_time, col_name, data_type, null_count, non_null_count, total_count, ratio))

    schema = StructType([
        StructField("Source", StringType(), True),
        StructField("CreatedTime", StringType(), True),  # or TimestampType() if parsed
        StructField("ColumnName", StringType(), True),
        StructField("DataType", StringType(), True),
        StructField("NullCount", IntegerType(), True),
        StructField("NonNullCount", IntegerType(), True),
        StructField("TotalCount", IntegerType(), True),
        StructField("NonNullRatio", DoubleType(), True)
    ])

    return spark.createDataFrame(results, schema=schema)

# List to collect DQ report DataFrames
dq_report_dfs = []

### Case 1: Table sources ###
table_names = ["table1", "table2"]  # list your table names here

for table in table_names:
    # Get creation time from metadata (could be None if not found)
    create_time = get_table_create_time(table)
    # Load the table as DataFrame
    df_table = spark.table(table)
    dq_df = compute_dq_for_df(df_table, source_name=table, primary_key="your_primary_key", created_time=create_time)
    dq_report_dfs.append(dq_df)

### Case 2: CSV-based DataFrames ###
csv_files = {
    "csv_data1": "/path/to/data1.csv",
    "csv_data2": "/path/to/data2.csv"
}

for source_name, csv_path in csv_files.items():
    # Read CSV into DataFrame (add any specific options as needed)
    df_csv = spark.read.option("header", True).csv(csv_path)
    # For CSV files, use current timestamp (or another logic) as the creation time
    created_time = str(spark.sql("SELECT current_timestamp()").head()[0])
    dq_df = compute_dq_for_df(df_csv, source_name=source_name, primary_key="your_primary_key", created_time=created_time)
    dq_report_dfs.append(dq_df)

# Combine all DQ report DataFrames into one final report
if dq_report_dfs:
    final_dq_report = dq_report_dfs[0]
    for df_temp in dq_report_dfs[1:]:
        final_dq_report = final_dq_report.union(df_temp)

# Show the DQ report
final_dq_report.show(truncate=False)

# Optionally, write the final report to a table for future use
final_dq_report.write.mode("overwrite").saveAsTable("dq_report_table")
A leaner variant that summarizes each source as a single row containing nested dictionaries of per-column metrics:

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, isnan
from datetime import datetime

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

def compute_dq_summary(df, source_name, created_time):
    """
    Computes a DQ summary for the entire DataFrame and returns a single-row dictionary.

    The returned dictionary has:
      - source_name: identifier of the data source (table or CSV)
      - created_time: creation time as string
      - data_types: dictionary {column: data type}
      - null_counts: dictionary {column: null count}
      - non_null_counts: dictionary {column: non-null count}
      - total_count: total number of rows in the DataFrame
      - null_ratios: dictionary {column: non-null ratio}
    """
    total_count = df.count()

    # Create a dictionary for data types from the schema
    data_types = {field.name: field.dataType.simpleString() for field in df.schema.fields}

    # Initialize dictionaries to hold column-wise metrics
    null_counts = {}
    non_null_counts = {}
    null_ratios = {}

    for col_name in df.columns:
        # Count nulls (including NaN for numeric columns)
        n_null = df.filter(col(col_name).isNull() | isnan(col(col_name))).count()
        n_non_null = total_count - n_null
        ratio = n_non_null / total_count if total_count > 0 else None

        null_counts[col_name] = n_null
        non_null_counts[col_name] = n_non_null
        null_ratios[col_name] = ratio

    # Build the summary dictionary
    summary = {
        "source_name": source_name,
        "created_time": created_time,
        "data_types": data_types,
        "null_counts": null_counts,
        "non_null_counts": non_null_counts,
        "total_count": total_count,
        "null_ratios": null_ratios
    }

    return summary

# Example usage for a DataFrame from a table:
table_name = "your_table_name"
df_table = spark.table(table_name)

# For a table, assume you have a method to retrieve the creation time from metadata
# Here, we just use the current timestamp for illustration
created_time_table = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

summary_table = compute_dq_summary(df_table, source_name=table_name, created_time=created_time_table)

# Example usage for a CSV-based DataFrame:
csv_source = "csv_data1"
df_csv = spark.read.option("header", True).csv("/path/to/data1.csv")
created_time_csv = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
summary_csv = compute_dq_summary(df_csv, source_name=csv_source, created_time=created_time_csv)

# Combine the summaries into a list
summaries = [summary_table, summary_csv]

# Create a DataFrame from the list of dictionaries
dq_report_df = spark.createDataFrame([Row(**s) for s in summaries])

# Show the report
dq_report_df.show(truncate=False)

# Optionally, save this report as a table
dq_report_df.write.mode("overwrite").saveAsTable("dq_report_summary")
