PySpark Cheat Sheet

The examples below assume the usual imports: from pyspark.sql import functions as F and from pyspark.sql.window import Window.

| Category | Function | Description | Example |
| --- | --- | --- | --- |
| Data Reading | spark.read.csv() | Read a CSV file into a DataFrame. | df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True) |
| | spark.read.json() | Read a JSON file into a DataFrame. | df = spark.read.json("path/to/file.json") |
| | spark.read.parquet() | Read a Parquet file into a DataFrame. | df = spark.read.parquet("path/to/file.parquet") |
| DataFrame Creation | createDataFrame() | Create a DataFrame from an RDD or a list. | df = spark.createDataFrame(data, schema) |
| | createDataFrame() with column names | Create a DataFrame from structured data (e.g., a list of tuples), supplying column names. | df = spark.createDataFrame(data, ["col1", "col2"]) |
| Transformations | select() | Select specific columns from the DataFrame. | df.select("column1", "column2") |
| | filter() | Filter rows based on a condition. | df.filter(df["column"] > 100) |
| | groupBy() | Group rows by a column and perform aggregations. | df.groupBy("column").count() |
| | withColumn() | Add a new column or replace an existing one. | df.withColumn("new_col", df["col1"] + 1) |
| | join() | Join two DataFrames. | df1.join(df2, "common_col") |
| | union() | Combine two DataFrames with the same schema. | df1.union(df2) |
| | drop() | Drop specified columns from the DataFrame. | df.drop("column1") |
| | distinct() | Return a new DataFrame with distinct rows. | df.distinct() |
| | orderBy() | Sort the DataFrame by one or more columns. | df.orderBy("column1", ascending=False) |
| | pivot() | Pivot a DataFrame to reshape it. | df.groupBy("column1").pivot("column2").agg(F.sum("value")) |
| | transpose() | Not available directly; emulate with pivot/unpivot, or convert small results to pandas. | df.toPandas().T |
| Window Functions | Window.partitionBy() | Create a window specification for calculations over partitions. | windowSpec = Window.partitionBy("column").orderBy("value") |
| | row_number() | Assign a unique row number to rows within a partition. | df.withColumn("row_num", F.row_number().over(windowSpec)) |
| UDFs | spark.udf.register() | Register a Python function as a UDF for use in SQL. | spark.udf.register("my_udf", my_function) |
| | withColumn() with a UDF | Apply a UDF to create a new column (see the sketch after this table). | df.withColumn("new_col", F.udf(my_function)(df["col"])) |
| String Manipulation | F.concat() | Concatenate multiple string columns into one. | df.withColumn("new_col", F.concat(df["col1"], df["col2"])) |
| | F.substring() | Extract a substring from a string column. | df.withColumn("sub_col", F.substring("col", 1, 3)) |
| | F.lower() | Convert a string column to lowercase. | df.withColumn("lower_col", F.lower("col")) |
| Date Manipulation | F.current_date() | Get the current date. | df.withColumn("current_date", F.current_date()) |
| | F.date_add() | Add days to a date column. | df.withColumn("new_date", F.date_add("date_col", 5)) |
| | F.year() | Extract the year from a date column. | df.withColumn("year", F.year("date_col")) |
| Schema and Data Types | StructType() | Define the schema of a DataFrame. | schema = StructType([...]) |
| | Data types | Define data types for DataFrame columns. | IntegerType(), StringType(), ... |
| Actions | show(n) | Display the first n rows of the DataFrame (default 20). | df.show(5) |
| | printSchema() | Print the schema of the DataFrame. | df.printSchema() |
| | count() | Count the number of rows in the DataFrame. | row_count = df.count() |
| | collect() | Retrieve all rows of the DataFrame as a list on the driver. | data = df.collect() |
| Optimization | persist() | Store the DataFrame in memory/on disk for reuse. | df.persist() |
| | cache() | Cache the DataFrame in memory. | df.cache() |
| | repartition() | Change the number of partitions of the DataFrame. | df.repartition(4) |
| | coalesce() | Reduce the number of partitions without a full shuffle. | df.coalesce(2) |
| | Broadcast join | Optimize a join by broadcasting the smaller DataFrame to all nodes. | df1.join(broadcast(df2), "common_col") |
| Data Management | Bucketing | Split data into buckets for optimized joins and queries. | CLUSTERED BY ... INTO n BUCKETS in Hive DDL, or df.write.bucketBy(n, "col").saveAsTable(...) in the DataFrame API. |
| Query Optimization | Catalyst Optimizer | Spark's query optimizer, which rewrites logical plans into efficient physical plans. | Managed implicitly by Spark. |
| | Predicate Pushdown | Filter data at the source instead of loading it into memory first. | Handled automatically by the optimizer. |
| Advanced Joins | Sort-Merge Join | Default strategy for joining two large datasets on equi-join keys. | df1.join(df2, "key") |
| | Skew Join handling | Mitigate skewed key distributions during joins. | Use skew join hints or enable AQE (spark.sql.adaptive.skewJoin.enabled). |
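
The Window Functions and UDF rows are easier to follow end to end, so here is a minimal, self-contained sketch; the sample data, column names, and the to_label function are made up for illustration.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("CheatSheetSketch").getOrCreate()

# Hypothetical sample data: two groups with numeric values
df = spark.createDataFrame([("A", 10), ("A", 30), ("B", 20)], ["grp", "value"])

# Window function: number rows within each group, highest value first
windowSpec = Window.partitionBy("grp").orderBy(F.desc("value"))
ranked = df.withColumn("row_num", F.row_number().over(windowSpec))

# UDF: wrap a plain Python function and apply it in withColumn()
def to_label(v):
    return f"value_{v}"

to_label_udf = F.udf(to_label, StringType())
labelled = ranked.withColumn("label", to_label_udf("value"))

# The same function can be registered for use from Spark SQL
spark.udf.register("to_label", to_label, StringType())

labelled.show()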

Creating DataFrames in PySpark

Creating DataFrames from various sources is a common task in PySpark. The sections below walk through the most common sources, each with a short, runnable example.


1. Creating DataFrames from CSV Files

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from CSV") \
    .getOrCreate()

# Read CSV file
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df_csv.show()
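
When the column types are known up front, passing an explicit schema avoids the extra scan over the data that inferSchema=True triggers. A minimal sketch, assuming the file has name and age columns; adjust the fields to match your data.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema; field names and types must match the CSV
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

df_csv_typed = spark.read.csv("path/to/file.csv", header=True, schema=schema)
df_csv_typed.printSchema()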

2. Creating DataFrames from Excel Files

To read Excel files, the spark-excel library is required.

# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5")

df_excel = spark.read \
    .format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("path/to/file.xlsx")
df_excel.show()
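
If adding the connector package is not an option, one workaround is to read the sheet on the driver with pandas and then convert the result. This sketch assumes pandas and an Excel engine such as openpyxl are installed on the driver, and that the sheet fits in driver memory.

import pandas as pd

# Read the sheet with pandas on the driver, then convert to a Spark DataFrame
pdf = pd.read_excel("path/to/file.xlsx")  # requires openpyxl for .xlsx files
df_excel_pd = spark.createDataFrame(pdf)
df_excel_pd.show()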

3. Creating DataFrames from Python List

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from List") \
    .getOrCreate()

# Create DataFrame from list
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_list = spark.createDataFrame(data, columns)
df_list.show()

4. Creating DataFrames from Python Tuple

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Tuple") \
    .getOrCreate()

# Create DataFrame from tuple
data = (("Alice", 30), ("Bob", 25), ("Cathy", 28))
columns = ["name", "age"]
df_tuple = spark.createDataFrame(data, columns)
df_tuple.show()

5. Creating DataFrames from Python Dictionary

from pyspark.sql import SparkSession
import pandas as pd

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Dictionary") \
    .getOrCreate()

# Create DataFrame from dictionary
data = {"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]}
df_dict = spark.createDataFrame(pd.DataFrame(data))
df_dict.show()
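
On recent Spark versions, a list of row-oriented dictionaries can also be passed to createDataFrame() directly, with column names inferred from the keys; a column-oriented dict like the one above just needs to be reshaped into rows first. A small sketch with the same sample data:

# Row-oriented dictionaries: each dict becomes one row
rows = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
    {"name": "Cathy", "age": 28},
]
df_dict_rows = spark.createDataFrame(rows)
df_dict_rows.show()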

6. Creating DataFrames from Pandas DataFrame

import pandas as pd
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Pandas") \
    .getOrCreate()

# Create a Pandas DataFrame
pdf = pd.DataFrame({"name": ["Alice", "Bob", "Cathy"], "age": [30, 25, 28]})

# Convert Pandas DataFrame to Spark DataFrame
df_pandas = spark.createDataFrame(pdf)
df_pandas.show()

7. Creating DataFrames from Hive Tables

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Hive") \
    .enableHiveSupport() \
    .getOrCreate()

# Read Hive table
df_hive = spark.sql("SELECT * FROM database.table_name")
df_hive.show()
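
For a plain table read without writing SQL, spark.table() (or spark.read.table()) is equivalent:

# Equivalent read through the catalog API
df_hive_tbl = spark.table("database.table_name")
df_hive_tbl.show()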

8. Creating DataFrames from Values

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Values") \
    .getOrCreate()

# Create DataFrame from values
data = [("Alice", 30), ("Bob", 25), ("Cathy", 28)]
columns = ["name", "age"]
df_values = spark.createDataFrame(data, columns)
df_values.show()

9. Creating DataFrames from RDDs

from pyspark.sql import Row, SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from RDD") \
    .getOrCreate()

# Create an RDD
rdd = spark.sparkContext.parallelize([
    Row(name="Alice", age=30),
    Row(name="Bob", age=25),
    Row(name="Cathy", age=28)
])

# Convert RDD to DataFrame
df_rdd = spark.createDataFrame(rdd)
df_rdd.show()
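
An RDD of plain tuples can also be converted with toDF(), passing the column names explicitly; a short sketch:

# toDF() on an RDD of tuples, supplying column names
rdd_tuples = spark.sparkContext.parallelize([("Alice", 30), ("Bob", 25), ("Cathy", 28)])
df_rdd_todf = rdd_tuples.toDF(["name", "age"])
df_rdd_todf.show()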

10. Creating DataFrames from Oracle Database

To read from Oracle, ensure the JDBC driver is in your classpath.

# Add the following dependency when initializing SparkSession
# .config("spark.jars", "path/to/ojdbc8.jar")

jdbc_url = "jdbc:oracle:thin:@hostname:port:SID"
connection_properties = {
    "user": "username",
    "password": "password",
    "driver": "oracle.jdbc.driver.OracleDriver"
}

df_oracle = spark.read.jdbc(jdbc_url, "schema.table_name", properties=connection_properties)
df_oracle.show()
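
The same read can also be expressed through the generic jdbc format with options, which is handy when you want to push a query down to Oracle rather than pull a whole table. The query option requires Spark 2.4+, and the query text below is only an illustration.

# Push a query down to Oracle instead of reading the full table
df_oracle_q = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("query", "SELECT * FROM schema.table_name WHERE ROWNUM <= 100")
    .option("user", "username")
    .option("password", "password")
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .load()
)
df_oracle_q.show()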

11. Creating DataFrames from HBase Tables

To read from HBase, a Spark-HBase connector is required. The catalog-based read below follows the Spark HBase Connector (SHC) style of API; make sure the connector package you add actually provides the data source format string passed to load().

# Add the following dependency when initializing SparkSession
# .config("spark.jars.packages", "org.apache.hbase.connectors.spark:hbase-spark:1.0.0")

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from HBase") \
    .getOrCreate()

# Define HBase catalog
catalog = '''{
    "table":{"namespace":"default", "name":"tablename"},
    "rowkey":"key",
    "columns":{
        "col0":{"cf":"rowkey", "col":"key", "type":"string"},
        "col1":{"cf":"cf1", "col":"col1", "type":"string"},
        "col2":{"cf":"cf2", "col":"col2", "type":"string"}
    }
}'''

df_hbase = spark.read \
    .options(catalog=catalog) \
    .format("org.apache.spark.sql.execution.datasources.hbase") \
    .load()
df_hbase.show()

12. Creating DataFrames from JSON Files

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from JSON") \
    .getOrCreate()

# Read JSON file
df_json = spark.read.json("path/to/file.json")
df_json.show()
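
spark.read.json() expects one JSON object per line (JSON Lines) by default; for a single pretty-printed document or a top-level array, enable the multiLine option:

# Read a multi-line (pretty-printed) JSON file
df_json_ml = spark.read.option("multiLine", "true").json("path/to/file.json")
df_json_ml.show()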

13. Creating DataFrames from Parquet Files

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Create DataFrames from Parquet") \
    .getOrCreate()

# Read Parquet file
df_parquet = spark.read.parquet("path/to/file.parquet")
df_parquet.show()
