Comparison Between Pandas and PySpark for Data Analysis

Pandas and PySpark are both popular tools for data analysis, but they serve different purposes and are optimized for different scales and types of data processing. Here’s a comparison focusing on their capabilities, performance, and use cases.

Understanding how PySpark works compared to Pandas involves grasping their underlying architectures, data handling capabilities, and typical usage scenarios. Here’s a breakdown of how each works:

Pandas

Architecture and Data Handling:

  • Single-Node Processing: Pandas operates as an in-memory, single-node data analysis library for Python.
  • DataFrame Structure: Data is stored in a DataFrame, which is essentially a two-dimensional labeled data structure with columns of potentially different types (similar to a spreadsheet or SQL table).
  • Data Size Limitation: Limited by available memory on a single machine, suitable for datasets that can fit into memory (typically up to a few gigabytes depending on the machine).
  • Operations: Supports a wide range of data manipulation operations including filtering, aggregation, merging, reshaping, and more, optimized for single-machine execution.
  • Ease of Use: Designed for ease of use and interactive data analysis, making it popular among data analysts and scientists.

Usage Scenarios:

  • Ideal for exploratory data analysis (EDA), data cleaning, and data preprocessing tasks on smaller to medium-sized datasets.
  • Well-suited for rapid prototyping and development due to its interactive nature and rich set of functions.

PySpark

Architecture and Data Handling:

  • Distributed Computing: PySpark is built on top of Apache Spark, a distributed computing framework, designed for processing and analyzing large-scale data sets across a cluster.
  • Resilient Distributed Datasets (RDDs): PySpark operates on RDDs (in earlier versions) or DataFrames (in newer versions), which are distributed collections of data spread across multiple machines in a cluster.
  • Lazy Evaluation: PySpark uses lazy evaluation to optimize the execution of workflows by deferring execution until necessary.
  • Fault Tolerance: Offers fault tolerance through lineage information, enabling recovery of lost data due to node failures.
  • Performance: Scalable and performs well on large datasets, leveraging in-memory computing and parallel processing across nodes in the cluster.

Usage Scenarios:

  • Big Data Processing: PySpark is suitable for processing and analyzing massive datasets that exceed the capacity of a single machine’s memory.
  • Data Engineering: Used for tasks such as ETL (Extract, Transform, Load), data cleansing, and data preparation in big data pipelines.
  • Machine Learning: Integrates seamlessly with MLlib (Spark’s machine learning library) for scalable machine learning model training and evaluation.

Key Differences

  1. Execution Model:
    • Pandas: Executes operations on a single machine in-memory.
    • PySpark: Distributes computations across a cluster of machines, utilizing distributed memory.
  2. Scalability:
    • Pandas: Limited to data sizes that fit within the memory of a single machine.
    • PySpark: Scales horizontally to handle large datasets by distributing computations across multiple nodes in a cluster.
  3. Performance:
    • Pandas: Optimized for single-machine performance but may face limitations with very large datasets.
    • PySpark: Offers superior performance on large-scale data due to parallel processing and in-memory computing.
  4. Fault Tolerance:
    • Pandas: No built-in fault tolerance; operations are limited to the resources of a single machine.
    • PySpark: Provides fault tolerance through RDD lineage and data replication across nodes.
  5. Use Case:
    • Pandas: Best suited for interactive data analysis, EDA, and small to medium-sized data tasks.
    • PySpark: Ideal for big data processing, data engineering at scale, and distributed machine learning tasks.

In essence, Pandas is a powerful tool for data analysis and manipulation on a single machine, suitable for datasets that fit into memory. PySpark, on the other hand, is designed for big data processing, leveraging distributed computing across a cluster to handle massive datasets efficiently. Choosing between Pandas and PySpark depends on the scale of your data, performance requirements, and the need for distributed computing capabilities.

Comparison in Detail

1. Scale and Performance

Pandas:

  • Scale: Pandas is designed for in-memory data manipulation, making it suitable for small to medium-sized datasets that fit in the memory of a single machine.
  • Performance: Pandas can be very fast for operations on smaller datasets due to its optimized, C-based implementation. However, its performance can degrade significantly as data size grows.

PySpark:

  • Scale: PySpark, part of the Apache Spark ecosystem, is built for distributed computing and can handle very large datasets that do not fit in the memory of a single machine.
  • Performance: PySpark leverages distributed computing, making it capable of processing terabytes of data efficiently. It can also optimize query plans and take advantage of parallel processing across a cluster.

