6. Window Functions: Window functions compute values such as rankings and running aggregates over partitions of data without collapsing the rows, unlike groupBy aggregations.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Define a window specification
window_spec = Window.partitionBy('Name').orderBy('Age')
# Apply window function: number the rows within each Name, ordered by Age (sdf is an existing Spark DataFrame)
sdf.withColumn('row_number', row_number().over(window_spec)).show()
- Pandas (a short usage sketch follows this list):
- Basic Aggregations: sum(), mean(), median(), min(), max(), std(), var(), count()
- GroupBy Aggregations: groupby(), agg()
- Custom Aggregations: agg(lambda x: custom_function(x))
- Pivot Tables: pivot_table()
- Resampling: resample()
- Rolling Aggregations: rolling()
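A minimal sketch of these Pandas aggregations, using a small made-up sales table (the store, date, and amount columns are illustrative only, not taken from the example below):
import pandas as pd
# Small illustrative dataset (values and column names are made up for this sketch)
sales = pd.DataFrame({
    "date": pd.date_range("2023-01-01", periods=6, freq="D"),
    "store": ["A", "A", "B", "B", "A", "B"],
    "amount": [100.0, 150.0, 200.0, 120.0, 90.0, 300.0],
})
# Basic aggregations on a single column
print(sales["amount"].sum(), sales["amount"].mean(), sales["amount"].std())
# GroupBy aggregations: several functions per group
print(sales.groupby("store")["amount"].agg(["sum", "mean", "count"]))
# Custom aggregation via a lambda (range of each group)
print(sales.groupby("store")["amount"].agg(lambda x: x.max() - x.min()))
# Pivot table: stores as rows, mean amount as values
print(sales.pivot_table(index="store", values="amount", aggfunc="mean"))
# Resampling needs a DatetimeIndex: total amount per 2-day window
print(sales.set_index("date")["amount"].resample("2D").sum())
# Rolling aggregation: 2-row moving average
print(sales.set_index("date")["amount"].rolling(2).mean())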
- PySpark (a short usage sketch follows this list):
- Basic Aggregations: sum(), avg(), min(), max(), stddev(), variance(), count()
- GroupBy Aggregations: groupBy(), agg()
- Custom Aggregations: udf()
- Pivot Tables: pivot()
- Window Functions: window(), row_number()
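And the PySpark counterparts, as a minimal sketch that assumes a local SparkSession and the same illustrative store/amount columns:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
spark = SparkSession.builder.appName("aggregation_sketch").getOrCreate()
# Illustrative data (column names are assumptions for this sketch)
sdf = spark.createDataFrame(
    [("A", 100.0), ("A", 150.0), ("B", 200.0), ("B", 120.0)],
    ["store", "amount"],
)
# Basic aggregations
sdf.select(F.sum("amount"), F.avg("amount"), F.stddev("amount"), F.count("amount")).show()
# GroupBy aggregations
sdf.groupBy("store").agg(F.sum("amount").alias("total"), F.avg("amount").alias("avg")).show()
# Custom logic via a UDF, applied before a built-in aggregation
double_amount = F.udf(lambda x: x * 2, DoubleType())
sdf.withColumn("amount_x2", double_amount(F.col("amount"))).groupBy("store").agg(F.avg("amount_x2")).show()
# Pivot: one column per store
sdf.groupBy().pivot("store").sum("amount").show()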
Detailed Example: Translating a Pandas Script to PySpark
Given the original Pandas code below, here’s how to translate it to PySpark:
Pandas Code:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
plt.rcParams["figure.figsize"] = (8, 6)
import warnings
warnings.filterwarnings('ignore')
# df is assumed to be loaded earlier in the notebook, e.g. df = pd.read_csv("bank.csv") (the filename is a placeholder)
df.columns
print(df.info())
df.describe()
df.describe(include = ["object"])
df["y"].value_counts()
df["marital"].value_counts(normalize = True)
df.sort_values(by = "duration", ascending = False).head()
df.sort_values(by = ["age", "duration"], ascending = [True, False]).head()
df.apply(np.max)
d = {"no": 0, "yes": 1}
df["y"] = df["y"].map(d)
df.head()
print("Share of attracted clients =", '{:.1%}'.format(df["y"].mean()))
df[df["y"] == 1].mean()
acd = round(df[df["y"] == 1]["duration"].mean(), 2)
acd_in_min = acd // 60
print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")
print("Average age of attracted clients =", int(df[(df["y"] == 1) & (df["marital"] == "single")]["age"].mean()), "years")
df[-1:]
pd.crosstab(df["y"], df["marital"])
pd.crosstab(df["y"],
df["marital"],
normalize = 'index')
df.pivot_table(
["age", "duration"],
["job"],
aggfunc = "mean",
).head(10)
pd.plotting.scatter_matrix(
df[["age", "duration", "campaign"]],
figsize = (15, 15),
diagonal = "kde")
plt.show()
df["age"].hist()
df.hist(color = "k",
bins = 30,
figsize = (15, 10))
plt.show()
df.boxplot(column = "age",
by = "marital")
plt.show()
df.boxplot(column = "age",
by = ["marital", "housing"],
figsize = (20, 20))
plt.show()
Equivalent PySpark Code:
from itertools import chain
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, create_map, lit, mean, max as spark_max
import matplotlib.pyplot as plt
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
# Initialize Spark session
spark = SparkSession.builder.appName("PySpark Example").getOrCreate()
# Load data
df = spark.read.csv("data.csv", header=True, inferSchema=True)
# Data overview
df.printSchema()
df.describe().show()
# Value counts
df.groupBy("y").count().show()
df.groupBy("marital").count().show()
# Sort values
df.orderBy(col("duration").desc()).show(5)
df.orderBy(col("age").asc(), col("duration").desc()).show(5)
# Max values
df.select([spark_max(c).alias(c) for c in df.columns]).show()
# Map values
d = {"no": 0, "yes": 1}
mapping_expr = create_map([lit(x) for x in chain(*d.items())])
df = df.withColumn("y", mapping_expr[col("y")])
# Head
df.show(5)
# Mean value of "y"
df.select(mean("y")).show()
# Average call duration for attracted clients
acd = df.filter(df["y"] == 1).select(mean("duration")).first()[0]
acd_in_min = acd // 60
print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")
# Average age of attracted clients
avg_age = df.filter((df["y"] == 1) & (df["marital"] == "single")).select(mean("age")).first()[0]
print("Average age of attracted clients =", int(avg_age), "years")
# Crosstab
df.crosstab("y", "marital").show()
# Pivot table
df.groupBy("job").agg(mean("age"), mean("duration")).show()
# Convert to Pandas for plotting
df_pandas = df.toPandas()
# Scatter matrix
pd.plotting.scatter_matrix(df_pandas[["age", "duration", "campaign"]],
figsize=(15, 15),
diagonal="kde")
plt.show()
# Histograms
df_pandas["age"].hist()
plt.show()
df_pandas.hist(color="k", bins=30, figsize=(15, 10))
plt.show()
# Boxplots
df_pandas.boxplot(column="age", by="marital")
plt.show()
df_pandas.boxplot(column="age", by=["marital", "housing"], figsize=(20, 20))
plt.show()
# Stop Spark session
spark.stop()
Key Differences
- Syntax: PySpark requires more verbose syntax, especially for operations like grouping and aggregations.
- Execution: PySpark operations are lazily evaluated, meaning transformations are not executed until an action such as show() or count() is performed. In contrast, Pandas executes operations eagerly (see the sketch after this list).
- Data Handling: PySpark handles distributed data, while Pandas is designed for in-memory operations.
- Conversion: Often, PySpark DataFrames need to be converted to Pandas DataFrames for complex visualizations, as shown in the plotting section.
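A minimal sketch of the execution difference (the column names and values are placeholders, not taken from the banking example above):
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("lazy_vs_eager").getOrCreate()
# Pandas: every statement executes immediately and materializes a result in memory
pdf = pd.DataFrame({"age": [25, 40, 33], "duration": [120, 300, 90]})
filtered_pdf = pdf[pdf["age"] > 30]         # runs right away
print(len(filtered_pdf))
# PySpark: transformations only build a query plan ...
sdf = spark.createDataFrame(pdf)
filtered_sdf = sdf.filter(col("age") > 30)  # nothing is computed yet
# ... work happens only when an action (count, show, collect, write) is called
print(filtered_sdf.count())                 # triggers distributed execution
# For plotting and other single-machine libraries, convert back to Pandas first
filtered_sdf.toPandas().plot(kind="scatter", x="age", y="duration")
plt.show()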
Detailed Example: Translating a SAS Script to PySpark and Pandas
SAS Code:
data transactions;
input id date :date9. amount;
format date date9.;
datalines;
1 01JAN2023 100
1 05JAN2023 150
1 10JAN2023 200
2 01JAN2023 300
2 15JAN2023 400
2 20JAN2023 500
3 01JAN2023 250
3 05JAN2023 350
;
run;
/* Sort the data by 'id' and 'date' */
proc sort data=transactions;
by id date;
run;
/* Use first. and last. to identify first and last transactions for each id */
data transactions_with_flags;
set transactions;
by id date;
if first.id then first_transaction = 1;
else first_transaction = 0;
if last.id then last_transaction = 1;
else last_transaction = 0;
run;
/* Transpose the data to get amounts for each date into separate columns */
proc transpose data=transactions_with_flags out=transposed_data prefix=amount_;
by id;
var amount;
id date;
run;
proc print data=transposed_data;
run;
Equivalent PySpark Code:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, first, last, when, lit
# Initialize Spark session
spark = SparkSession.builder.master("local").appName("SAS_translation").getOrCreate()
# Sample data
data = [(1, "2023-01-01", 100), (1, "2023-01-05", 150), (1, "2023-01-10", 200),
(2, "2023-01-01", 300), (2, "2023-01-15", 400), (2, "2023-01-20", 500),
(3, "2023-01-01", 250), (3, "2023-01-05", 350)]
columns = ["id", "date", "amount"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
# Convert 'date' to date type
df = df.withColumn("date", col("date").cast("date"))
# Window specification: order rows within each id by date; the frame must span the whole
# partition so that last() sees the final row (the default frame stops at the current row)
window_spec = (Window.partitionBy("id").orderBy("date")
               .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
# Add 'first.' and 'last.' equivalent columns using Window functions
df_with_flags = (df
    .withColumn("first_transaction", when(col("date") == first("date").over(window_spec), lit(1)).otherwise(lit(0)))
    .withColumn("last_transaction", when(col("date") == last("date").over(window_spec), lit(1)).otherwise(lit(0))))
df_with_flags.show()
# Transpose (pivot) operation
df_pivot = df_with_flags.groupBy("id").pivot("date").sum("amount")
df_pivot.show()
Equivalent Pandas Code:
import pandas as pd
# Sample data
data = {'id': [1, 1, 1, 2, 2, 2, 3, 3],
'date': ['2023-01-01', '2023-01-05', '2023-01-10', '2023-01-01', '2023-01-15', '2023-01-20', '2023-01-01', '2023-01-05'],
'amount': [100, 150, 200, 300, 400, 500, 250, 350]}
df = pd.DataFrame(data)
# Convert 'date' column to datetime type
df['date'] = pd.to_datetime(df['date'])
# Sort values by 'id' and 'date'
df = df.sort_values(by=['id', 'date'])
# Add 'first_transaction' and 'last_transaction' flags
df['first_transaction'] = df.groupby('id')['date'].transform('first') == df['date']
df['last_transaction'] = df.groupby('id')['date'].transform('last') == df['date']
print(df)
# Transpose (pivot) operation
df_pivot = df.pivot_table(index='id', columns='date', values='amount', aggfunc='sum')
print(df_pivot)
https://www.hintstoday.com/exploratory-data-analysis-eda-with-pandas-in-banking-converted-in-pyspark