Creating DataFrames in PySpark

Creating DataFrames in PySpark is essential for processing large-scale data efficiently. PySpark allows DataFrames to be created from various sources, ranging from manual data entry to structured storage systems. Below are the most common ways to create PySpark DataFrames, each with a short example.


1. Creating DataFrames from List of Tuples (Manual Entry)

This is one of the simplest ways to create a PySpark DataFrame manually.

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

# List of tuples
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]

# Define column names
columns = ["ID", "Name", "Age"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()

Use Case: Best for small, manually defined datasets.
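
If you want control over the column types without building a full StructType, createDataFrame also accepts a DDL-style schema string in place of the bare column names. A minimal sketch using the same data:

# Same data, but with a DDL-style schema string instead of plain column names
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]
df = spark.createDataFrame(data, "ID INT, Name STRING, Age INT")
df.printSchema()
df.show()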


2. Creating DataFrames from CSV Files

PySpark can load structured data from CSV files efficiently.

df = spark.read.csv("people.csv", header=True, inferSchema=True)
df.show()

Use Case: Useful when working with structured tabular data stored in CSV format.
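
Note that inferSchema triggers an extra pass over the file. For large or well-known files you can supply the schema and reader options yourself; a sketch, assuming the same hypothetical people.csv with ID, Name, and Age columns:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema avoids the schema-inference pass
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])

df = spark.read \
    .option("header", True) \
    .option("sep", ",") \
    .schema(schema) \
    .csv("people.csv")
df.show()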


3. Creating DataFrames from JSON Files

JSON files are widely used for semi-structured data.

df = spark.read.json("data.json")
df.show()

Use Case: Best for APIs, logs, or nested data.
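
By default, spark.read.json expects one JSON object per line (JSON Lines). For a pretty-printed or nested file, enable the multiLine option; a sketch against the same hypothetical data.json:

# Read a pretty-printed / nested JSON document rather than JSON Lines
df = spark.read.option("multiLine", True).json("data.json")
df.printSchema()            # nested fields appear as structs and arrays
df.show(truncate=False)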


4. Creating DataFrames from Parquet Files

Parquet is a columnar storage format optimized for big data processing.

df = spark.read.parquet("data.parquet")
df.show()

Use Case: Recommended for fast data processing in Spark.
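
Parquet files carry their schema with them, so no inference is needed, and writing a DataFrame back out is just as simple. A short sketch using a hypothetical output path:

# Write the DataFrame to Parquet (hypothetical path), then read it back
df.write.mode("overwrite").parquet("output/people.parquet")

df_back = spark.read.parquet("output/people.parquet")
df_back.select("Name", "Age").show()   # column pruning: only the selected columns are scanned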


5. Creating DataFrames from Databases (JDBC Connection)

Connecting to external databases is common for real-world ETL tasks.

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/testdb") \
    .option("dbtable", "users") \
    .option("user", "root") \
    .option("password", "password") \
    .load()
df.show()

Use Case: Best for integrating with external SQL databases.
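
For large tables, a single JDBC connection becomes a bottleneck. The JDBC reader can split the load across partitions using a numeric column; a sketch, assuming the same hypothetical MySQL database and that the users table has a numeric id column:

# Partitioned JDBC read: Spark issues one query per partition over the id range
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/testdb") \
    .option("dbtable", "users") \
    .option("user", "root") \
    .option("password", "password") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "100000") \
    .option("numPartitions", "4") \
    .load()
df.show()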


6. Creating DataFrames from RDDs

Sometimes, raw RDDs need to be converted into DataFrames.

from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
    Row(ID=1, Name="Alice", Age=25),
    Row(ID=2, Name="Bob", Age=30),
    Row(ID=3, Name="Charlie", Age=35)
])

df = spark.createDataFrame(rdd)
df.show()

Use Case: Useful for transitioning from RDD-based transformations to DataFrames.
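
When the RDD already holds Row objects or tuples, the equivalent shortcut is rdd.toDF(), which delegates to createDataFrame under the hood:

# Shortcut: convert the RDD of Row objects directly
df = rdd.toDF()
df.show()

# For an RDD of plain tuples, pass the column names explicitly
rdd_tuples = spark.sparkContext.parallelize([(1, "Alice", 25), (2, "Bob", 30)])
df2 = rdd_tuples.toDF(["ID", "Name", "Age"])
df2.show()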


7. Creating DataFrames from Pandas DataFrames

Converting a Pandas DataFrame to PySpark is helpful when scaling operations.

import pandas as pd

# Create Pandas DataFrame
pdf = pd.DataFrame({"ID": [1, 2, 3], "Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]})

# Convert to PySpark DataFrame
df = spark.createDataFrame(pdf)
df.show()

Use Case: Best for transitioning from local Pandas to distributed PySpark.
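
The conversion serializes the Pandas data to the JVM; enabling Apache Arrow usually speeds up both directions considerably. A sketch using the standard configuration key (PyArrow must be installed):

# Enable Arrow-based conversion between Pandas and Spark (requires pyarrow)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame(pdf)   # Pandas -> Spark
pdf_back = df.toPandas()          # Spark -> Pandas
print(pdf_back.head())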


8. Creating DataFrames from API Response (Using JSON Parsing)

For web scraping or API data processing, JSON responses can be converted into DataFrames.

import requests
import json

response = requests.get("https://api.example.com/users")
data = json.loads(response.text)

df = spark.createDataFrame(data)
df.show()

Use Case: Useful for processing real-time API data.
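
createDataFrame works well when the response is a flat list of dictionaries. When the payload is deeply nested, another common pattern is to hand the raw JSON text to spark.read.json via an RDD of strings so Spark infers the nested schema; a sketch, assuming the same hypothetical endpoint:

import requests

response = requests.get("https://api.example.com/users")

# Let Spark infer a (possibly nested) schema from the raw JSON text
df = spark.read.json(spark.sparkContext.parallelize([response.text]))
df.printSchema()
df.show(truncate=False)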


9. Creating DataFrames from XML Data

Spark supports XML parsing through third-party libraries.

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("ID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", StringType(), True)
])

df = spark.read.format("com.databricks.spark.xml") \
    .option("rowTag", "person") \
    .schema(schema) \
    .load("people.xml")
df.show()

Use Case: Useful for handling structured XML-based datasets.
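
The spark-xml package is not bundled with Spark, so it must be on the classpath before the session is created. One way is the spark.jars.packages config; the Maven coordinate below is an assumption and should be matched to your Scala/Spark version:

# Attach the spark-xml package when building the session (before the first getOrCreate)
# The coordinate below is an example; adjust the version to your environment.
spark = SparkSession.builder \
    .appName("XMLExample") \
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0") \
    .getOrCreate()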


10. Creating DataFrames Using Range for Auto-Generated Data

If you need a sequence of numbers, spark.range() can quickly create a DataFrame.

df = spark.range(1, 6).toDF("ID")
df.show()

Use Case: Useful for generating test sequences or dummy IDs.
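
spark.range is also handy as a skeleton for synthetic data: start from the generated id column and derive further columns from it. A small sketch:

from pyspark.sql.functions import col, rand

# Build 5 rows of dummy data on top of the auto-generated id column
df = spark.range(1, 6) \
    .withColumn("square", col("id") * col("id")) \
    .withColumn("random_score", (rand(seed=42) * 100).cast("int"))
df.show()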


Summary of Methods

Method                 Use Case
List of Tuples         Simple and widely used
CSV Files              Best for tabular structured data
JSON Files             Ideal for nested and semi-structured data
Parquet Files          Best for big data performance
JDBC Databases         Useful for ETL and database integration
RDD Conversion         Transitioning from RDDs to DataFrames
Pandas Conversion      Best for scaling Pandas workloads
API Response (JSON)    Real-time API data processing
XML Parsing            Handling structured XML data
Auto-Generated Range   Generating test data quickly

Creating Dummy DataFrames in PySpark

Creating dummy DataFrames in PySpark is useful for testing, prototyping, and learning. PySpark provides multiple ways to create DataFrames manually, each suited to different scenarios. Below are various methods to create dummy DataFrames with examples.


1. Using List of Tuples (Most Common Method)

This is one of the most common ways to create a PySpark DataFrame.

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("DummyDataFrame").getOrCreate()

# List of tuples
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]

# Define column names
columns = ["ID", "Name", "Age"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()

Use Case: Best for small, manually defined datasets.


2. Using List of Lists

This method is similar to the list of tuples but uses lists instead.

data = [[1, "Alice", 25], [2, "Bob", 30], [3, "Charlie", 35]]
df = spark.createDataFrame(data, columns)
df.show()

Use Case: When working with mutable lists instead of immutable tuples.


3. Using Row Objects

Using Row objects allows for named attributes, making it easy to access values.

from pyspark.sql import Row

data = [Row(ID=1, Name="Alice", Age=25),
        Row(ID=2, Name="Bob", Age=30),
        Row(ID=3, Name="Charlie", Age=35)]

df = spark.createDataFrame(data)
df.show()

Use Case: When you need named fields and structured data representation.
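
A Row behaves like a named tuple, so individual fields can be read by attribute even before the DataFrame is built:

# Row fields are accessible by name, like a named tuple
first = data[0]
print(first.Name, first.Age)   # Alice 25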


4. Using Dictionary with Explicit Schema

When you want stricter control over column types, defining a schema is a good approach.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define schema
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])

# Create DataFrame from a list of dictionaries
data = [{"ID": 1, "Name": "Alice", "Age": 25},
        {"ID": 2, "Name": "Bob", "Age": 30},
        {"ID": 3, "Name": "Charlie", "Age": 35}]

df = spark.createDataFrame(data, schema=schema)
df.show()

Use Case: Ensures correct data types and improves performance.
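
You can confirm that the declared types were applied with printSchema:

df.printSchema()
# root
#  |-- ID: integer (nullable = true)
#  |-- Name: string (nullable = true)
#  |-- Age: integer (nullable = true)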


5. Using RDD with Row Objects

If you are working with distributed data, creating an RDD first can be beneficial.

rdd = spark.sparkContext.parallelize([
    Row(ID=1, Name="Alice", Age=25),
    Row(ID=2, Name="Bob", Age=30),
    Row(ID=3, Name="Charlie", Age=35)
])

df = spark.createDataFrame(rdd)
df.show()

Use Case: Best when working with large distributed datasets.


6. Using Pandas DataFrame Conversion

If you already have a Pandas DataFrame, you can convert it to a PySpark DataFrame.

import pandas as pd

# Create Pandas DataFrame
pdf = pd.DataFrame({"ID": [1, 2, 3], "Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]})

# Convert to PySpark DataFrame
df = spark.createDataFrame(pdf)
df.show()

Use Case: When transitioning from Pandas to PySpark.


7. Using range() for Auto-Generated Data

If you need a sequence of numbers, spark.range() can quickly create a DataFrame.

df = spark.range(1, 6).toDF("ID")
df.show()

Use Case: When you need an auto-incrementing column.
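
spark.range also accepts a step argument, which is convenient for evenly spaced IDs:

# Generate 1, 3, 5, 7, 9 using a step of 2
df = spark.range(1, 10, 2).toDF("ID")
df.show()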


Summary of Methods

Method                            Use Case
List of Tuples                    Simple and widely used
List of Lists                     Similar to tuples but mutable
Row Objects                       Allows named attributes
Dictionary with Schema            Ensures correct data types
RDD with Row                      Works well for distributed data
Pandas Conversion                 Best for small datasets
range() for Auto-Generated Data   When you need incremental values

Worked Example: Generating a Dummy Dataset with Random Values

The snippets below pull several of these techniques together: they generate random usernames, email addresses, and names with Python's random and string modules, then load the generated records into a PySpark DataFrame.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from datetime import datetime
import random
import string

spark = SparkSession.builder.appName("MySparK Learning1").getOrCreate()

# A list of 100 random integers (generated here but not used further in this example)
random_numbers = [random.randint(1, 100) for _ in range(100)]

# Pool of domains used to build random email addresses
domain_names = ['gmail.com', 'xyz.com', 'hotmail.com', 'wikipedia.org', 'hintstoday.com']

def generate_random_username(length=8):
    # Random mix of uppercase letters, lowercase letters, and digits
    letters_and_digits = string.ascii_letters + string.digits
    return ''.join(random.choice(letters_and_digits) for _ in range(length))

def get_username():
    # Lowercase letters and digits only, fixed length of 10
    return ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))

def get_first_name():
    # Random lowercase "name" between 5 and 10 characters long
    length = random.randint(5, 10)
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(length))

# Build a small sample of email addresses
email_addresses = []
for n in range(5):
    user_name = generate_random_username()
    chosen_domain = random.choice(domain_names)
    email_addresses.append(f"{user_name}@{chosen_domain}")

print(email_addresses)

# Generate 100 rows of dummy data and load them into a DataFrame
data = [(i + 1,
         datetime.now().strftime('%y-%m-%d'),
         random.randint(1000, 100000),
         get_username() + "@" + random.choice(domain_names),
         get_first_name() + ' ' + get_first_name()) for i in range(100)]
columns = ['Sl_No', 'Date_Val', 'Ran_id', 'email', 'Name']
df = spark.createDataFrame(data, columns)
df.show()

# A one-column DataFrame from a sequence of single-element tuples
data = [(1,), (2,), (3,)]
columns = ['No']
df = spark.createDataFrame(data, columns)
df.show()

# A single-row, single-column DataFrame
df1_data = [(1,)]
df1_cols = ['r1']   # avoid naming this "col": it would shadow pyspark.sql.functions.col
df2 = spark.createDataFrame(df1_data, df1_cols)
df2.show()

# A few ways to inspect and manipulate column names (in a notebook these expressions display their result)
str(df.columns).split()             # string representation of the column list, split on whitespace
''.join(str(df.columns).split())    # same, with the whitespace removed
' '.join(df.columns)                # column names joined into a single space-separated string

# Rename every column: replace underscores with 'X' and append a suffix (not assigned; shown for illustration)
df.toDF(*[column.replace('_', 'X') + '_v1' for column in df.columns])

# Keep only even values of the 'No' column, then join the result back to the original DataFrame
df1 = df.select('No').filter(col('No') % 2 == 0)
df2 = df.join(df1, 'No')
df2.show()

A similar pattern can generate random addresses and load them into a DataFrame through an RDD of Row objects:

import random
import string
from pyspark.sql import Row

def get_address():
    # Build a random (nonsensical) address string from letters and digits
    flat_name = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
    username = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))
    street_name = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
    city = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(7))
    area_name = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(5))  # generated but not used below
    pincode = random.randint(800000, 900000)
    country = 'India'
    return f"{username} {flat_name} , {street_name} , {city} , {pincode} , {country}"

print(get_address())

# parallelize() needs a concrete collection, so materialise the rows into a list first
rows = [Row(Sl_No=i + 1,
            nickname=''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10)),
            Address=get_address()) for i in range(100)]
rdd = spark.sparkContext.parallelize(rows)
df_data = spark.createDataFrame(rdd)
df_data.show()