2. Ease of Use

Pandas:

  • Ease of Use: Pandas is known for its user-friendly and intuitive API. It’s easy to learn and use, especially for those familiar with Python.
  • API: The Pandas API is rich and expressive, with a wide range of functions for data manipulation, analysis, and visualization.

PySpark:

  • Ease of Use: PySpark can be more complex to set up and use due to its distributed nature and the need for a Spark cluster.
  • API: The PySpark DataFrame API is similar to Pandas, but it requires an understanding of Spark concepts like RDDs, Spark SQL, and distributed processing.

3. Data Handling and Processing

Pandas:

  • Data Handling: Pandas is excellent for handling and manipulating structured data, such as CSV files, SQL databases, and Excel files.
  • Processing: Pandas operations are typically performed in-memory, which can lead to memory limitations for very large datasets.

PySpark:

  • Data Handling: PySpark can handle both structured and unstructured data, and it integrates well with various data sources, including HDFS, S3, Cassandra, HBase, and more.
  • Processing: PySpark processes data in a distributed manner, which allows it to scale to large datasets. It can also handle complex data pipelines and workflows.

4. Integration and Ecosystem

Pandas:

  • Integration: Pandas integrates seamlessly with other Python libraries such as NumPy, Matplotlib, and Scikit-Learn, making it a powerful tool for data analysis and machine learning.
  • Ecosystem: Pandas is part of the broader Python data science ecosystem, which includes Jupyter Notebooks, SciPy, and TensorFlow.

PySpark:

  • Integration: PySpark integrates well with the Spark ecosystem, including Spark SQL, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for real-time data processing.
  • Ecosystem: PySpark is part of the larger Apache Spark ecosystem, which is used for big data processing and analytics across various industries.

5. Setup and Configuration

Pandas:

  • Setup: Pandas is easy to install and requires minimal configuration. It can be installed using pip or conda.
  • Configuration: Pandas requires little to no configuration to get started.

PySpark:

  • Setup: PySpark can be more challenging to set up, especially for distributed computing environments. It requires a Spark cluster, which can be set up locally or on cloud platforms like AWS, Azure, or Google Cloud.
  • Configuration: PySpark requires more configuration, including setting up SparkContext, managing cluster resources, and tuning Spark parameters.

6. Example Use Cases

Pandas:

  • Use Cases: Ideal for exploratory data analysis, small to medium-sized data processing tasks, data cleaning, and preparation for machine learning.
  • Example: Analyzing a CSV file with a few million rows, performing data cleaning and transformation, and visualizing the results.

PySpark:

  • Use Cases: Suitable for big data processing, ETL pipelines, large-scale data analysis, and distributed machine learning.
  • Example: Processing and analyzing terabytes of log data from a web application, building a distributed machine learning pipeline for predictive analytics.
  • mains, and the choice depends on your specific requirements, the scale of your data, and your familiarity with the toolset.
On WhatPandas DataFrameSpark DataFrame
ArchitectureSingle-node, in-memory data structureDistributed, operates on a cluster of machines
ScalabilityLimited to data that fits into memory of a single machineScales horizontally across a cluster of machines
Data HandlingHandles data in-memory on a single machineHandles distributed data across multiple machines
Fault ToleranceNo built-in fault toleranceFault-tolerant through RDD lineage and data replication
Execution ModelExecutes operations on a single machineDistributes computations across a cluster of machines
PerformanceOptimized for single-machine performanceOptimized for parallel processing and in-memory computing
Use CasesInteractive data analysis, small to medium-sized datasetsBig data processing, large-scale data engineering tasks
Ease of UseUser-friendly, interactiveRequires understanding of distributed computing concepts
Library EcosystemRich ecosystem for data manipulation and analysisIntegrated with Spark’s ecosystem, including MLlib for ML
Integration with MLLimited to single-machine capabilitiesIntegrates with Spark MLlib for scalable machine learning
Programming LanguagePythonPython, Scala, Java, R (via SparkR and SparklyR interfaces)

Here’s a detailed comparison between Pandas and PySpark, focusing on syntax, functions, and operations. This will help understand how to translate code from Pandas to PySpark.

Pandas vs. PySpark: Basic Operations

Loading Data

Creating DataFrames from various data sources is a common task in both Pandas and PySpark. Below are examples of how to create DataFrames from CSV files, JSON files, SQL databases, and directly from Python dictionaries for both Pandas and PySpark.

