Aggregation
Aggregation is a fundamental operation for summarizing data, and both Pandas and PySpark provide extensive support for it. Below is a detailed overview of how to perform aggregations in both libraries, covering the most commonly used functions and options.
Aggregation in Pandas
1. Basic Aggregation Functions: Pandas provides several built-in aggregation functions that can be applied directly to DataFrames or Series.
import pandas as pd
data = {
    'Name': ['Alice', 'Bob', 'Charlie', 'Alice', 'Bob'],
    'Age': [25, 30, 35, 28, 33],
    'Salary': [70000, 80000, 90000, 75000, 85000]
}
pdf = pd.DataFrame(data)
# Basic aggregation functions
print(pdf['Age'].sum()) # Sum
print(pdf['Age'].mean()) # Mean
print(pdf['Age'].median()) # Median
print(pdf['Age'].min()) # Minimum
print(pdf['Age'].max()) # Maximum
print(pdf['Age'].std()) # Standard Deviation
print(pdf['Age'].var()) # Variance
print(pdf['Age'].count()) # Count
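To see several of these statistics at once, describe() produces a summary table in a single call:
# Summary statistics (count, mean, std, min, quartiles, max) in one call
print(pdf[['Age', 'Salary']].describe())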
2. GroupBy Aggregations: Aggregating data by groups is a common operation, and Pandas provides the groupby() method for this purpose.
# GroupBy aggregation
grouped = pdf.groupby('Name')
print(grouped['Salary'].sum()) # Sum of salaries by name
print(grouped['Age'].mean()) # Mean age by name
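You can also aggregate several columns in one pass by selecting them from the grouped object:
# Mean of multiple columns per group
print(grouped[['Age', 'Salary']].mean())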
3. Aggregate Multiple Functions: You can apply multiple aggregation functions simultaneously using the agg() method.
# Aggregate multiple functions
print(grouped['Salary'].agg(['sum', 'mean', 'max']))
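Pandas also supports named aggregation, which lets you label each result column explicitly (the names total_salary and avg_age below are chosen for illustration):
# Named aggregation: keyword = (column, function)
print(grouped.agg(total_salary=('Salary', 'sum'), avg_age=('Age', 'mean')))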
4. Custom Aggregation Functions: You can also define custom aggregation functions.
# Custom aggregation function
print(grouped['Salary'].agg(lambda x: x.max() - x.min()))
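The lambda's output column is labeled '<lambda>'; the named-aggregation form shown above gives it a readable name instead:
# Name the custom aggregation's output column
print(grouped.agg(salary_range=('Salary', lambda x: x.max() - x.min())))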
5. Pivot Tables: Pivot tables provide a way to aggregate data across multiple dimensions.
# Pivot table
pivot_table = pdf.pivot_table(values='Salary', index='Name', aggfunc='sum')
print(pivot_table)
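pivot_table() also accepts a columns argument to spread a second dimension across the columns, along with options such as fill_value for missing combinations; a quick sketch pivoting ages across the columns:
# Two-dimensional pivot: names down the rows, ages across the columns
pivot2 = pdf.pivot_table(values='Salary', index='Name', columns='Age', aggfunc='sum', fill_value=0)
print(pivot2)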
6. Resample Time-Series Data: Resampling is used for time-series data to aggregate data based on a new time frequency.
# Resample time-series data
date_range = pd.date_range(start='1/1/2021', periods=5, freq='D')
pdf['Date'] = date_range
# Set the index to the date column
pdf.set_index('Date', inplace=True)
# Resample by week and sum the numeric columns (Name is a string and is excluded)
resampled = pdf.resample('W').sum(numeric_only=True)
print(resampled)
7. Rolling Aggregations: Rolling aggregations provide a way to calculate moving statistics.
# Rolling aggregation
print(pdf['Salary'].rolling(window=2).mean())
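By default the first window is incomplete and yields NaN; the min_periods option controls how many observations are required before a result is emitted:
# Emit a result as soon as one observation is available
print(pdf['Salary'].rolling(window=2, min_periods=1).mean())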
Aggregation in PySpark
1. Basic Aggregation Functions: PySpark provides a similar set of aggregation functions as Pandas.
from pyspark.sql import SparkSession
# Note: these imports shadow Python's built-in sum, min, max, and count
from pyspark.sql.functions import sum, avg, min, max, stddev, variance, count
# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
data = [('Alice', 25, 70000), ('Bob', 30, 80000), ('Charlie', 35, 90000), ('Alice', 28, 75000), ('Bob', 33, 85000)]
columns = ['Name', 'Age', 'Salary']
sdf = spark.createDataFrame(data, columns)
# Basic aggregation functions
sdf.select(sum('Age')).show()
sdf.select(avg('Age')).show()
sdf.select(min('Age')).show()
sdf.select(max('Age')).show()
sdf.select(stddev('Age')).show()
sdf.select(variance('Age')).show()
sdf.select(count('Age')).show()
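Several aggregates can also be computed in a single select(), with alias() giving the result columns readable names:
# Compute several aggregates in one pass over the data
sdf.select(sum('Age').alias('total_age'), avg('Salary').alias('avg_salary'), count('Name').alias('n_rows')).show()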
2. GroupBy Aggregations: PySpark provides the groupBy() method for grouping data and applying aggregations.
# GroupBy aggregation
grouped = sdf.groupBy('Name')
grouped.agg(sum('Salary')).show()
grouped.agg(avg('Age')).show()
3. Aggregate Multiple Functions: You can apply multiple aggregation functions using the agg() method.
# Aggregate multiple functions
grouped.agg(sum('Salary'), avg('Age'), max('Salary')).show()
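agg() also accepts a dictionary mapping column names to aggregate function names, though this form allows only one function per column:
# Dictionary form: one aggregate function per column
grouped.agg({'Salary': 'sum', 'Age': 'avg'}).show()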
4. Custom Aggregation Functions: You can define custom aggregation functions in PySpark as well. Note that a plain udf() runs row by row and cannot be used inside agg(); a custom aggregate is instead written as a grouped-aggregate pandas UDF (which requires PyArrow).
import pandas as pd
from pyspark.sql.functions import pandas_udf
# Define a custom aggregation as a pandas UDF: a Series in, one scalar out
@pandas_udf('double')
def salary_range(s: pd.Series) -> float:
    return float(s.max() - s.min())
# Use the custom aggregate like any built-in aggregation
grouped.agg(salary_range('Salary')).show()
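For a simple statistic like the range, a sketch using only built-in column expressions avoids the UDF overhead entirely (here max and min are the PySpark functions imported earlier):
# Equivalent aggregation using built-in column expressions (no UDF)
grouped.agg((max('Salary') - min('Salary')).alias('salary_range')).show()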
5. Pivot Tables: Pivot tables in PySpark can be created using the pivot() method.
# Pivot table
pivot_table = sdf.groupBy('Name').pivot('Age').agg(sum('Salary'))
pivot_table.show()
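If the distinct pivot values are known ahead of time, passing them to pivot() lets Spark skip the extra pass that computes them; a sketch using the ages in the sample data:
# Passing the distinct values up front avoids an extra scan of the data
pivot_table = sdf.groupBy('Name').pivot('Age', [25, 28, 30, 33, 35]).agg(sum('Salary'))
pivot_table.show()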