Creating DataFrames in PySpark
Creating DataFrames in PySpark is essential for processing large-scale data efficiently. PySpark allows DataFrames to be created from various sources, ranging from manual data entry to structured storage systems. Below are different ways to create PySpark DataFrames, along with interesting examples.
1. Creating DataFrames from List of Tuples (Manual Entry)
This is one of the simplest ways to create a PySpark DataFrame manually.
from pyspark.sql import SparkSession
# Initialize Spark Session
spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()
# List of tuples
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]
# Define column names
columns = ["ID", "Name", "Age"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()
Use Case: Best for small, manually defined datasets.
2. Creating DataFrames from CSV Files
PySpark can load structured data from CSV files efficiently.
df = spark.read.csv("people.csv", header=True, inferSchema=True)
df.show()
Use Case: Useful when working with structured tabular data stored in CSV format.
3. Creating DataFrames from JSON Files
JSON files are widely used for semi-structured data.
df = spark.read.json("data.json")
df.show()
Use Case: Best for APIs, logs, or nested data.
4. Creating DataFrames from Parquet Files
Parquet is a columnar storage format optimized for big data processing.
df = spark.read.parquet("data.parquet")
df.show()
Use Case: Recommended for performance-critical pipelines; Parquet's columnar layout and compression let Spark read only the columns a query needs.
5. Creating DataFrames from Databases (JDBC Connection)
Connecting to external databases is common for real-world ETL tasks.
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/testdb") \
    .option("dbtable", "users") \
    .option("user", "root") \
    .option("password", "password") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()
# Note: the MySQL JDBC driver jar must be on Spark's classpath (e.g. via spark.jars)
df.show()
Use Case: Best for integrating with external SQL databases.
6. Creating DataFrames from RDDs
Sometimes, raw RDDs need to be converted into DataFrames.
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([
    Row(ID=1, Name="Alice", Age=25),
    Row(ID=2, Name="Bob", Age=30),
    Row(ID=3, Name="Charlie", Age=35)
])
df = spark.createDataFrame(rdd)
df.show()
Use Case: Useful for transitioning from RDD-based transformations to DataFrames.
7. Creating DataFrames from Pandas DataFrames
Converting a Pandas DataFrame to PySpark is helpful when scaling operations.
import pandas as pd
# Create Pandas DataFrame
pdf = pd.DataFrame({"ID": [1, 2, 3], "Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]})
# Convert to PySpark DataFrame
df = spark.createDataFrame(pdf)
df.show()
Use Case: Best for transitioning from local Pandas to distributed PySpark.
8. Creating DataFrames from API Response (Using JSON Parsing)
For web scraping or API data processing, JSON responses can be converted into DataFrames.
import requests
response = requests.get("https://api.example.com/users")
data = response.json()  # parse the JSON response body into Python objects
df = spark.createDataFrame(data)
df.show()
Use Case: Useful for processing real-time API data.
9. Creating DataFrames from XML Data
Spark supports XML parsing through third-party libraries.
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("ID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", StringType(), True)
])
df = spark.read.format("com.databricks.spark.xml") \
    .option("rowTag", "person") \
    .schema(schema) \
    .load("people.xml")
df.show()
Use Case: Useful for handling structured XML-based datasets.
10. Creating DataFrames Using Range for Auto-Generated Data
If you need a sequence of numbers, spark.range() can quickly create a DataFrame.
df = spark.range(1, 6).toDF("ID")
df.show()
Use Case: Useful for generating test sequences or dummy IDs.
Summary of Methods
Method | Use Case |
---|---|
List of Tuples | Simple and widely used |
CSV Files | Best for tabular structured data |
JSON Files | Ideal for nested and semi-structured data |
Parquet Files | Best for big data performance |
JDBC Databases | Useful for ETL and database integration |
RDD Conversion | Transitioning from RDDs to DataFrames |
Pandas Conversion | Best for scaling Pandas workloads |
API Response (JSON) | Real-time API data processing |
XML Parsing | Handling structured XML data |
Auto-Generated Range | Generating test data quickly |
Creating Dummy DataFrames in PySpark
Creating dummy DataFrames in PySpark is useful for testing, prototyping, and learning. PySpark provides multiple ways to create DataFrames manually, each suited to different scenarios. Below are various methods to create dummy DataFrames with examples.
1. Using List of Tuples (Most Common Method)
This is one of the most common ways to create a PySpark DataFrame.
from pyspark.sql import SparkSession
# Initialize Spark Session
spark = SparkSession.builder.appName("DummyDataFrame").getOrCreate()
# List of tuples
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]
# Define column names
columns = ["ID", "Name", "Age"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()
Use Case: Best for small, manually defined datasets.
2. Using List of Lists
This method is similar to the list of tuples but uses lists instead.
data = [[1, "Alice", 25], [2, "Bob", 30], [3, "Charlie", 35]]
df = spark.createDataFrame(data, columns)
df.show()
Use Case: When working with mutable lists instead of immutable tuples.
3. Using Dictionary with Row Objects
Using Row objects allows for named attributes, making it easy to access values.
from pyspark.sql import Row
data = [Row(ID=1, Name="Alice", Age=25),
        Row(ID=2, Name="Bob", Age=30),
        Row(ID=3, Name="Charlie", Age=35)]
df = spark.createDataFrame(data)
df.show()
Use Case: When you need named fields and structured data representation.
4. Using Dictionary with Explicit Schema
When you want stricter control over column types, defining a schema is a good approach.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Define schema
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])
# Create DataFrame from a list of dictionaries
data = [{"ID": 1, "Name": "Alice", "Age": 25},
        {"ID": 2, "Name": "Bob", "Age": 30},
        {"ID": 3, "Name": "Charlie", "Age": 35}]
df = spark.createDataFrame(data, schema=schema)
df.show()
Use Case: Ensures correct data types and improves performance.
5. Using RDD with Row Objects
If you are working with distributed data, creating an RDD first can be beneficial.
rdd = spark.sparkContext.parallelize([
    Row(ID=1, Name="Alice", Age=25),
    Row(ID=2, Name="Bob", Age=30),
    Row(ID=3, Name="Charlie", Age=35)
])
df = spark.createDataFrame(rdd)
df.show()
Use Case: Best when working with large distributed datasets.
6. Using Pandas DataFrame Conversion
If you already have a Pandas DataFrame, you can convert it to a PySpark DataFrame.
import pandas as pd
# Create Pandas DataFrame
pdf = pd.DataFrame({"ID": [1, 2, 3], "Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]})
# Convert to PySpark DataFrame
df = spark.createDataFrame(pdf)
df.show()
Use Case: When transitioning from Pandas to PySpark.
7. Using range() for Auto-Generated Data
If you need a sequence of numbers, spark.range() can quickly create a DataFrame.
df = spark.range(1, 6).toDF("ID")
df.show()
Use Case: When you need an auto-incrementing column.
Summary of Methods
Method | Use Case |
---|---|
List of Tuples | Simple and widely used |
List of Lists | Similar to tuples but mutable |
Dictionary with Row | Allows named attributes |
Dictionary with Schema | Ensures correct data types |
RDD with Row | Works well for distributed data |
Pandas Conversion | Best for small datasets |
range() for Auto-Generated Data | When you need incremental values |
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import random
import string
from datetime import datetime  # needed for the Date_Val column below

spark = SparkSession.builder.appName("MySparkLearning1").getOrCreate()

domain_names = ['gmail.com', 'xyz.com', 'hotmail.com', 'wikipedia.org', 'hintstoday.com']

def generate_random_username(length=8):
    # Random mix of letters and digits
    letters_and_digits = string.ascii_letters + string.digits
    return ''.join(random.choice(letters_and_digits) for _ in range(length))

def get_username():
    return ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))

def get_first_name():
    length = random.randint(5, 10)
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(length))

# Build a few sample email addresses
email_addresses = []
for _ in range(5):
    user_name = generate_random_username()
    chosen_domain = random.choice(domain_names)
    email_addresses.append(f"{user_name}@{chosen_domain}")
print(email_addresses)

# 100 rows of synthetic data: serial number, date, random id, email, full name
data = [(i + 1,
         datetime.now().strftime('%y-%m-%d'),
         random.randint(1000, 100000),
         get_username() + "@" + random.choice(domain_names),
         get_first_name() + ' ' + get_first_name())
        for i in range(100)]
columns = ['Sl_No', 'Date_Val', 'Ran_id', 'email', 'Name']
df = spark.createDataFrame(data, columns)
df.show()
# A tuple of single-element tuples also works as input
data = ((1,), (2,), (3,))
columns = ['No']
df = spark.createDataFrame(data, columns)
df.show()

# Minimal one-row, one-column DataFrame
rows = [(1,)]
cols = ['r1']
df2 = spark.createDataFrame(rows, cols)
df2.show()
# A few experiments with column names
print(' '.join(df.columns))                                          # column names as one string
df.toDF(*[c.replace('_', 'X') + '_v1' for c in df.columns]).show()  # rename every column

# Keep only even values of 'No', then join back to the original
df1 = df.filter(df['No'] % 2 == 0)
df2 = df.join(df1, 'No')
df2.show()
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([
    Row(ID=1, Name="Alice", Age=25),
    Row(ID=2, Name="Bob", Age=30),
    Row(ID=3, Name="Charlie", Age=35)
])
df = spark.createDataFrame(rdd)
df.show()
import random
import string
# Example: a random 10-character username
username = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))
def get_address():
    username = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))
    Flat_Name = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
    street_name = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
    City = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(7))
    Area_Name = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
    pincode = random.randint(800000, 900000)
    Country = 'India'
    return f"{username} {Flat_Name} , {street_name} , {City} , {pincode} , {Country}"
print(get_address())
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize(
    [Row(Sl_No=i + 1,
         nickname=''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10)),
         Address=get_address())
     for i in range(100)]
)
df_data= spark.createDataFrame(rdd)
df_data.show()
RDD-Based Programming in PySpark
RDD (Resilient Distributed Dataset) is the fundamental data structure in Apache Spark. It provides an abstraction for distributed data and allows parallel processing. Below is an overview of RDD-based programming in PySpark.
1. What is an RDD?
An RDD (Resilient Distributed Dataset) is an immutable, distributed collection of objects that can be processed in parallel across a Spark cluster. It supports fault tolerance, lazy evaluation, and partitioning for optimized processing.
Key Features of RDDs:
- Immutable: Once created, RDDs cannot be changed.
- Distributed: RDDs are stored across multiple nodes in a cluster.
- Fault-Tolerant: Spark can automatically recover lost data.
- Lazy Evaluation: Transformations are not executed immediately but only when an action is triggered.
- Partitioned: Data is divided across nodes for parallel execution.
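Lazy evaluation has a close analog in plain Python generators: a generator expression, like an RDD transformation, does no work until something consumes it (the analog of an action). This is a local stdlib sketch for intuition, not Spark code.

```python
# Generator expressions are lazy, like RDD transformations:
# nothing runs until the "action" (list) consumes them.
log = []

def traced_square(x):
    log.append(x)          # records when work actually happens
    return x * x

squares = (traced_square(x) for x in [1, 2, 3])  # "transformation": no work yet
assert log == []           # nothing has executed so far

result = list(squares)     # "action": forces evaluation
assert result == [1, 4, 9]
assert log == [1, 2, 3]    # the work happened only now
```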
2. Creating RDDs in PySpark
There are two primary ways to create RDDs:
A. Creating RDDs from a List
from pyspark.sql import SparkSession
# Initialize Spark Session
spark = SparkSession.builder.appName("RDDExample").getOrCreate()
sc = spark.sparkContext # SparkContext
# Creating RDD from List
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
# Show RDD Elements
print(rdd.collect())
Use Case: Useful for quick testing with small datasets.
B. Creating RDDs from an External File
rdd = sc.textFile("sample.txt") # Reads a text file
print(rdd.collect())
Use Case: When loading data from external storage such as HDFS, S3, or local files.
3. RDD Transformations
Transformations return new RDDs without modifying the existing ones. They follow lazy evaluation.
Common Transformations:
A. map()
Applies a function to each element.
rdd_squared = rdd.map(lambda x: x * x)
print(rdd_squared.collect()) # Example output: [1, 4, 9, 16, 25]
B. filter()
Filters elements based on a condition.
rdd_even = rdd.filter(lambda x: x % 2 == 0)
print(rdd_even.collect()) # Example output: [2, 4]
C. flatMap()
Flattens nested structures.
rdd_words = sc.parallelize(["hello world", "hi there"])
rdd_split = rdd_words.flatMap(lambda line: line.split(" "))
print(rdd_split.collect()) # Example output: ['hello', 'world', 'hi', 'there']
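For intuition, flatMap() is "map, then flatten one level"; the same result comes from itertools.chain in plain Python (a local analogy, not Spark code):

```python
from itertools import chain

lines = ["hello world", "hi there"]

# Map each line to a list of words, then flatten one level
words = list(chain.from_iterable(line.split(" ") for line in lines))
assert words == ["hello", "world", "hi", "there"]
```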
D. distinct()
Removes duplicate elements.
rdd_duplicate = sc.parallelize([1, 2, 2, 3, 4, 4, 5])
rdd_distinct = rdd_duplicate.distinct()
print(rdd_distinct.collect()) # Example output: [1, 2, 3, 4, 5]
E. union()
Combines two RDDs.
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd_union = rdd1.union(rdd2)
print(rdd_union.collect()) # Example output: [1, 2, 3, 4, 5, 6]
4. RDD Actions
Actions trigger execution and return results.
Common Actions:
A. collect()
Retrieves all elements from the RDD.
print(rdd.collect())
B. count()
Counts the number of elements.
print(rdd.count())
C. reduce()
Aggregates elements using a function.
sum_rdd = rdd.reduce(lambda x, y: x + y)
print(sum_rdd) # Example output: 15
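RDD reduce() mirrors Python's own functools.reduce, shown here locally for comparison (Spark just applies the same pairwise combination across partitions):

```python
from functools import reduce

data = [1, 2, 3, 4, 5]
# Pairwise aggregation, same combining function as rdd.reduce()
total = reduce(lambda x, y: x + y, data)
assert total == 15
```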
D. take(n)
Returns the first n elements.
print(rdd.take(3)) # Example output: [1, 2, 3]
E. first()
Returns the first element.
print(rdd.first()) # Example output: 1
5. RDD Persistence (Caching)
RDDs can be cached to optimize performance when reused.
Using cache()
rdd_cached = rdd.cache()
rdd_cached.count() # Forces caching
Using persist()
from pyspark import StorageLevel
rdd_persist = rdd.persist(StorageLevel.MEMORY_AND_DISK)
Use Case: Useful when an RDD is used multiple times to avoid recomputation.
6. Key-Value RDD Operations (Pair RDDs)
Pair RDDs store data in (key, value) format, enabling grouping, sorting, and aggregations.
A. Creating a Pair RDD
pair_rdd = sc.parallelize([("Alice", 25), ("Bob", 30), ("Alice", 28)])
B. reduceByKey()
Aggregates values by key.
rdd_age_sum = pair_rdd.reduceByKey(lambda x, y: x + y)
print(rdd_age_sum.collect()) # Example output: [('Alice', 53), ('Bob', 30)]
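Conceptually, reduceByKey() merges values per key. A plain-Python sketch of the same aggregation (a local analogy, not Spark code; Spark additionally combines within each partition before shuffling):

```python
pairs = [("Alice", 25), ("Bob", 30), ("Alice", 28)]

# Merge values key by key, the way reduceByKey combines them
sums = {}
for key, value in pairs:
    sums[key] = sums.get(key, 0) + value

assert sums == {"Alice": 53, "Bob": 30}
```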
C. groupByKey()
Groups values by key.
rdd_group = pair_rdd.groupByKey()
print([(k, list(v)) for k, v in rdd_group.collect()])
# Example output: [('Alice', [25, 28]), ('Bob', [30])]
D. sortByKey()
Sorts data based on keys.
rdd_sorted = pair_rdd.sortByKey()
print(rdd_sorted.collect()) # Example output: [('Alice', 25), ('Alice', 28), ('Bob', 30)]
7. Converting RDDs to DataFrames
PySpark supports easy conversion from RDDs to DataFrames.
from pyspark.sql import Row
# Convert RDD to DataFrame
df = rdd.map(lambda x: Row(number=x)).toDF()
df.show()
Use Case: When transitioning from RDDs to the more optimized DataFrame API.
8. When to Use RDDs Over DataFrames
Feature | RDD | DataFrame |
---|---|---|
Ease of Use | More complex | Simpler |
Performance | Slower | Faster (Optimized with Catalyst) |
Schema | Not enforced | Enforced |
Operations | Low-level control | High-level SQL-like operations |
Use RDDs when:
- You need fine-grained control over data transformations.
- The data is unstructured, and schema enforcement is not needed.
- Custom low-level optimizations are required.
Use DataFrames when:
- Performance is critical (DataFrames are optimized).
- SQL-like operations are needed.
Conclusion
RDD-based programming in PySpark provides low-level control and distributed data processing capabilities. While DataFrames are preferred for most workloads due to optimizations, RDDs are still useful for specific scenarios where fine-tuned operations are needed.
from pyspark.sql import SparkSession
from pyspark.sql import Row
# Initialize Spark Session (SparkSession internally manages SparkContext)
spark = SparkSession.builder.appName("RDD_Based_Processing").getOrCreate()
# Access SparkContext from SparkSession
sc = spark.sparkContext # Still available if needed
# Explanation:
# - RDDs require SparkContext (sc.parallelize()) because they are a low-level API.
# - DataFrames use SparkSession (spark.createDataFrame()), which manages SparkContext internally.
# Step 1: Creating an RDD from a list of tuples
data = [(1, "Alice", 25, "Engineer"), (2, "Bob", 30, "Doctor"), (3, "Charlie", 35, "Teacher")]
rdd = sc.parallelize(data) # Using SparkContext explicitly
# Step 2: Transforming RDD (Mapping to Rows)
row_rdd = rdd.map(lambda x: Row(ID=x[0], Name=x[1], Age=x[2], Occupation=x[3]))
# Step 3: Converting RDD to DataFrame
df = spark.createDataFrame(row_rdd) # Using SparkSession (No need for sc)
df.show()
# Step 4: Filtering Data
filtered_rdd = rdd.filter(lambda x: x[2] > 28) # Keep people older than 28
# Step 5: Applying Transformation (Mapping)
mapped_rdd = filtered_rdd.map(lambda x: (x[0], x[1].upper(), x[2] + 5, x[3]))
# Step 6: Reducing Data (Counting occupations)
occu_rdd = rdd.map(lambda x: (x[3], 1)).reduceByKey(lambda x, y: x + y)
# Step 7: Sorting Data
sorted_rdd = mapped_rdd.sortBy(lambda x: x[2], ascending=False)
# Step 8: Collecting and Printing Data
print("Filtered Data:", filtered_rdd.collect())
print("Mapped Data:", mapped_rdd.collect())
print("Occupation Count:", occu_rdd.collect())
print("Sorted Data:", sorted_rdd.collect())
# Stopping Spark Session
spark.stop()
RDDs in PySpark do not inherently have a schema or column names. Unlike DataFrames, RDDs are just distributed collections of objects (tuples, lists, dictionaries, etc.), and their structure is determined by how they are used.
Example of an RDD Without Schema:
# Creating an RDD from a list
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
# Printing the RDD
print(rdd.collect()) # Output: [1, 2, 3, 4, 5]
Here, the RDD is just a collection of numbers without any predefined schema.
Adding Schema to an RDD (Converting to DataFrame)
If you want to assign column names or a schema, you need to convert the RDD into a DataFrame using Row or StructType.
Example 1: Using Row for Named Columns
from pyspark.sql import Row
# Creating an RDD with structured data
data = [(1, "Alice", 25), (2, "Bob", 30)]
rdd = sc.parallelize(data)
# Converting RDD to DataFrame with column names
df = spark.createDataFrame(rdd.map(lambda x: Row(ID=x[0], Name=x[1], Age=x[2])))
df.show()
Output:
+---+-----+---+
| ID| Name|Age|
+---+-----+---+
| 1|Alice| 25|
| 2| Bob| 30|
+---+-----+---+
Here, we used Row to provide column names.
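PySpark's Row behaves much like Python's collections.namedtuple: fields are accessible by name or by position. A stdlib-only illustration of that access pattern:

```python
from collections import namedtuple

# Stand-in for Row(ID=..., Name=..., Age=...)
Person = namedtuple("Person", ["ID", "Name", "Age"])
p = Person(ID=1, Name="Alice", Age=25)

assert p.Name == "Alice"   # access by name, like row.Name
assert p[0] == 1           # access by position, like row[0]
```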
Example 2: Using StructType for an Explicit Schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])
df = spark.createDataFrame(rdd, schema)
df.show()
This method is preferred when defining explicit schemas.
Summary
- RDDs do NOT have column names or schemas by default.
- If you need named columns, convert the RDD to a DataFrame using Row or StructType.
- RDDs are useful for low-level transformations, while DataFrames provide structured data handling.
Yes, the code below fails because PySpark cannot directly convert an RDD of primitive types (like integers) into a DataFrame. PySpark expects a structured format (such as a list of tuples or Row objects) when creating a DataFrame.
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
df = spark.createDataFrame(rdd)  # TypeError: Can not infer schema for type: <class 'int'>
df.show()
How to Fix It?
You need to convert the RDD into a structured format by specifying column names or using Row objects.
Solution 1: Convert to a List of Tuples (Recommended)
from pyspark.sql import SparkSession
# Initialize SparkSession
spark = SparkSession.builder.appName("RDD_to_DF").getOrCreate()
sc = spark.sparkContext # Get SparkContext
# Create RDD from a list
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize([(x,) for x in data]) # Convert to a list of tuples
# Create DataFrame with a column name
df = spark.createDataFrame(rdd, ["Numbers"])
df.show()
Output:
+-------+
|Numbers|
+-------+
| 1|
| 2|
| 3|
| 4|
| 5|
+-------+
Solution 2: Use Row Objects
from pyspark.sql import Row
rdd = sc.parallelize([Row(Numbers=x) for x in data])
df = spark.createDataFrame(rdd)
df.show()
Both methods ensure the data is structured properly before creating a DataFrame.
Here’s a detailed guide on PySpark DataFrame column and row manipulation with useful implementations:
1. Column Manipulation in PySpark DataFrames
1.1 Renaming Columns
Rename a Single Column
df = df.withColumnRenamed("old_column", "new_column")
Rename Multiple Columns
new_column_names = {"old1": "new1", "old2": "new2"}
for old, new in new_column_names.items():
    df = df.withColumnRenamed(old, new)
Add a Suffix to All Column Names
df = df.toDF(*[c + "_v1" for c in df.columns])
1.2 Checking Data Types
Check Data Type of a Specific Column
print(df.schema["column_name"].dataType)
Get Data Types of All Columns
df.dtypes # Returns a list of (column_name, data_type)
Check Schema of DataFrame
df.printSchema()
1.3 Apply Dynamic Logic to All Columns
Example: Trim All String Columns
from pyspark.sql.functions import col, trim
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
Example: Convert All Integer Columns to Double
from pyspark.sql.functions import col
df = df.select([col(c).cast("double") if dtype == "int" else col(c) for c, dtype in df.dtypes])
Example: Replace Nulls in All String Columns with “Unknown”
df = df.fillna("Unknown")
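The dynamic-logic pattern above relies on df.dtypes being a list of (column, type) pairs. A stdlib-only sketch of the same per-column decision, where the sample dtypes list is made up for illustration:

```python
# Hypothetical df.dtypes output for a small DataFrame
dtypes = [("ID", "int"), ("Name", "string"), ("Age", "int")]

# Decide per column whether to cast, exactly as the list
# comprehension over df.dtypes does in the Spark example
plan = [f"CAST({c} AS double)" if t == "int" else c for c, t in dtypes]
assert plan == ["CAST(ID AS double)", "Name", "CAST(Age AS double)"]
```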
2. Row-Based DataFrame Manipulation
2.1 Collecting Rows One by One
Convert DataFrame to a List of Rows
rows = df.collect()
for row in rows:
    print(row)
Using toLocalIterator() for Large DataFrames (Efficient)
for row in df.toLocalIterator():
    print(row)
2.2 Filtering Rows
Filter Rows Based on a Condition
df_filtered = df.filter(df["Age"] > 30)
Filter Multiple Conditions
df_filtered = df.filter((df["Age"] > 30) & (df["Gender"] == "Male"))
2.3 Sorting Rows
df_sorted = df.orderBy("Age", ascending=False)
2.4 Adding a New Row (Using Union)
from pyspark.sql import Row
new_row = Row(ID=100, Name="John Doe", Age=40)
df_new = df.union(spark.createDataFrame([new_row], df.schema))
3. Useful Implementations
3.1 Finding Duplicate Rows
df.groupBy(df.columns).count().filter("count > 1").show()
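groupBy over all columns followed by count and filter is the distributed version of counting whole rows. Locally, the same check can be done with collections.Counter (a stdlib analogy, not Spark code):

```python
from collections import Counter

rows = [(1, "Alice"), (2, "Bob"), (1, "Alice")]
counts = Counter(rows)  # tally identical rows

duplicates = [row for row, n in counts.items() if n > 1]
assert duplicates == [(1, "Alice")]
```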
3.2 Removing Duplicate Rows
df = df.dropDuplicates()
3.3 Adding a New Column Dynamically
from pyspark.sql.functions import lit
df = df.withColumn("NewColumn", lit("DefaultValue"))
Conclusion
- PySpark allows flexible column manipulations like renaming, checking types, and applying transformations.
- Row operations like filtering, sorting, and iterating can be done efficiently.
- Collecting data should be handled carefully to avoid memory overload.
- Dynamic transformations make it easy to process large datasets.
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, trim, lit
# Initialize Spark Session
spark = SparkSession.builder.appName("DataFrame_Manipulation").getOrCreate()
# Sample Data
data = [(1, "Alice", 25, "Engineer"), (2, "Bob", 30, "Doctor"), (3, "Charlie", 35, "Teacher")]
df = spark.createDataFrame(data, ["ID", "Name", "Age", "Occupation"])
# 1. Column Manipulation
# Rename a Single Column
df = df.withColumnRenamed("Occupation", "Job")
# Rename Multiple Columns
column_rename_map = {"ID": "UserID", "Name": "FullName"}
for old, new in column_rename_map.items():
    df = df.withColumnRenamed(old, new)
# Add a Suffix to All Columns
# Use a loop variable that does not shadow the imported col() function
df = df.toDF(*[c + "_v1" for c in df.columns])
# Check Data Types
df.printSchema()
# Apply Dynamic Logic to All Columns: Trim All String Columns
df = df.select([trim(col(c)).alias(c) if dtype == "string" else col(c) for c, dtype in df.dtypes])
# Convert All Integer Columns to Double
df = df.select([col(c).cast("double") if dtype == "int" else col(c) for c, dtype in df.dtypes])
# 2. Row-Based Manipulation
# Collect Rows One by One
for row in df.collect():
    print(row)
# Efficient Row Iteration
for row in df.toLocalIterator():
    print(row)
# Filtering Rows
df_filtered = df.filter((df["Age_v1"] > 28.0) & (df["FullName_v1"] != "Bob"))
df_filtered.show()
# Sorting Rows
df_sorted = df.orderBy("Age_v1", ascending=False)
df_sorted.show()
# Adding a New Row
df_new = df.union(spark.createDataFrame([Row(UserID_v1=4.0, FullName_v1="David", Age_v1=40.0, Job_v1="Scientist")], df.schema))
df_new.show()
# Finding Duplicate Rows
df.groupBy(df.columns).count().filter("count > 1").show()
# Removing Duplicate Rows
df = df.dropDuplicates()
df.show()
# Adding a New Column Dynamically
df = df.withColumn("NewColumn_v1", lit("DefaultValue"))
df.show()
# Stop Spark Session
spark.stop()