1. From CSV File

Pandas:

import pandas as pd

# Create DataFrame from CSV file
pdf = pd.read_csv('path/to/your/file.csv')
print(pdf)

PySpark:

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Create DataFrame from CSV file
sdf = spark.read.csv('path/to/your/file.csv', header=True, inferSchema=True)
sdf.show()

2. From JSON File

Pandas:

import pandas as pd

# Create DataFrame from JSON file
pdf = pd.read_json('path/to/your/file.json')
print(pdf)

PySpark:

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Create DataFrame from JSON file
sdf = spark.read.json('path/to/your/file.json')
sdf.show()

3. From SQL Database

Pandas:

import pandas as pd
import sqlalchemy

# Create an engine instance
engine = sqlalchemy.create_engine('postgresql://username:password@host:port/database')

# Create DataFrame from SQL query
pdf = pd.read_sql_query('SELECT * FROM table_name', engine)
print(pdf)

PySpark:

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Create DataFrame from SQL query
sdf = spark.read.format("jdbc").options(
    url='jdbc:postgresql://host:port/database',
    driver='org.postgresql.Driver',
    dbtable='table_name',
    user='username',
    password='password'
).load()
sdf.show()

4. From Python Dictionary

Pandas:

import pandas as pd

data = {
    'Name': ['Alice', 'Bob', 'Charlie'],
    'Age': [25, 30, 35]
}

# Create DataFrame from dictionary
pdf = pd.DataFrame(data)
print(pdf)

PySpark:

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

data = [('Alice', 25), ('Bob', 30), ('Charlie', 35)]
columns = ['Name', 'Age']

# Create DataFrame from list of tuples
sdf = spark.createDataFrame(data, columns)
sdf.show()
  • Pandas: Directly creates DataFrames from in-memory structures, files, and databases using a straightforward, intuitive syntax. Ideal for smaller datasets and interactive data analysis.
  • PySpark: Handles large datasets and distributed computing efficiently. Requires a Spark session and uses a different set of functions for loading data from various sources.

Viewing Data

Pandas:

df.head()
df.info()
df.describe()

PySpark:

df_csv.show()
df_csv.printSchema()
df_csv.describe().show()

Viewing data is an essential part of data analysis and manipulation. Both Pandas and PySpark provide various methods for inspecting DataFrames. Below is a detailed comparison of these methods in Pandas and PySpark, including options like vertical view in PySpark’s show method.

Viewing Data in Pandas

1. head(): View the first few rows

import pandas as pd

data = {
    'Name': ['Alice', 'Bob', 'Charlie'],
    'Age': [25, 30, 35]
}

pdf = pd.DataFrame(data)

# View the first 5 rows
print(pdf.head())

# View the first n rows
print(pdf.head(10))

2. tail(): View the last few rows

# View the last 5 rows
print(pdf.tail())

# View the last n rows
print(pdf.tail(10))

3. info(): Summary of the DataFrame

# Summary of DataFrame
print(pdf.info())

4. describe(): Summary statistics

# Summary statistics of DataFrame
print(pdf.describe())

5. sample(): Random sample of rows

# Random sample of 5 rows
print(pdf.sample(5))

# Random sample of n rows
print(pdf.sample(10))

6. iloc[]: Access rows by integer-location based indexing

# Access rows by index
print(pdf.iloc[0:5]) # First 5 rows

7. loc[]: Access rows by label

# Access rows by label
print(pdf.loc[0:5]) # First 5 rows

Viewing Data in PySpark

1. show(): View the first few rows

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

data = [('Alice', 25), ('Bob', 30), ('Charlie', 35)]
columns = ['Name', 'Age']

sdf = spark.createDataFrame(data, columns)

# View the first 20 rows (default)
sdf.show()

# View the first n rows
sdf.show(10)

2. show() with truncate: Control column width

# Truncate long strings to 20 characters (default)
sdf.show(truncate=True)

# Do not truncate strings
sdf.show(truncate=False)

# Truncate strings to a specific length
sdf.show(truncate=5)

3. show() with vertical: Vertical display of rows

# Vertical display of rows
sdf.show(vertical=True)

4. printSchema(): Print the schema of the DataFrame

# Print the schema
sdf.printSchema()

5. describe(): Summary statistics

# Summary statistics of DataFrame
sdf.describe().show()

6. head(): Retrieve the first row or n rows

