Comparison Between Pandas and PySpark for Data Analysis

6. Window Functions: Window functions allow for advanced aggregations over partitions of data, such as numbering rows within each group; a Pandas counterpart is sketched after the PySpark snippet below.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Define a window specification
window_spec = Window.partitionBy('Name').orderBy('Age')

# Apply window function
sdf.withColumn('row_number', row_number().over(window_spec)).show()
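
For comparison, the same per-group row numbering can be written in Pandas with groupby().cumcount(); a minimal sketch, assuming a small Pandas DataFrame with the same Name and Age columns as the Spark DataFrame sdf above:

import pandas as pd

# Hypothetical example data
df_small = pd.DataFrame({"Name": ["A", "A", "B"], "Age": [30, 25, 40]})

# Equivalent of row_number() over (partition by Name order by Age):
# sort within each group, then number the rows starting at 1
df_small = df_small.sort_values(["Name", "Age"])
df_small["row_number"] = df_small.groupby("Name").cumcount() + 1
print(df_small)
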
  • Pandas:
    • Basic Aggregations: sum(), mean(), median(), min(), max(), std(), var(), count()
    • GroupBy Aggregations: groupby(), agg()
    • Custom Aggregations: agg(lambda x: custom_function(x))
    • Pivot Tables: pivot_table()
    • Resampling: resample()
    • Rolling Aggregations: rolling()
  • PySpark:
    • Basic Aggregations: sum(), avg(), min(), max(), stddev(), variance(), count()
    • GroupBy Aggregations: groupBy(), agg() (see the side-by-side sketch after this list)
    • Custom Aggregations: udf()
    • Pivot Tables: pivot()
    • Window Functions: window(), row_number()
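
To make the mapping above concrete, here is a minimal side-by-side sketch of the same grouped aggregation in both libraries, using hypothetical job and age columns (the names mirror the banking example below):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, count

# Hypothetical example data
pdf = pd.DataFrame({"job": ["admin", "admin", "technician"], "age": [30, 40, 35]})

# Pandas: groupby() + agg()
print(pdf.groupby("job").agg(avg_age=("age", "mean"), n=("age", "count")))

# PySpark: groupBy() + agg() over the same data
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(pdf)
sdf.groupBy("job").agg(mean("age").alias("avg_age"), count("age").alias("n")).show()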

Detailed Example: Translating a Pandas Script to PySpark

Given the original Pandas code, here is how to translate it to PySpark:

Pandas Code:

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

%matplotlib inline
plt.rcParams["figure.figsize"] = (8, 6)

import warnings
warnings.filterwarnings('ignore')

df = pd.read_csv("data.csv")  # load the dataset (same file assumed as in the PySpark version below)

df.columns
print(df.info())
df.describe()
df.describe(include = ["object"])
df["y"].value_counts()
df["marital"].value_counts(normalize = True)
df.sort_values(by = "duration", ascending = False).head()
df.sort_values(by = ["age", "duration"], ascending = [True, False]).head()
df.apply(np.max)
d = {"no": 0, "yes": 1}
df["y"] = df["y"].map(d)
df.head()
print("Share of attracted clients =", '{:.1%}'.format(df["y"].mean()))
df[df["y"] == 1].mean()
acd = round(df[df["y"] == 1]["duration"].mean(), 2)
acd_in_min = acd // 60
print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")
print("Average age of attracted clients =", int(df[(df["y"] == 1) & (df["marital"] == "single")]["age"].mean()), "years")
df[-1:]
pd.crosstab(df["y"], df["marital"])
pd.crosstab(df["y"],
df["marital"],
normalize = 'index')

df.pivot_table(
["age", "duration"],
["job"],
aggfunc = "mean",
).head(10)

pd.plotting.scatter_matrix(
df[["age", "duration", "campaign"]],
figsize = (15, 15),
diagonal = "kde")
plt.show()

df["age"].hist()

df.hist(color = "k",
bins = 30,
figsize = (15, 10))
plt.show()

df.boxplot(column = "age",
by = "marital")
plt.show()

df.boxplot(column = "age",
by = ["marital", "housing"],
figsize = (20, 20))
plt.show()

Equivalent PySpark Code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, mean, max as spark_max, create_map, lit
from itertools import chain
import matplotlib.pyplot as plt
import pandas as pd
import warnings

# Initialize Spark session
spark = SparkSession.builder.appName("PySpark Example").getOrCreate()

# Load data
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Data overview
df.printSchema()
df.describe().show()

# Value counts
df.groupBy("y").count().show()
df.groupBy("marital").count().show()

# Sort values
df.orderBy(col("duration").desc()).show(5)
df.orderBy(col("age").asc(), col("duration").desc()).show(5)

# Max values
df.select([spark_max(c).alias(c) for c in df.columns]).show()

# Map values
d = {"no": 0, "yes": 1}
mapping_expr = create_map([lit(x) for x in chain(*d.items())])
df = df.withColumn("y", mapping_expr[col("y")])

# Head
df.show(5)

# Mean value of "y"
df.select(mean("y")).show()

# Average call duration for attracted clients
acd = df.filter(df["y"] == 1).select(mean("duration")).first()[0]
acd_in_min = acd // 60
print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")

# Average age of attracted clients
avg_age = df.filter((df["y"] == 1) & (df["marital"] == "single")).select(mean("age")).first()[0]
print("Average age of attracted clients =", int(avg_age), "years")

# Crosstab
df.crosstab("y", "marital").show()

# Pivot table
df.groupBy("job").agg(mean("age"), mean("duration")).show()

# Convert to Pandas for plotting
df_pandas = df.toPandas()

# Scatter matrix
pd.plotting.scatter_matrix(df_pandas[["age", "duration", "campaign"]],
figsize=(15, 15),
diagonal="kde")
plt.show()

# Histograms
df_pandas["age"].hist()
plt.show()

df_pandas.hist(color="k", bins=30, figsize=(15, 10))
plt.show()

# Boxplots
df_pandas.boxplot(column="age", by="marital")
plt.show()

df_pandas.boxplot(column="age", by=["marital", "housing"], figsize=(20, 20))
plt.show()

# Stop Spark session
spark.stop()

Key Differences

  1. Syntax: PySpark requires more verbose syntax, especially for operations like grouping and aggregations.
  2. Execution: PySpark operations are lazily evaluated, meaning transformations are not executed until an action such as show() or count() is performed. In contrast, Pandas executes operations eagerly (see the short illustration after this list).
  3. Data Handling: PySpark handles distributed data, while Pandas is designed for in-memory operations.
  4. Conversion: Often, PySpark DataFrames need to be converted to Pandas DataFrames for complex visualizations, as shown in the plotting section.
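
A minimal, self-contained illustration of the lazy-versus-eager difference (the tiny example data here is made up for demonstration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pandas as pd

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(25, 100), (42, 300)], ["age", "duration"])

# PySpark: filter/select only build a query plan; no rows are processed yet
plan = sdf.filter(col("age") > 30).select("age", "duration")

# The action is what triggers execution of the whole plan
plan.show()

# Pandas: each statement executes immediately and materializes its result
pdf = pd.DataFrame({"age": [25, 42], "duration": [100, 300]})
print(pdf[pdf["age"] > 30][["age", "duration"]])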

Detailed Example: Translating a SAS Script to PySpark and Pandas

The same approach works when moving from SAS: the program below flags the first and last transaction per id using first./last. processing and transposes the amounts by date; the PySpark and Pandas equivalents follow.

SAS Code:

data transactions;
    input id date :date9. amount;
    format date date9.;
    datalines;
1 01JAN2023 100
1 05JAN2023 150
1 10JAN2023 200
2 01JAN2023 300
2 15JAN2023 400
2 20JAN2023 500
3 01JAN2023 250
3 05JAN2023 350
;
run;

/* Sort the data by 'id' and 'date' */
proc sort data=transactions;
    by id date;
run;

/* Use first. and last. to identify first and last transactions for each id */
data transactions_with_flags;
    set transactions;
    by id date;
    if first.id then first_transaction = 1;
    else first_transaction = 0;
    if last.id then last_transaction = 1;
    else last_transaction = 0;
run;

/* Transpose the data to get amounts for each date into separate columns */
proc transpose data=transactions_with_flags out=transposed_data prefix=amount_;
    by id;
    var amount;
    id date;
run;

proc print data=transposed_data;
run;

Equivalent PySpark Code:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, first, last, when, lit

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("SAS_translation").getOrCreate()

# Sample data
data = [(1, "2023-01-01", 100), (1, "2023-01-05", 150), (1, "2023-01-10", 200),
        (2, "2023-01-01", 300), (2, "2023-01-15", 400), (2, "2023-01-20", 500),
        (3, "2023-01-01", 250), (3, "2023-01-05", 350)]
columns = ["id", "date", "amount"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Convert 'date' to date type
df = df.withColumn("date", col("date").cast("date"))

# Window specification covering each full 'id' partition, ordered by 'date'
# (the explicit frame is needed so last() sees the whole partition rather than
# only the rows up to the current one)
window_spec = (Window.partitionBy("id").orderBy("date")
               .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

# Add 'first.' and 'last.' equivalent flag columns using window functions
df_with_flags = (df
    .withColumn("first_transaction",
                when(col("date") == first("date").over(window_spec), lit(1)).otherwise(lit(0)))
    .withColumn("last_transaction",
                when(col("date") == last("date").over(window_spec), lit(1)).otherwise(lit(0))))

df_with_flags.show()

# Transpose (pivot) operation
df_pivot = df_with_flags.groupBy("id").pivot("date").sum("amount")
df_pivot.show()
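
SAS's prefix=amount_ option names the transposed columns amount_<date>; a minimal sketch of the same renaming on the pivoted PySpark result (after pivot("date"), the column names are the string form of each date):

df_pivot_renamed = df_pivot.select(
    "id",
    *[col(c).alias("amount_" + c) for c in df_pivot.columns if c != "id"]
)
df_pivot_renamed.show()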

Equivalent Pandas Code:

import pandas as pd

# Sample data
data = {'id': [1, 1, 1, 2, 2, 2, 3, 3],
        'date': ['2023-01-01', '2023-01-05', '2023-01-10', '2023-01-01', '2023-01-15', '2023-01-20', '2023-01-01', '2023-01-05'],
        'amount': [100, 150, 200, 300, 400, 500, 250, 350]}

df = pd.DataFrame(data)

# Convert 'date' column to datetime type
df['date'] = pd.to_datetime(df['date'])

# Sort values by 'id' and 'date'
df = df.sort_values(by=['id', 'date'])

# Add 'first_transaction' and 'last_transaction' flags
df['first_transaction'] = df.groupby('id')['date'].transform('first') == df['date']
df['last_transaction'] = df.groupby('id')['date'].transform('last') == df['date']

print(df)

# Transpose (pivot) operation
df_pivot = df.pivot_table(index='id', columns='date', values='amount', aggfunc='sum')
print(df_pivot)
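
To mirror the SAS prefix=amount_ naming here as well, the pivoted Pandas columns (Timestamps after pd.to_datetime) can be renamed; a minimal sketch:

df_pivot.columns = ["amount_" + c.strftime("%Y-%m-%d") for c in df_pivot.columns]
print(df_pivot)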

https://www.hintstoday.com/exploratory-data-analysis-eda-with-pandas-in-banking-converted-in-pyspark
