PySpark cheat sheet


Here’s a PySpark cheat sheet for a quick yet comprehensive revision of PySpark concepts, architecture, optimizations, commands, and common operations. It covers the essentials from architecture through data processing to Spark SQL.


1. PySpark Architecture Basics

| Component | Description |
| --- | --- |
| Driver Program | Main program; responsible for creating the SparkContext, defining transformations, and coordinating actions. |
| SparkContext | Core connection to the Spark cluster, used to create RDDs (the SparkSession wraps it for DataFrame work). |
| Cluster Manager | Manages resources across nodes (e.g., YARN, Mesos, or Standalone mode). |
| Executors | Processes on worker nodes that execute tasks, store data, and return results. |
| Tasks | Individual units of work sent to executors by the driver program. |
| Job | Each action in Spark triggers a job, consisting of multiple stages and tasks. |
| Stage | Jobs are broken into stages at shuffle boundaries. |
| DAG Scheduler | Turns each job into a DAG (Directed Acyclic Graph) of stages and schedules their tasks. |
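
A minimal sketch of how this plays out in code (assuming a local[*] master purely for illustration): transformations only build a plan, and it is the action that triggers a job, which the DAG scheduler breaks into stages and tasks for the executors.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("arch-demo").getOrCreate()

# Transformations are lazy: these lines only build a logical plan.
df = spark.range(1_000_000)
doubled = df.selectExpr("id * 2 AS doubled")

# The action triggers a job; the DAG scheduler splits it into stages at
# shuffle boundaries and sends tasks to the executors.
print(doubled.count())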

2. Spark Submit Command

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 10 \
  --executor-cores 4 \
  --executor-memory 16g \
  --driver-memory 16g \
  --conf spark.sql.shuffle.partitions=200 \
  script.py

| Parameter | Description |
| --- | --- |
| --master | Sets the cluster manager (e.g., yarn, local, mesos). |
| --deploy-mode | client or cluster (whether the driver runs on the client machine or inside the cluster). |
| --num-executors | Number of executors to use. |
| --executor-cores | Number of CPU cores per executor. |
| --executor-memory | Memory per executor. |
| --driver-memory | Memory allocated for the driver. |
| --conf | Additional Spark configurations (e.g., spark.sql.shuffle.partitions). |
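
The same settings can also be supplied programmatically when building the SparkSession. A sketch of the builder API (illustrative only: resource settings such as driver memory generally still need to be set at launch time, so this is not a drop-in replacement for spark-submit):

from pyspark.sql import SparkSession

# Values mirror the spark-submit example above.
spark = (
    SparkSession.builder
    .appName("my-job")
    .config("spark.executor.memory", "16g")
    .config("spark.executor.cores", "4")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)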

3. DataFrame Basics and Operations

Creating a DataFrame

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()
data = [("Alice", 1), ("Bob", 2)]
columns = ["Name", "Id"]
df = spark.createDataFrame(data, schema=columns)

Viewing Data

| Command | Description |
| --- | --- |
| df.show(n) | Display the first n rows of the DataFrame. |
| df.printSchema() | Print the schema of the DataFrame. |
| df.columns | List the columns of the DataFrame. |
| df.describe().show() | Summary statistics for numerical columns. |
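
Applied to the small df created above, these look like the following (output will vary with your data):

df.show(2)            # display the first 2 rows
df.printSchema()      # column names and types
print(df.columns)     # ['Name', 'Id']
df.describe().show()  # count, mean, stddev, min, max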

Selecting Columns

df.select("Name", "Id").show()
df.select(df.Name, df.Id).show()

Filtering Data

df.filter(df["Id"] > 1).show()
df.filter("Id > 1").show()  # SQL-style string condition

Adding and Renaming Columns

from pyspark.sql.functions import lit

df = df.withColumn("Country", lit("USA"))
df = df.withColumnRenamed("Id", "Identifier")

Dropping Columns

df = df.drop("Country")

4. Data Transformations

| Transformation | Example | Description |
| --- | --- | --- |
| Map | rdd.map(lambda x: x * 2) | Applies a function to each element. |
| Filter | rdd.filter(lambda x: x > 2) | Filters elements based on a condition. |
| FlatMap | rdd.flatMap(lambda x: x.split(" ")) | Applies a function and flattens the result. |
| Distinct | df.distinct() | Removes duplicate rows. |
| Union | df.union(df2) | Combines two DataFrames with the same schema. |
| GroupBy + Aggregation | df.groupBy("col").count() | Groups and aggregates data. |
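
The RDD entries in the table can be tried with a throwaway RDD built via the SparkContext; a quick sketch (the sample data is made up for illustration):

rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
print(rdd.map(lambda x: x * 2).collect())       # [2, 4, 6, 8]
print(rdd.filter(lambda x: x > 2).collect())    # [3, 4]

lines = spark.sparkContext.parallelize(["hello world", "hi there"])
print(lines.flatMap(lambda s: s.split(" ")).collect())  # ['hello', 'world', 'hi', 'there']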

Aggregations

from pyspark.sql.functions import avg, sum, max, min

df.groupBy("Name").agg(
    sum("Id").alias("Total_Id"),
    avg("Id").alias("Average_Id")
).show()

Sorting

df.orderBy("Id", ascending=False).show()
df.sort("Id", "Name").show()

5. Spark SQL

Registering a DataFrame as a SQL Table

df.createOrReplaceTempView("people")

Executing SQL Queries

sql_df = spark.sql("SELECT * FROM people WHERE Id > 1")
sql_df.show()

Useful SQL Commands

| Command | Description |
| --- | --- |
| SELECT col FROM table | Select specific columns from a table. |
| WHERE condition | Filter rows based on a condition. |
| GROUP BY col | Group rows by a column. |
| ORDER BY col | Sort rows by a column. |
| JOIN | Perform SQL joins (INNER, LEFT, RIGHT, FULL). |
| LIMIT n | Limit the number of rows in the output. |
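
Several of these clauses combined in one query against the people view registered above; the orders view is a hypothetical addition used only to demonstrate JOIN, and the query assumes people still exposes Name and Id as in the earlier example:

# Hypothetical second view used only to illustrate JOIN.
orders = spark.createDataFrame([("Alice", 100), ("Alice", 50)], ["Name", "Amount"])
orders.createOrReplaceTempView("orders")

spark.sql("""
    SELECT p.Name, SUM(o.Amount) AS total_amount
    FROM people p
    JOIN orders o ON p.Name = o.Name
    WHERE p.Id > 0
    GROUP BY p.Name
    ORDER BY total_amount DESC
    LIMIT 10
""").show()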

Pivoting and Melting (Unpivoting)

df.groupBy("Name").pivot("Year").sum("Sales").show()

6. PySpark Functions for Data Manipulation

| Function | Description |
| --- | --- |
| col("col_name") | Access a column in the DataFrame. |
| lit(value) | Creates a column with a literal value. |
| when(condition, value) | Conditional expression, similar to CASE WHEN in SQL. |
| isNull(), isNotNull() | Check whether column values are null / not null. |
| alias("alias_name") | Rename a column in a query. |
| concat(col1, col2) | Concatenate multiple columns. |
| regexp_replace(col, pattern, replacement) | Replace regex matches. |
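
A small combined example of these functions (the DataFrame here is made up for illustration):

from pyspark.sql.functions import col, lit, when, concat, regexp_replace

people_df = spark.createDataFrame([("Alice", 1), ("Bob", None)], ["Name", "Id"])

people_df.select(
    col("Name"),
    when(col("Id").isNull(), lit(0)).otherwise(col("Id")).alias("Id_filled"),
    concat(col("Name"), lit("_user")).alias("tagged"),
    regexp_replace(col("Name"), "a", "@").alias("masked"),
).show()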

7. Optimization Techniques

| Technique | Command | Description |
| --- | --- | --- |
| Persist/Cache | df.cache(), df.persist() | Cache or persist a DataFrame in memory for reuse. |
| Repartitioning | df.repartition(num_partitions) | Increase or decrease the number of partitions (full shuffle). |
| Coalesce | df.coalesce(num_partitions) | Reduce the number of partitions without a full shuffle. |
| Broadcast Join | broadcast(df) | Optimize joins with small tables by broadcasting them to all executors. |
| Avoid Too Many Partitions | spark.conf.set("spark.sql.shuffle.partitions", 200) | Set a reasonable number of partitions for shuffles. |
| Partitioning on Keys | df.write.partitionBy("col").parquet("path") | Partition written data by a column to improve read efficiency. |
| Avoid Data Skew | Use salting or avoid unevenly distributed join keys. | Prevent performance issues caused by skewed data. |
| Filter Early | Apply filters early to reduce data size for downstream operations. | Reduces data processing and memory usage. |
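
A sketch combining a few of these techniques: broadcasting a small dimension table into a join, caching the result, and coalescing before writing. The table contents and the output path are placeholders:

from pyspark.sql.functions import broadcast

large = spark.range(1_000_000).withColumnRenamed("id", "key")   # stand-in fact table
small = spark.createDataFrame([(0, "zero"), (1, "one")], ["key", "label"])

# Broadcasting the small side avoids shuffling the large side.
joined = large.join(broadcast(small), on="key", how="left")

joined.cache()                     # reuse without recomputation
joined.coalesce(8).write.mode("overwrite").parquet("/tmp/joined_demo")  # placeholder path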

8. Handling Missing Data

| Operation | Command | Description |
| --- | --- | --- |
| Drop Missing Values | df.na.drop() | Drop rows containing any null values. |
| Fill Missing Values | df.na.fill(value) | Fill all null values with a specific value. |
| Fill Specific Columns | df.na.fill({'col1': 0, 'col2': 'unknown'}) | Fill null values for specific columns. |
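
For example, with a DataFrame containing nulls (sample data made up for illustration):

nulls_df = spark.createDataFrame(
    [("Alice", 1), (None, 2), ("Bob", None)],
    ["Name", "Id"],
)

nulls_df.na.drop().show()                               # keeps only complete rows
nulls_df.na.fill({"Name": "unknown", "Id": 0}).show()   # per-column defaults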

9. Window Functions

Syntax for Window Functions

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank

window_spec = Window.partitionBy("col1").orderBy("col2")

# Using row_number
df.withColumn("row_num", row_number().over(window_spec)).show()

# Using rank
df.withColumn("rank", rank().over(window_spec)).show()

| Function | Description |
| --- | --- |
| row_number() | Assigns a unique sequential number to each row within a window. |
| rank() | Ranks rows within a window; ties get the same rank, leaving gaps. |
| dense_rank() | Ranks rows with no gaps in the ranking. |
| lag() | Accesses the previous row’s value within a window. |
| lead() | Accesses the next row’s value within a window. |
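
lag() and lead() follow the same .over(window_spec) pattern; a sketch with a hypothetical per-store daily sales DataFrame:

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead

daily = spark.createDataFrame(
    [("s1", "2024-01-01", 10), ("s1", "2024-01-02", 12), ("s1", "2024-01-03", 9)],
    ["store", "day", "sales"],
)

w = Window.partitionBy("store").orderBy("day")

daily.select(
    "store", "day", "sales",
    lag("sales", 1).over(w).alias("prev_sales"),    # previous row's value
    lead("sales", 1).over(w).alias("next_sales"),   # next row's value
).show()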

10. Common PySpark Configuration Settings

| Configuration | Description |
| --- | --- |
| spark.executor.memory | Memory allocated per executor (e.g., 2g). |
| spark.driver.memory | Memory allocated for the driver program. |
| spark.sql.shuffle.partitions | Number of partitions used when shuffling data for joins and aggregations. |
| spark.executor.cores | Number of cores per executor. |
| spark.default.parallelism | Default number of partitions in RDDs. |
| spark.sql.autoBroadcastJoinThreshold | Size threshold below which a table is broadcast in joins. |
| spark.sql.cache.serializer | Serializer to use for cached data. |
| spark.dynamicAllocation.enabled | Enables dynamic allocation of executors. |
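
SQL-level settings like these can be adjusted on a live session with spark.conf; resource settings (executor memory, cores) must instead be fixed at launch. A brief sketch:

# Adjust shuffle parallelism and the broadcast-join threshold at runtime.
spark.conf.set("spark.sql.shuffle.partitions", "100")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))  # 10 MB
print(spark.conf.get("spark.sql.shuffle.partitions"))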

This cheat sheet offers a high-level overview of PySpark essentials, covering core concepts, commands, and common operations. It serves as a quick reference for PySpark development and optimizations.