# Retrieve the first row
print(sdf.head())

# Retrieve the first n rows
print(sdf.head(5))

7. take(): Retrieve the first n rows

# Retrieve the first n rows as a list of Row objects
print(sdf.take(5))

8. collect(): Retrieve all rows

# Retrieve all rows as a list of Row objects
all_rows = sdf.collect()
for row in all_rows:
print(row)

  • Pandas:
    • head(), tail(): For viewing first and last few rows.
    • info(), describe(): For summary and statistics.
    • sample(): For random sampling.
    • iloc[], loc[]: For row-based access.
  • PySpark:
    • show(): Flexible method for displaying rows with options like truncate and vertical.
    • printSchema(): For printing the schema.
    • describe(): For summary statistics.
    • head(), take(): For retrieving specific number of rows.
    • collect(): For retrieving all rows.

Both Pandas and PySpark provide comprehensive options for viewing and inspecting DataFrames, with PySpark offering additional flexibility through the show method’s parameters.


Selecting Columns

Selecting columns is a fundamental operation in data manipulation. Both Pandas and PySpark provide a variety of ways to select columns. Below, I detail all the options and functions available for selecting columns in both libraries.

Selecting Columns in Pandas

1. Selecting a Single Column:

import pandas as pd

data = {
    'Name': ['Alice', 'Bob', 'Charlie'],
    'Age': [25, 30, 35],
    'City': ['New York', 'Los Angeles', 'Chicago']
}

pdf = pd.DataFrame(data)

# Selecting a single column
print(pdf['Name'])
print(pdf.Name)  # Alternative syntax

2. Selecting Multiple Columns:

# Selecting multiple columns
print(pdf[['Name', 'Age']])

3. Using loc[]:

# Selecting columns using loc
print(pdf.loc[:, ['Name', 'Age']])

4. Using iloc[]:

# Selecting columns by index using iloc
print(pdf.iloc[:, [0, 1]]) # First two columns

5. Using Column Index:

# Selecting columns by index
print(pdf.iloc[:, [0, 2]]) # First and third columns

6. Selecting Columns Based on Data Type:

# Selecting columns based on data type
print(pdf.select_dtypes(include=['int64']))
print(pdf.select_dtypes(include=['object']))

7. Using Filter Method:

# Using filter method to select columns
print(pdf.filter(['Name', 'City']))

Selecting Columns in PySpark

1. Selecting a Single Column:

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

data = [('Alice', 25, 'New York'), ('Bob', 30, 'Los Angeles'), ('Charlie', 35, 'Chicago')]
columns = ['Name', 'Age', 'City']

sdf = spark.createDataFrame(data, columns)

# Selecting a single column
sdf.select('Name').show()

2. Selecting Multiple Columns:

# Selecting multiple columns
sdf.select('Name', 'Age').show()

3. Using selectExpr:

# Using selectExpr to select columns
sdf.selectExpr('Name', 'Age').show()

4. Using Column Object:

from pyspark.sql.functions import col

# Selecting columns using col function
sdf.select(col('Name'), col('Age')).show()

5. Using Column Index:

# Selecting columns by index (using df.columns)
selected_columns = sdf.select([sdf.columns[i] for i in [0, 2]])
selected_columns.show()

6. Selecting Columns Based on Condition:

# Selecting columns based on condition (example: starting with 'A')
selected_columns = sdf.select([col for col in sdf.columns if col.startswith('A')])
selected_columns.show()

7. Dropping Columns:

# Dropping columns and selecting the rest
sdf.drop('Age').show()
  • Pandas:
    • Single Column: pdf['column'], pdf.column
    • Multiple Columns: pdf[['col1', 'col2']]
    • Using loc[]: pdf.loc[:, ['col1', 'col2']]
    • Using iloc[]: pdf.iloc[:, [index1, index2]]
    • Column Index: pdf.iloc[:, [index1, index2]]
    • Data Type: pdf.select_dtypes(include=['dtype'])
    • Filter: pdf.filter(['col1', 'col2'])
  • PySpark:
    • Single Column: sdf.select('column')
    • Multiple Columns: sdf.select('col1', 'col2')
    • Using selectExpr: sdf.selectExpr('col1', 'col2')
    • Using Column Object: sdf.select(col('col1'), col('col2'))
    • Column Index: sdf.select([sdf.columns[index1], sdf.columns[index2]])
    • Condition: sdf.select([col for col in sdf.columns if condition])
    • Dropping Columns: sdf.drop('column')

