PySpark provides a powerful API for data manipulation, similar to pandas, but optimized for big data processing. Below is a comprehensive overview of DataFrame operations, functions, and syntax in PySpark with examples.
Creating DataFrames
Creating DataFrames from various sources is a common task in PySpark. Below are examples for creating DataFrames from CSV files, Excel files, Python lists, tuples, dictionaries, pandas DataFrames, Hive tables, literal values, RDDs, Oracle databases, HBase tables, JSON files, and Parquet files.
1. Creating DataFrames from CSV Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from CSV") \
    .getOrCreate()
# Read CSV file
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df_csv.show()
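Setting inferSchema=True forces Spark to scan the file an extra time to guess column types. For large files it is often better to supply an explicit schema; below is a minimal sketch, assuming the file has the two columns shown:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Hypothetical schema; adjust column names and types to match your file
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df_csv = spark.read.csv("path/to/file.csv", header=True, schema=schema)
df_csv.show()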
2. Creating DataFrames from Excel Files
To read Excel files, you need to install the spark-excel library.
# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5")
df_excel = spark.read \
    .format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("path/to/file.xlsx")
df_excel.show()
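spark-excel also accepts a dataAddress option for selecting a specific sheet or cell range; the sheet name below is only an illustration, so substitute the one from your workbook:
df_excel_sheet = spark.read \
    .format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("dataAddress", "'Sheet1'!A1") \
    .load("path/to/file.xlsx")
df_excel_sheet.show()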
3. Creating DataFrames from Python List
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from List") \
    .getOrCreate()
# Create DataFrame from list
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_list = spark.createDataFrame(data, columns)
df_list.show()
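createDataFrame also accepts a DDL-style schema string, which pins the column types instead of letting Spark infer them from the Python values:
df_list_typed = spark.createDataFrame(data, "name STRING, age INT")
df_list_typed.printSchema()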
4. Creating DataFrames from Python Tuple
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Tuple") \
    .getOrCreate()
# Create DataFrame from tuple
data = (("Alice", 30), ("Bob", 25), ("Cathy", 28))
columns = ["name", "age"]
df_tuple = spark.createDataFrame(data, columns)
df_tuple.show()
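With the three rows above, show() prints the DataFrame roughly as follows (column widths adjust to the data):
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 30|
# |  Bob| 25|
# |Cathy| 28|
# +-----+---+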
5. Creating DataFrames from Python Dictionary
import pandas as pd
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Dictionary") \
    .getOrCreate()
# Create DataFrame from a dictionary of column lists (converted via pandas)
data = {"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]}
df_dict = spark.createDataFrame(pd.DataFrame(data))
df_dict.show()
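If the data is shaped as one dictionary per row rather than one list per column, recent Spark versions can build the DataFrame directly from a list of dictionaries without going through pandas; a small sketch:
rows = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
    {"name": "Cathy", "age": 28}
]
df_rows = spark.createDataFrame(rows)
df_rows.show()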
6. Creating DataFrames from Pandas DataFrame
import pandas as pd
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Pandas") \
    .getOrCreate()
# Create a Pandas DataFrame
pdf = pd.DataFrame({"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]})
# Convert Pandas DataFrame to Spark DataFrame
df_pandas = spark.createDataFrame(pdf)
df_pandas.show()
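The pandas-to-Spark conversion is noticeably faster when Apache Arrow is enabled; on Spark 3.x the relevant setting is shown below (this assumes pyarrow is installed in your environment):
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df_pandas = spark.createDataFrame(pdf)  # the conversion now goes through Arrow
df_pandas.show()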
7. Creating DataFrames from Hive Tables
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Hive") \
    .enableHiveSupport() \
    .getOrCreate()
# Read Hive table
df_hive = spark.sql("SELECT * FROM database.table_name")
df_hive.show()
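For a straight read of an entire table, spark.table is equivalent to the SELECT * query above:
df_hive = spark.table("database.table_name")
df_hive.show()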
8. Creating DataFrames from Values
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Values") \
    .getOrCreate()
# Create DataFrame from values
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_values = spark.createDataFrame(data, columns)
df_values.show()
9. Creating DataFrames from RDDs
from pyspark.sql import Row, SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from RDD") \
    .getOrCreate()
# Create an RDD
rdd = spark.sparkContext.parallelize([
Row(name="Alice", age=30),
Row(name="Bob", age=25),
Row(name="Cathy", age=28)
])
# Convert RDD to DataFrame
df_rdd = spark.createDataFrame(rdd)
df_rdd.show()
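An RDD of Row objects (or plain tuples) can also be converted with toDF(); pass column names explicitly when the rows are tuples:
df_rdd = rdd.toDF()
# For an RDD of plain tuples, supply the column names
df_tuples = spark.sparkContext.parallelize([("Alice", 30), ("Bob", 25)]).toDF(["name", "age"])
df_tuples.show()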
10. Creating DataFrames from Oracle Database
To read from Oracle, you need to have the JDBC driver for Oracle in your classpath.
# Add the following dependency when initializing SparkSession
# .config("spark.jars", "path/to/ojdbc8.jar")
jdbc_url = "jdbc:oracle:thin:@hostname:port:SID"
connection_properties = {
"user": "username",
"password": "password",
"driver": "oracle.jdbc.driver.OracleDriver"
}
df_oracle = spark.read.jdbc(jdbc_url, "schema.table_name", properties=connection_properties)
df_oracle.show()
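The same read can be written with the generic JDBC source, which keeps all connection details as options on the reader; the credentials below are placeholders:
df_oracle2 = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "schema.table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()
# To push a query down to Oracle instead of reading the whole table, replace
# the "dbtable" option with: .option("query", "SELECT ... FROM schema.table_name")
df_oracle2.show()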
11. Creating DataFrames from HBase Tables
To read from HBase, you need an HBase connector for Spark on the classpath; the exact package and data source name depend on the connector version you use.
# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "org.apache.hbase.connectors.spark:hbase-spark:1.0.0")
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from HBase") \
    .getOrCreate()
# Define HBase catalog
catalog = ''.join("""{
"table":{"namespace":"default", "name":"tablename"},
"rowkey":"key",
"columns":{
"col0":{"cf":"rowkey", "col":"key", "type":"string"},
"col1":{"cf":"cf1", "col":"col1", "type":"string"},
"col2":{"cf":"cf2", "col":"col2", "type":"string"}
}
}""".split())
df_hbase = spark.read \
    .options(catalog=catalog) \
    .format("org.apache.spark.sql.execution.datasources.hbase") \
    .load()
df_hbase.show()
12. Creating DataFrames from JSON Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from JSON") \
    .getOrCreate()
# Read JSON file
df_json = spark.read.json("path/to/file.json")
df_json.show()
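By default spark.read.json expects one JSON object per line (JSON Lines); for a file containing a single pretty-printed JSON document or array, enable the multiLine option:
df_json_multi = spark.read.option("multiLine", "true").json("path/to/file.json")
df_json_multi.show()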
13. Creating DataFrames from Parquet Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Parquet") \
    .getOrCreate()
# Read Parquet file
df_parquet = spark.read.parquet("path/to/file.parquet")
df_parquet.show()
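Parquet files carry their schema with them, so there is no inferSchema step, and the reader accepts a directory of part files (for example the output of an earlier df.write.parquet call):
# Read every part file under a directory written by Spark
df_parquet_dir = spark.read.parquet("path/to/output_dir")
df_parquet_dir.printSchema()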