PySpark Cheat Sheet at a Glance
| Category | Function | Description | Example |
|---|---|---|---|
| Data Reading | spark.read.csv() | Read a CSV file into a DataFrame. | df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True) |
| | spark.read.json() | Read a JSON file into a DataFrame. | df = spark.read.json("path/to/file.json") |
| | spark.read.parquet() | Read a Parquet file into a DataFrame. | df = spark.read.parquet("path/to/file.parquet") |
| DataFrame Creation | createDataFrame() | Create a DataFrame from an RDD, a list, or a pandas DataFrame. | df = spark.createDataFrame(data, schema) |
| | createDataFrame() with column names | Create a DataFrame from structured data (e.g., a list of tuples). | df = spark.createDataFrame(data, ["col1", "col2"]) |
| Transformation | select() | Select specific columns from the DataFrame. | df.select("column1", "column2") |
| | filter() | Filter rows based on a condition. | df.filter(df["column"] > 100) |
| | groupBy() | Group rows by a specific column and perform aggregations. | df.groupBy("column").count() |
| | withColumn() | Add a new column or replace an existing column. | df.withColumn("new_col", df["col1"] + 1) |
| | join() | Join two DataFrames together. | df1.join(df2, "common_col") |
| | union() | Combine two DataFrames with the same schema. | df1.union(df2) |
| | drop() | Drop specified columns from the DataFrame. | df.drop("column1") |
| | distinct() | Return a new DataFrame with distinct rows. | df.distinct() |
| | orderBy() | Sort the DataFrame by one or more columns. | df.orderBy("column1", ascending=False) |
| | pivot() | Pivot a DataFrame to reshape it. | df.groupBy("column1").pivot("column2").agg(F.sum("value")) |
| | Transpose | Transpose the DataFrame (no built-in method). | Emulate with groupBy()/pivot() or convert to pandas. |
| Window Functions | Window.partitionBy() | Create a window specification for calculations over partitions. | windowSpec = Window.partitionBy("column").orderBy("value") |
| | row_number() | Assign a unique row number to rows within a partition. | df.withColumn("row_num", F.row_number().over(windowSpec)) |
| UDFs | spark.udf.register() | Register a Python function as a UDF for use in SQL expressions. | spark.udf.register("my_udf", my_function) |
| | F.udf() | Wrap a Python function as a UDF and use it to add a column. | df.withColumn("new_col", F.udf(my_function)(df["col"])) |
| String Manipulation | F.concat() | Concatenate multiple string columns into one. | df.withColumn("new_col", F.concat(df["col1"], df["col2"])) |
| | F.substring() | Extract a substring from a string column. | df.withColumn("sub_col", F.substring("col", 1, 3)) |
| | F.lower() | Convert a string column to lowercase. | df.withColumn("lower_col", F.lower("col")) |
| Date Manipulation | F.current_date() | Get the current date. | df.withColumn("current_date", F.current_date()) |
| | F.date_add() | Add days to a date column. | df.withColumn("new_date", F.date_add("date_col", 5)) |
| | F.year() | Extract the year from a date column. | df.withColumn("year", F.year("date_col")) |
| Schema and Data Types | StructType() | Define the schema of a DataFrame. | schema = StructType([...]) |
| | Data types | Define data types for DataFrame columns. | IntegerType(), StringType(), ... |
| Actions | show(n) | Display the first n rows of the DataFrame (defaults to 20). | df.show(5) |
| | printSchema() | Print the schema of the DataFrame. | df.printSchema() |
| | count() | Count the number of rows in the DataFrame. | row_count = df.count() |
| | collect() | Retrieve all rows of the DataFrame as a list on the driver. | data = df.collect() |
| Optimization | persist() | Store the DataFrame in memory/disk for reuse. | df.persist() |
| | cache() | Cache the DataFrame in memory. | df.cache() |
| | repartition() | Change the number of partitions of the DataFrame. | df.repartition(4) |
| | coalesce() | Reduce the number of partitions without a full shuffle. | df.coalesce(2) |
| | Broadcast join | Optimize a join by broadcasting the smaller DataFrame to all nodes. | df1.join(F.broadcast(df2), "common_col") |
| Data Management | Bucketing | Split data into buckets for optimized joins and queries. | Use CLUSTERED BY ... INTO n BUCKETS in Hive DDL, or df.write.bucketBy(n, "col").saveAsTable("t"). |
| Query Optimization | Catalyst Optimizer | Spark's query engine that optimizes execution plans. | Managed implicitly by Spark. |
| | Predicate Pushdown | Filter data at the source instead of loading it into memory. | Handled automatically by the optimizer. |
| Advanced Joins | Sort Merge Join | Spark's default strategy for joining large datasets on sortable keys. | df1.join(df2.hint("merge"), "key") |
| | Skew Join handling | Optimize joins over skewed key distributions. | Enable AQE skew-join handling (spark.sql.adaptive.skewJoin.enabled) or use skew hints. |
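In the examples above, F refers to pyspark.sql.functions (import pyspark.sql.functions as F). To make the window-function, UDF, and pivot rows concrete, here is a minimal, self-contained sketch over a small made-up sales dataset (the column names and values are purely illustrative):
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("Cheat sheet examples").getOrCreate()

# Hypothetical sales data: (region, product, amount)
df = spark.createDataFrame(
    [("EU", "A", 100), ("EU", "B", 250), ("US", "A", 80), ("US", "B", 300)],
    ["region", "product", "amount"]
)

# Window function: rank rows by amount within each region
windowSpec = Window.partitionBy("region").orderBy(F.desc("amount"))
ranked = df.withColumn("row_num", F.row_number().over(windowSpec))

# UDF: wrap a plain Python function and use it in withColumn()
label = F.udf(lambda amount: "high" if amount > 200 else "low", StringType())
labelled = ranked.withColumn("amount_band", label(F.col("amount")))

# Pivot: one column per product, summed amounts per region
pivoted = df.groupBy("region").pivot("product").agg(F.sum("amount"))

labelled.show()
pivoted.show()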
Creating DataFrames in PySpark
Creating DataFrames from various sources is a common task in PySpark. Below are examples of creating DataFrames from the most common sources, one per section.
1. Creating DataFrames from CSV Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from CSV") \
    .getOrCreate()
# Read CSV file
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df_csv.show()
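inferSchema=True makes Spark scan the file an extra time to guess column types. For large or frequently read files, an explicit schema is usually preferable; a sketch, assuming the CSV has just a name and an age column:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema for a two-column CSV (name, age)
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df_csv_typed = spark.read.csv("path/to/file.csv", header=True, schema=schema)
df_csv_typed.printSchema()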
2. Creating DataFrames from Excel Files
To read Excel files, the spark-excel library is required.
# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5")
df_excel = spark.read \
    .format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("path/to/file.xlsx")
df_excel.show()
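If adding the spark-excel package is not an option, a common workaround is to read the sheet with pandas and convert the result. This assumes pandas and an Excel engine such as openpyxl are installed on the driver, and that the file fits in driver memory:
import pandas as pd

# Read the sheet on the driver with pandas, then convert to a Spark DataFrame
pdf_excel = pd.read_excel("path/to/file.xlsx")  # requires openpyxl (or another engine)
df_excel_via_pandas = spark.createDataFrame(pdf_excel)
df_excel_via_pandas.show()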
3. Creating DataFrames from Python List
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from List") \
    .getOrCreate()
# Create DataFrame from list
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_list = spark.createDataFrame(data, columns)
df_list.show()
4. Creating DataFrames from Python Tuple
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Tuple") \
    .getOrCreate()
# Create DataFrame from tuple
data = (("Alice", 30), ("Bob", 25), ("Cathy", 28))
columns = ["name", "age"]
df_tuple = spark.createDataFrame(data, columns)
df_tuple.show()
5. Creating DataFrames from Python Dictionary
from pyspark.sql import SparkSession
import pandas as pd
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Dictionary") \
    .getOrCreate()
# Create DataFrame from dictionary
data = {"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]}
df_dict = spark.createDataFrame(pd.DataFrame(data))
df_dict.show()
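createDataFrame() does not accept a column-oriented dictionary directly, which is why the example above routes through pandas. A pandas-free alternative is to zip the dictionary's columns into row tuples:
# Zip the column lists into row tuples: [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
rows = list(zip(data["name"], data["age"]))
df_dict_no_pandas = spark.createDataFrame(rows, ["name", "age"])
df_dict_no_pandas.show()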
6. Creating DataFrames from Pandas DataFrame
import pandas as pd
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Pandas") \
    .getOrCreate()
# Create a Pandas DataFrame
pdf = pd.DataFrame({"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]})
# Convert Pandas DataFrame to Spark DataFrame
df_pandas = spark.createDataFrame(pdf)
df_pandas.show()
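For larger pandas DataFrames the conversion is considerably faster with Apache Arrow enabled. This requires the pyarrow package; the configuration key below is the Spark 3.x name:
# Enable Arrow-based conversion between pandas and Spark (requires pyarrow)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df_pandas_arrow = spark.createDataFrame(pdf)
df_pandas_arrow.show()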
7. Creating DataFrames from Hive Tables
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Hive") \
    .enableHiveSupport() \
    .getOrCreate()
# Read Hive table
df_hive = spark.sql("SELECT * FROM database.table_name")
df_hive.show()
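When no filtering or projection is needed at read time, spark.table() loads the same Hive table without writing SQL:
# Equivalent read of the whole Hive table without an explicit SQL statement
df_hive_table = spark.table("database.table_name")
df_hive_table.show()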
8. Creating DataFrames from Values
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Values") \
    .getOrCreate()
# Create DataFrame from values
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_values = spark.createDataFrame(data, columns)
df_values.show()
9. Creating DataFrames from RDDs
from pyspark.sql import Row, SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from RDD") \
    .getOrCreate()
# Create an RDD
rdd = spark.sparkContext.parallelize([
    Row(name="Alice", age=30),
    Row(name="Bob", age=25),
    Row(name="Cathy", age=28)
])
# Convert RDD to DataFrame
df_rdd = spark.createDataFrame(rdd)
df_rdd.show()
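When the RDD already holds Row objects, toDF() is a shorthand for the same conversion; with an RDD of plain tuples you pass the column names yourself:
# Shorthand conversion: Row fields become the column names
df_rdd_short = rdd.toDF()
df_rdd_short.show()

# With an RDD of plain tuples, supply the column names explicitly
rdd_tuples = spark.sparkContext.parallelize([("Alice", 30), ("Bob", 25)])
df_tuples = rdd_tuples.toDF(["name", "age"])
df_tuples.show()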
10. Creating DataFrames from Oracle Database
To read from Oracle, ensure the JDBC driver is in your classpath.
# Add the following dependency when initializing SparkSession
# .config("spark.jars", "path/to/ojdbc8.jar")
jdbc_url = "jdbc:oracle:thin:@hostname:port:SID"
connection_properties = {
    "user": "username",
    "password": "password",
    "driver": "oracle.jdbc.driver.OracleDriver"
}
df_oracle = spark.read.jdbc(jdbc_url, "schema.table_name", properties=connection_properties)
df_oracle.show()
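The same read can be written with the generic format("jdbc") reader, which also exposes options for reading in parallel. The partition column and bounds below are placeholders to adapt to your table:
# Equivalent JDBC read using the generic DataFrameReader options.
# partitionColumn/lowerBound/upperBound/numPartitions enable parallel reads;
# "id" is a hypothetical numeric column in the source table.
df_oracle_parallel = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "schema.table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .option("numPartitions", "8") \
    .load()
df_oracle_parallel.show()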
11. Creating DataFrames from HBase Tables
To read from HBase, a Spark-HBase connector is required. The catalog-based API shown below follows the SHC (Hortonworks Spark HBase Connector) style; the exact package coordinates and data source name depend on which connector and version you use.
# Add a Spark-HBase connector dependency when initializing the SparkSession, for example:
# .config("spark.jars.packages", "org.apache.hbase.connectors.spark:hbase-spark:1.0.0")
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from HBase") \
    .getOrCreate()
# Define HBase catalog
catalog = '''{
    "table": {"namespace": "default", "name": "tablename"},
    "rowkey": "key",
    "columns": {
        "col0": {"cf": "rowkey", "col": "key", "type": "string"},
        "col1": {"cf": "cf1", "col": "col1", "type": "string"},
        "col2": {"cf": "cf2", "col": "col2", "type": "string"}
    }
}'''
df_hbase = spark.read \
    .options(catalog=catalog) \
    .format("org.apache.spark.sql.execution.datasources.hbase") \
    .load()
df_hbase.show()
12. Creating DataFrames from JSON Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from JSON") \
    .getOrCreate()
# Read JSON file
df_json = spark.read.json("path/to/file.json")
df_json.show()
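By default spark.read.json() expects one JSON object per line (JSON Lines). For a file containing a single pretty-printed object or array, enable the multiLine option:
# Read a pretty-printed / multi-line JSON document
df_json_multi = spark.read.option("multiLine", "true").json("path/to/file.json")
df_json_multi.show()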
13. Creating DataFrames from Parquet Files
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Parquet") \
    .getOrCreate()
# Read Parquet file
df_parquet = spark.read.parquet("path/to/file.parquet")
df_parquet.show()
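Because Parquet is columnar, selecting only the needed columns and filtering early lets Spark push the projection and predicate down to the file scan. The column names below are placeholders:
# Column pruning and predicate pushdown happen at the Parquet scan
df_parquet_slim = spark.read.parquet("path/to/file.parquet") \
    .select("name", "age") \
    .filter("age > 25")
df_parquet_slim.explain()  # the physical plan shows the pushed filters and pruned read schema
df_parquet_slim.show()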