Filtering Data

Filtering data is a crucial step in data analysis and manipulation. Both Pandas and PySpark offer various methods and functions for filtering DataFrames. Below are detailed examples of all the options and functions available for filtering data in both libraries.

Filtering Data in Pandas

1. Filtering with a Single Condition:

import pandas as pd

data = {
    'Name': ['Alice', 'Bob', 'Charlie'],
    'Age': [25, 30, 35],
    'City': ['New York', 'Los Angeles', 'Chicago']
}

pdf = pd.DataFrame(data)

# Filtering with a single condition
filtered_pdf = pdf[pdf['Age'] > 30]
print(filtered_pdf)

2. Filtering with Multiple Conditions:

# Filtering with multiple conditions using & and |
filtered_pdf = pdf[(pdf['Age'] > 25) & (pdf['City'] == 'Los Angeles')]
print(filtered_pdf)

3. Using query() Method:

# Filtering using query method
filtered_pdf = pdf.query('Age > 25 and City == "Los Angeles"')
print(filtered_pdf)

4. Filtering with isin():

# Filtering using isin method
filtered_pdf = pdf[pdf['City'].isin(['New York', 'Chicago'])]
print(filtered_pdf)

5. Filtering with str.contains():

# Filtering using str.contains method for string operations
filtered_pdf = pdf[pdf['City'].str.contains('New')]
print(filtered_pdf)

6. Filtering with between():

# Filtering using between method
filtered_pdf = pdf[pdf['Age'].between(25, 30)]
print(filtered_pdf)

7. Filtering with loc[]:

# Filtering using loc
filtered_pdf = pdf.loc[pdf['Age'] > 30]
print(filtered_pdf)

8. Filtering with mask():

# Filtering using mask (inverse of filter)
masked_pdf = pdf.mask(pdf['Age'] > 30)
print(masked_pdf)

9. Filtering with filter():

# Filtering rows using filter method
filtered_pdf = pdf.filter(items=[0, 2], axis=0) # Filter rows by index
print(filtered_pdf)

Filtering Data in PySpark

1. Filtering with a Single Condition:

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

data = [('Alice', 25, 'New York'), ('Bob', 30, 'Los Angeles'), ('Charlie', 35, 'Chicago')]
columns = ['Name', 'Age', 'City']

sdf = spark.createDataFrame(data, columns)

# Filtering with a single condition
filtered_sdf = sdf.filter(sdf['Age'] > 30)
filtered_sdf.show()

2. Filtering with Multiple Conditions:

# Filtering with multiple conditions using & and |
filtered_sdf = sdf.filter((sdf['Age'] > 25) & (sdf['City'] == 'Los Angeles'))
filtered_sdf.show()

3. Using where() Method:

# Filtering using where method
filtered_sdf = sdf.where(sdf['Age'] > 30)
filtered_sdf.show()

4. Filtering with isin():

# Filtering using isin method
filtered_sdf = sdf.filter(sdf['City'].isin(['New York', 'Chicago']))
filtered_sdf.show()

5. Filtering with contains():

# Filtering using contains method for string operations
filtered_sdf = sdf.filter(sdf['City'].contains('New'))
filtered_sdf.show()

6. Filtering with between():

# Filtering using between method
filtered_sdf = sdf.filter(sdf['Age'].between(25, 30))
filtered_sdf.show()

7. Filtering with SQL Expression:

# Register the DataFrame as a SQL temporary view
sdf.createOrReplaceTempView("people")

# SQL query
filtered_sdf = spark.sql("SELECT * FROM people WHERE Age > 30")
filtered_sdf.show()

8. Using selectExpr() with Conditional Expression:

# Filtering using selectExpr with conditional expression
filtered_sdf = sdf.selectExpr("Name", "Age", "City").filter("Age > 30")
filtered_sdf.show()
  • Pandas:
    • Single Condition: pdf[pdf['column'] condition]
    • Multiple Conditions: pdf[(pdf['col1'] condition) & (pdf['col2'] condition)]
    • Using query() Method: pdf.query('condition')
    • Using isin(): pdf[pdf['column'].isin(['value1', 'value2'])]
    • Using str.contains(): pdf[pdf['column'].str.contains('pattern')]
    • Using between(): pdf[pdf['column'].between(start, end)]
    • Using loc[]: pdf.loc[pdf['column'] condition]
    • Using mask(): pdf.mask(pdf['column'] condition)
    • Using filter(): pdf.filter(items=[index1, index2], axis=0)
  • PySpark:
    • Single Condition: sdf.filter(sdf['column'] condition)
    • Multiple Conditions: sdf.filter((sdf['col1'] condition) & (sdf['col2'] condition))
    • Using where() Method: sdf.where(sdf['column'] condition)
    • Using isin(): sdf.filter(sdf['column'].isin(['value1', 'value2']))
    • Using contains(): sdf.filter(sdf['column'].contains('pattern'))
    • Using between(): sdf.filter(sdf['column'].between(start, end))
    • Using SQL Expression: spark.sql("SELECT * FROM view WHERE condition")
    • Using selectExpr() with Conditional Expression: sdf.selectExpr("col1", "col2").filter("condition")

Aggregation

Aggregation is a fundamental operation for summarizing data, and both Pandas and PySpark provide extensive support for various aggregation functions and options. Below is a detailed overview of how to perform aggregations in both libraries, including all functions and options available.

Aggregation in Pandas

1. Basic Aggregation Functions: Pandas provides several built-in aggregation functions that can be applied directly to DataFrames or Series.

import pandas as pd

data = {
    'Name': ['Alice', 'Bob', 'Charlie', 'Alice', 'Bob'],
    'Age': [25, 30, 35, 28, 33],
    'Salary': [70000, 80000, 90000, 75000, 85000]
}

pdf = pd.DataFrame(data)

# Basic aggregation functions
print(pdf['Age'].sum())  # Sum
print(pdf['Age'].mean())  # Mean
print(pdf['Age'].median())  # Median
print(pdf['Age'].min())  # Minimum
print(pdf['Age'].max())  # Maximum
print(pdf['Age'].std())  # Standard Deviation
print(pdf['Age'].var())  # Variance
print(pdf['Age'].count())  # Count

2. GroupBy Aggregations: Aggregating data by groups is a common operation, and Pandas provides the groupby() method for this purpose.

# GroupBy aggregation
grouped = pdf.groupby('Name')
print(grouped['Salary'].sum())  # Sum of salaries by name
print(grouped['Age'].mean())  # Mean age by name

3. Aggregate Multiple Functions: You can apply multiple aggregation functions simultaneously using the agg() method.

# Aggregate multiple functions
print(grouped['Salary'].agg(['sum', 'mean', 'max']))

4. Custom Aggregation Functions: You can also define custom aggregation functions.

# Custom aggregation function
print(grouped['Salary'].agg(lambda x: x.max() - x.min()))

5. Pivot Tables: Pivot tables provide a way to aggregate data across multiple dimensions.

# Pivot table
pivot_table = pdf.pivot_table(values='Salary', index='Name', aggfunc='sum')
print(pivot_table)

6. Resample Time-Series Data: Resampling is used for time-series data to aggregate data based on a new time frequency.

# Resample time-series data
date_range = pd.date_range(start='1/1/2021', periods=5, freq='D')
pdf['Date'] = date_range

# Set the index to the date column
pdf.set_index('Date', inplace=True)

# Resample by week and sum
resampled = pdf.resample('W').sum()
print(resampled)

7. Rolling Aggregations: Rolling aggregations provide a way to calculate moving statistics.

# Rolling aggregation
print(pdf['Salary'].rolling(window=2).mean())

Aggregation in PySpark

1. Basic Aggregation Functions: PySpark provides a similar set of aggregation functions as Pandas.

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg, min, max, stddev, variance, count

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

data = [('Alice', 25, 70000), ('Bob', 30, 80000), ('Charlie', 35, 90000), ('Alice', 28, 75000), ('Bob', 33, 85000)]
columns = ['Name', 'Age', 'Salary']

sdf = spark.createDataFrame(data, columns)

# Basic aggregation functions
sdf.select(sum('Age')).show()
sdf.select(avg('Age')).show()
sdf.select(min('Age')).show()
sdf.select(max('Age')).show()
sdf.select(stddev('Age')).show()
sdf.select(variance('Age')).show()
sdf.select(count('Age')).show()

2. GroupBy Aggregations: PySpark provides the groupBy() method for grouping data and applying aggregations.

# GroupBy aggregation
grouped = sdf.groupBy('Name')
grouped.agg(sum('Salary')).show()
grouped.agg(avg('Age')).show()

3. Aggregate Multiple Functions: You can apply multiple aggregation functions using the agg() method.

# Aggregate multiple functions
grouped.agg(sum('Salary'), avg('Age'), max('Salary')).show()

4. Custom Aggregation Functions: You can define custom aggregation functions in PySpark as well.

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Define a custom aggregation function
def range_udf(column):
return max(column) - min(column)

range_udf = udf(range_udf, DoubleType())

# Register the custom UDF
spark.udf.register("range_udf", range_udf)

# Use the custom UDF in aggregation
grouped.agg(range_udf('Salary')).show()

5. Pivot Tables: Pivot tables in PySpark can be created using the pivot() method.

# Pivot table
pivot_table = sdf.groupBy('Name').pivot('Age').agg(sum('Salary'))
pivot_table.show()

6. Window Functions: Window functions allow for advanced aggregations over partitions of data.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Define a window specification
window_spec = Window.partitionBy('Name').orderBy('Age')

# Apply window function
sdf.withColumn('row_number', row_number().over(window_spec)).show()
  • Pandas:
    • Basic Aggregations: sum(), mean(), median(), min(), max(), std(), var(), count()
    • GroupBy Aggregations: groupby(), agg()
    • Custom Aggregations: agg(lambda x: custom_function(x))
    • Pivot Tables: pivot_table()
    • Resampling: resample()
    • Rolling Aggregations: rolling()
  • PySpark:
    • Basic Aggregations: sum(), avg(), min(), max(), stddev(), variance(), count()
    • GroupBy Aggregations: groupBy(), agg()
    • Custom Aggregations: udf()
    • Pivot Tables: pivot()
    • Window Functions: window(), row_number()

Detailed Example: Translating a Pandas Script to PySpark

Given your original Pandas code, here’s how to translate it to PySpark:

Pandas Code:

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

%matplotlib inline
plt.rcParams["figure.figsize"] = (8, 6)

import warnings
warnings.filterwarnings('ignore')

df.columns
print(df.info())
df.describe()
df.describe(include = ["object"])
df["y"].value_counts()
df["marital"].value_counts(normalize = True)
df.sort_values(by = "duration", ascending = False).head()
df.sort_values(by = ["age", "duration"], ascending = [True, False]).head()
df.apply(np.max)
d = {"no": 0, "yes": 1}
df["y"] = df["y"].map(d)
df.head()
print("Share of attracted clients =", '{:.1%}'.format(df["y"].mean()))
df[df["y"] == 1].mean()
acd = round(df[df["y"] == 1]["duration"].mean(), 2)
acd_in_min = acd // 60
print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")
print("Average age of attracted clients =", int(df[(df["y"] == 1) & (df["marital"] == "single")]["age"].mean()), "years")
df[-1:]
pd.crosstab(df["y"], df["marital"])
pd.crosstab(df["y"],
df["marital"],
normalize = 'index')

df.pivot_table(
["age", "duration"],
["job"],
aggfunc = "mean",
).head(10)

pd.plotting.scatter_matrix(
df[["age", "duration", "campaign"]],
figsize = (15, 15),
diagonal = "kde")
plt.show()

df["age"].hist()

df.hist(color = "k",
bins = 30,
figsize = (15, 10))
plt.show()

df.boxplot(column = "age",
by = "marital")
plt.show()

df.boxplot(column = "age",
by = ["marital", "housing"],
figsize = (20, 20))
plt.show()

Equivalent PySpark Code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, mean, max as spark_max
import matplotlib.pyplot as plt
import pandas as pd
import warnings

# Initialize Spark session
spark = SparkSession.builder.appName("PySpark Example").getOrCreate()

# Load data
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Data overview
df.printSchema()
df.describe().show()

# Value counts
df.groupBy("y").count().show()
df.groupBy("marital").count().show()

# Sort values
df.orderBy(col("duration").desc()).show(5)
df.orderBy(col("age").asc(), col("duration").desc()).show(5)

# Max values
df.select([spark_max(c).alias(c) for c in df.columns]).show()

# Map values
d = {"no": 0, "yes": 1}
mapping_expr = create_map([lit(x) for x in chain(*d.items())])
df = df.withColumn("y", mapping_expr[col("y")])

# Head
df.show(5)

# Mean value of "y"
df.select(mean("y")).show()

# Average call duration for attracted clients
acd = df.filter(df["y"] == 1).select(mean("duration")).first()[0]
acd_in_min = acd // 60
print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")

# Average age of attracted clients
avg_age = df.filter((df["y"] == 1) & (df["marital"] == "single")).select(mean("age")).first()[0]
print("Average age of attracted clients =", int(avg_age), "years")

# Crosstab
df.crosstab("y", "marital").show()

# Pivot table
df.groupBy("job").agg(mean("age"), mean("duration")).show()

# Convert to Pandas for plotting
df_pandas = df.toPandas()

# Scatter matrix
pd.plotting.scatter_matrix(df_pandas[["age", "duration", "campaign"]],
figsize=(15, 15),
diagonal="kde")
plt.show()

# Histograms
df_pandas["age"].hist()
plt.show()

df_pandas.hist(color="k", bins=30, figsize=(15, 10))
plt.show()

# Boxplots
df_pandas.boxplot(column="age", by="marital")
plt.show()

df_pandas.boxplot(column="age", by=["marital", "housing"], figsize=(20, 20))
plt.show()

# Stop Spark session
spark.stop()

Key Differences

  1. Syntax: PySpark requires more verbose syntax, especially for operations like grouping and aggregations.
  2. Execution: PySpark operations are lazily evaluated, meaning transformations are not executed until an action is performed. In contrast, Pandas executes operations eagerly.
  3. Data Handling: PySpark handles distributed data, while Pandas is designed for in-memory operations.
  4. Conversion: Often, PySpark DataFrames need to be converted to Pandas DataFrames for complex visualizations, as shown in the plotting section.

data transactions;
    input id date :date9. amount;
    format date date9.;
    datalines;
1 01JAN2023 100
1 05JAN2023 150
1 10JAN2023 200
2 01JAN2023 300
2 15JAN2023 400
2 20JAN2023 500
3 01JAN2023 250
3 05JAN2023 350
;
run;

/* Sort the data by 'id' and 'date' */
proc sort data=transactions;
    by id date;
run;

/* Use first. and last. to identify first and last transactions for each id */
data transactions_with_flags;
    set transactions;
    by id date;
    if first.id then first_transaction = 1;
    else first_transaction = 0;
    if last.id then last_transaction = 1;
    else last_transaction = 0;
run;

/* Transpose the data to get amounts for each date into separate columns */
proc transpose data=transactions_with_flags out=transposed_data prefix=amount_;
    by id;
    var amount;
    id date;
run;

proc print data=transposed_data;
run;
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, first, last, when, lit

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("SAS_translation").getOrCreate()

# Sample data
data = [(1, "2023-01-01", 100), (1, "2023-01-05", 150), (1, "2023-01-10", 200),
        (2, "2023-01-01", 300), (2, "2023-01-15", 400), (2, "2023-01-20", 500),
        (3, "2023-01-01", 250), (3, "2023-01-05", 350)]
columns = ["id", "date", "amount"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Convert 'date' to date type
df = df.withColumn("date", col("date").cast("date"))

# Window specification
window_spec = Window.partitionBy("id").orderBy("date")

# Add 'first.' and 'last.' equivalent columns using Window functions
df_with_flags = df.withColumn("first_transaction", when(col("date") == first("date").over(window_spec), lit(1)).otherwise(lit(0))) 
                  .withColumn("last_transaction", when(col("date") == last("date").over(window_spec), lit(1)).otherwise(lit(0)))

df_with_flags.show()

# Transpose (pivot) operation
df_pivot = df_with_flags.groupBy("id").pivot("date").sum("amount")
df_pivot.show()
import pandas as pd

# Sample data
data = {'id': [1, 1, 1, 2, 2, 2, 3, 3],
        'date': ['2023-01-01', '2023-01-05', '2023-01-10', '2023-01-01', '2023-01-15', '2023-01-20', '2023-01-01', '2023-01-05'],
        'amount': [100, 150, 200, 300, 400, 500, 250, 350]}

df = pd.DataFrame(data)

# Convert 'date' column to datetime type
df['date'] = pd.to_datetime(df['date'])

# Sort values by 'id' and 'date'
df = df.sort_values(by=['id', 'date'])

# Add 'first_transaction' and 'last_transaction' flags
df['first_transaction'] = df.groupby('id')['date'].transform('first') == df['date']
df['last_transaction'] = df.groupby('id')['date'].transform('last') == df['date']

print(df)

# Transpose (pivot) operation
df_pivot = df.pivot_table(index='id', columns='date', values='amount', aggfunc='sum')
print(df_pivot)

Subscribe