1.Exploratory Data Analysis (EDA) with Pandas in Banking – Converted in Pyspark
While searching on Google for a free Pandas project, I found this link – Exploratory Data Analysis (EDA) with Pandas in Banking. I have tried to convert that pandas script into a PySpark one.
First, let’s handle the initial steps of downloading and extracting the data:
# Shell commands (notebook "!" syntax) to download and unzip the data.
# Both URLs end in the same file name, so the second one is used only as a
# fallback; -O pins the output name so no "bank-additional.zip.1" copy appears.
!wget -q -O bank-additional.zip https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank-additional.zip || wget -q -O bank-additional.zip https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/EDA_Pandas_Banking_L1/bank-additional.zip
!unzip -o -q bank-additional.zip
In a PySpark context, we’ll assume the data is already available locally after the above steps.
Importing Libraries
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
plt.rcParams["figure.figsize"] = (8, 6)
import warnings
warnings.filterwarnings('ignore')
Equivalent imports in PySpark:
from pyspark.sql import SparkSession
# NOTE: this wildcard import shadows Python built-ins such as max, sum and
# round; the snippets below rely on the Spark column versions of those names.
from pyspark.sql.functions import *
# Window is needed by the normalized crosstab (sum("count").over(Window...)).
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import numpy as np
# pandas is still required for the plotting section (pd.plotting.scatter_matrix).
import pandas as pd
# Initialize Spark session
spark = SparkSession.builder.appName("EDA_PySpark_Banking").getOrCreate()
Loading Data
Pandas:
df = pd.read_csv('bank-additional/bank-additional-full.csv', sep=';')
df.head(5)
PySpark:
df = spark.read.csv('bank-additional/bank-additional-full.csv', sep=';', header=True, inferSchema=True)
df.show(5)
Initial Exploration
Columns
Pandas:
df.columns
PySpark:
df.columns
Info
Pandas:
print(df.info())
PySpark:
df.printSchema()
Describe
Pandas:
df.describe()
df.describe(include=["object"])
PySpark:
df.describe().show()
df.select([countDistinct(c).alias(c) for c in df.columns]).show()
Value Counts
Pandas:
df["y"].value_counts()
df["marital"].value_counts(normalize=True)
PySpark:
df.groupBy("y").count().show()
df.groupBy("marital").count().withColumn("normalize", col("count") / df.count()).show()
Sorting
Pandas:
df.sort_values(by="duration", ascending=False).head()
df.sort_values(by=["age", "duration"], ascending=[True, False]).head()
PySpark:
df.orderBy(col("duration").desc()).show(5)
df.orderBy(col("age").asc(), col("duration").desc()).show(5)
Applying Functions
Pandas:
df.apply(np.max)
d = {"no": 0, "yes": 1}
df["y"] = df["y"].map(d)
PySpark:
# PySpark does not have an apply method; use select with max for each column
df.select([max(c).alias(c) for c in df.columns]).show()
# Mapping values
df = df.withColumn("y", when(col("y") == "yes", 1).otherwise(0))
Further Analysis
Pandas:
print("Share of attracted clients =", '{:.1%}'.format(df["y"].mean()))
df[df["y"] == 1].mean()
acd = round(df[df["y"] == 1]["duration"].mean(), 2)
acd_in_min = acd // 60
print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")
print("Average age of attracted clients =", int(df[(df["y"] == 1) & (df["marital"] == "single")]["age"].mean()), "years")
df[-1:]
PySpark:
# Share of attracted clients: mean of the 0/1 target column.
df.groupBy().agg(mean("y")).show()
# Column-wise means restricted to attracted clients (y == 1).
df.filter(df["y"] == 1).agg(*[mean(c).alias(c) for c in df.columns]).show()
# Average call duration (seconds) for attracted clients; round/mean are the Spark functions.
acd = df.filter(df["y"] == 1).agg(round(mean("duration"), 2)).collect()[0][0]
# Split whole seconds into minutes and remaining seconds in one step.
acd_in_min, acd_in_sec = divmod(int(acd), 60)
print("Average call duration for attracted clients =", acd_in_min, "min", acd_in_sec, "sec")
avg_age = df.filter((df["y"] == 1) & (df["marital"] == "single")).agg(mean("age")).collect()[0][0]
print("Average age of attracted clients =", int(avg_age), "years")
# Equivalent of pandas df[-1:]: fetch the last row of the DataFrame.
# (Sorting by age and taking one row would show the oldest client instead.)
# DataFrame.tail requires Spark 3.0+ and returns a list of Row objects.
print(df.tail(1))
Crosstab and Pivot Table
Pandas:
pd.crosstab(df["y"], df["marital"])
pd.crosstab(df["y"], df["marital"], normalize='index')
df.pivot_table(["age", "duration"], ["job"], aggfunc="mean").head(10)
PySpark:
df.groupBy("y", "marital").count().show()
df.groupBy("y", "marital").count().withColumn("normalize", col("count") / sum("count").over(Window.partitionBy("y"))).show()
df.groupBy("job").agg(mean("age"), mean("duration")).show()
Plots
Pandas:
pd.plotting.scatter_matrix(df[["age", "duration", "campaign"]], figsize=(15, 15), diagonal="kde")
plt.show()
df["age"].hist()
df.hist(color="k", bins=30, figsize=(15, 10))
plt.show()
df.boxplot(column="age", by="marital")
plt.show()
df.boxplot(column="age", by=["marital", "housing"], figsize=(20, 20))
plt.show()
PySpark:
# PySpark does not support direct plotting; use matplotlib for plotting
# Convert to Pandas once and reuse the result, instead of collecting the
# Spark DataFrame again for every figure.
pandas_df = df.toPandas()
pd.plotting.scatter_matrix(pandas_df[["age", "duration", "campaign"]], figsize=(15, 15), diagonal="kde")
plt.show()
pandas_df[["age"]].hist()
pandas_df.hist(color="k", bins=30, figsize=(15, 10))
plt.show()
pandas_df[["age", "marital"]].boxplot(by="marital")
plt.show()
pandas_df[["age", "marital", "housing"]].boxplot(by=["marital", "housing"], figsize=(20, 20))
plt.show()
Combining Code
Here is the combined code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
# Initialize Spark session
spark = SparkSession.builder.appName("EDA_PySpark_Banking").getOrCreate()
# Load the data
df = spark.read.csv('bank-additional/bank-additional-full.csv', sep=';', header=True, inferSchema=True)
# Initial exploration
df.printSchema()
df.describe().show()
df.groupBy("y").count().show()
df.groupBy("marital").count().withColumn("normalize", col("count") / df.count()).show()
df.orderBy(col("duration").desc()).show(5)
df.orderBy(col("age").asc(), col("duration").desc()).show(5)
df.select([max(c).alias(c) for c in df.columns]).show()
df = df.withColumn("y", when(col("y") == "yes", 1).otherwise(0))
# Share of attracted clients: mean of the 0/1 target column.
df.groupBy().agg(mean("y")).show()
# Column-wise means restricted to attracted clients (y == 1).
df.filter(df["y"] == 1).agg(*[mean(c).alias(c) for c in df.columns]).show()
# Average call duration (seconds) for attracted clients; round/mean are the Spark functions.
acd = df.filter(df["y"] == 1).agg(round(mean("duration"), 2)).collect()[0][0]
# Split whole seconds into minutes and remaining seconds in one step.
acd_in_min, acd_in_sec = divmod(int(acd), 60)
print("Average call duration for attracted clients =", acd_in_min, "min", acd_in_sec, "sec")
avg_age = df.filter((df["y"] == 1) & (df["marital"] == "single")).agg(mean("age")).collect()[0][0]
print("Average age of attracted clients =", int(avg_age), "years")
# Fetch the last row of the DataFrame (sorting by age and taking one row
# would show the oldest client instead). DataFrame.tail requires Spark 3.0+.
print(df.tail(1))
df.groupBy("y", "marital").count().show()
df.groupBy("y", "marital").count().withColumn("normalize", col("count") / sum("count").over(Window.partitionBy("y"))).show()
df.groupBy("job").agg(mean("age"), mean("duration")).show()
# Plots
# Collect to pandas once and reuse the result, instead of collecting the
# Spark DataFrame again for every figure.
pandas_df = df.toPandas()
pd.plotting.scatter_matrix(pandas_df[["age", "duration", "campaign"]], figsize=(15, 15), diagonal="kde")
plt.show()
pandas_df[["age"]].hist()
pandas_df.hist(color="k", bins=30, figsize=(15, 10))
plt.show()
pandas_df[["age", "marital"]].boxplot(by="marital")
plt.show()
pandas_df[["age", "marital", "housing"]].boxplot(by=["marital", "housing"], figsize=(20, 20))
plt.show()
2.Dynamic list of variables Creation for ETL Jobs
Let us create one or more dynamic lists of variable names and store them in a dictionary, array or other data structure so they can be reused across PySpark projects, especially ETL jobs. The names are generated dynamically, for example Month_202401, Month_202312 and so on, covering 24 months (counted back from the current month, or following the current month's progression). Later we will use this dictionary to create 24 CSV files by filtering on year and month, and to build arrays from the same keys. We will also create custom Excel files for the 24 year-month combinations, whose column names follow the same pattern (for example xyz_2404_04 to xyz_2404_12, or abc_xyz_2404_0405 to abc_xyz_2405_1201). This requirement is specific to PySpark-based ETL projects.
Creating dynamic lists of variables and saving them in a dictionary or array for further use in PySpark ETL jobs is an excellent idea to streamline and automate the process. Here’s a step-by-step guide to achieve this in a PySpark-based project.
Step 1: Generate Dynamic Variable Names
First, we need to generate dynamic variable names for the past 24 months from the current month. We’ll store these variable names in a dictionary for easy access.
from datetime import datetime, timedelta
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import calendar
# Initialize SparkSession
# The parentheses let the builder chain span several lines; without them a
# line starting with "." is a syntax error.
spark = (
    SparkSession.builder
    .appName("DynamicVariablesETL")
    .getOrCreate()
)
# Get current date
current_date = datetime.now()
# Create a dictionary to hold variable names
dynamic_vars = {}
# Generate variable names for the past 24 calendar months, newest first.
# Months are counted as year * 12 + month so that each step moves back exactly
# one calendar month; stepping back 30 days at a time can skip or repeat a
# month and leave the dictionary with fewer than 24 keys.
month_ordinal = current_date.year * 12 + current_date.month - 1
for months_back in range(24):
    year, month_index = divmod(month_ordinal - months_back, 12)
    dynamic_vars[f'Month_{year:04d}{month_index + 1:02d}'] = None
# Print the generated variable names
print(dynamic_vars)
Step 2: Create CSV Files by Filtering on Year and Month
Next, we’ll use the dynamic variable names to filter data and create CSV files. For simplicity, let’s assume we have a sample DataFrame to filter.
# Sample DataFrame
data = {
'date': [datetime.now() - timedelta(days=i*15) for i in range(50)],
'value': [i for i in range(50)]
}
df = pd.DataFrame(data)
spark_df = spark.createDataFrame(df)
# Function to create CSV files based on dynamic variables
def create_csv_files(spark_df, dynamic_vars, output_dir="/path/to/output"):
    """Write one CSV output per ``Month_YYYYMM`` key that has matching rows.

    Args:
        spark_df: Spark DataFrame with a ``date`` column to filter on.
        dynamic_vars: Mapping whose keys look like ``Month_YYYYMM``; only the
            keys are used.
        output_dir: Directory that receives ``<key>.csv`` outputs. Defaults to
            the placeholder path used throughout this example.
    """
    for var in dynamic_vars:
        year_month = var.split('_')[1]
        year, month = int(year_month[:4]), int(year_month[4:])
        filtered_df = spark_df.filter(
            (F.year("date") == year) & (F.month("date") == month)
        )
        # limit(1) only needs to find a single matching row, so the emptiness
        # check does not have to count the whole month.
        if filtered_df.limit(1).count() > 0:
            # "overwrite" keeps re-runs from failing on an existing output
            # path; header=True keeps the column names in the CSV.
            filtered_df.write.mode("overwrite").csv(f"{output_dir}/{var}.csv", header=True)
create_csv_files(spark_df, dynamic_vars)
Step 3: Create Arrays Based on Dynamic Dictionary
We can also create arrays based on the dynamic dictionary.
# Create arrays from dynamic dictionary
# Listing the dictionary's keys keeps the array in step with dynamic_vars,
# instead of recomputing the month names with a separate date calculation.
dynamic_arrays = list(dynamic_vars)
print(dynamic_arrays)
Step 4: Create Custom Excel Files
Lastly, we can create custom Excel files with columns named based on the dynamic dictionary.
import openpyxl
from openpyxl.utils.dataframe import dataframe_to_rows
# Function to create custom Excel files
def create_excel_files(dynamic_vars, output_dir="/path/to/output"):
    """Write one sample ``.xlsx`` workbook per ``Month_YYYYMM`` key.

    Each workbook has a single sheet (``Sheet1``) with two ten-row columns
    whose names embed the key's year and month.

    Args:
        dynamic_vars: Mapping whose keys look like ``Month_YYYYMM``; only the
            keys are used.
        output_dir: Directory that receives ``<key>.xlsx``. Defaults to the
            placeholder path used throughout this example.
    """
    for var in dynamic_vars:
        year_month = var.split('_')[1]
        year, month = year_month[:4], year_month[4:]
        # Create a DataFrame with custom columns
        df = pd.DataFrame({
            f'xyz_{year}_{month}': list(range(10)),
            f'abc_xyz_{year}_{month}{month}': list(range(10)),
        })
        # Create an Excel file
        file_name = f'{output_dir}/{var}.xlsx'
        # The context manager saves and closes the workbook on exit;
        # ExcelWriter.save() no longer exists in pandas 2.x.
        with pd.ExcelWriter(file_name, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='Sheet1', index=False)
create_excel_files(dynamic_vars)
Combining It All Together
Here is the complete script that combines all the steps above.
from datetime import datetime, timedelta
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import calendar
# Initialize SparkSession
# The parentheses let the builder chain span several lines; without them a
# line starting with "." is a syntax error.
spark = (
    SparkSession.builder
    .appName("DynamicVariablesETL")
    .getOrCreate()
)
# Get current date
current_date = datetime.now()
# Create a dictionary to hold variable names
dynamic_vars = {}
# Generate variable names for the past 24 calendar months, newest first.
# Months are counted as year * 12 + month so that each step moves back exactly
# one calendar month; stepping back 30 days at a time can skip or repeat a
# month and leave the dictionary with fewer than 24 keys.
month_ordinal = current_date.year * 12 + current_date.month - 1
for months_back in range(24):
    year, month_index = divmod(month_ordinal - months_back, 12)
    dynamic_vars[f'Month_{year:04d}{month_index + 1:02d}'] = None
# Sample DataFrame
data = {
'date': [datetime.now() - timedelta(days=i*15) for i in range(50)],
'value': [i for i in range(50)]
}
df = pd.DataFrame(data)
spark_df = spark.createDataFrame(df)
# Function to create CSV files based on dynamic variables
def create_csv_files(spark_df, dynamic_vars, output_dir="/path/to/output"):
    """Write one CSV output per ``Month_YYYYMM`` key that has matching rows.

    Args:
        spark_df: Spark DataFrame with a ``date`` column to filter on.
        dynamic_vars: Mapping whose keys look like ``Month_YYYYMM``; only the
            keys are used.
        output_dir: Directory that receives ``<key>.csv`` outputs. Defaults to
            the placeholder path used throughout this example.
    """
    for var in dynamic_vars:
        year_month = var.split('_')[1]
        year, month = int(year_month[:4]), int(year_month[4:])
        filtered_df = spark_df.filter(
            (F.year("date") == year) & (F.month("date") == month)
        )
        # limit(1) only needs to find a single matching row, so the emptiness
        # check does not have to count the whole month.
        if filtered_df.limit(1).count() > 0:
            # "overwrite" keeps re-runs from failing on an existing output
            # path; header=True keeps the column names in the CSV.
            filtered_df.write.mode("overwrite").csv(f"{output_dir}/{var}.csv", header=True)
create_csv_files(spark_df, dynamic_vars)
# Create arrays from dynamic dictionary
# Listing the dictionary's keys keeps the array in step with dynamic_vars,
# instead of recomputing the month names with a separate date calculation.
dynamic_arrays = list(dynamic_vars)
# Function to create custom Excel files
def create_excel_files(dynamic_vars, output_dir="/path/to/output"):
    """Write one sample ``.xlsx`` workbook per ``Month_YYYYMM`` key.

    Each workbook has a single sheet (``Sheet1``) with two ten-row columns
    whose names embed the key's year and month.

    Args:
        dynamic_vars: Mapping whose keys look like ``Month_YYYYMM``; only the
            keys are used.
        output_dir: Directory that receives ``<key>.xlsx``. Defaults to the
            placeholder path used throughout this example.
    """
    for var in dynamic_vars:
        year_month = var.split('_')[1]
        year, month = year_month[:4], year_month[4:]
        # Create a DataFrame with custom columns
        df = pd.DataFrame({
            f'xyz_{year}_{month}': list(range(10)),
            f'abc_xyz_{year}_{month}{month}': list(range(10)),
        })
        # Create an Excel file
        file_name = f'{output_dir}/{var}.xlsx'
        # The context manager saves and closes the workbook on exit;
        # ExcelWriter.save() no longer exists in pandas 2.x.
        with pd.ExcelWriter(file_name, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='Sheet1', index=False)
create_excel_files(dynamic_vars)
This script demonstrates how to generate dynamic variable names, filter data to create CSV files, and create custom Excel files with dynamically generated columns. You can customize the paths and logic as needed for your specific ETL job in PySpark.
3. SAS Project to PySpark Migration – merging, joining, transposing, lead/rank functions, data validation with PROC FREQ, error handling, macro variables, and macros
Let us create a comprehensive SAS project that involves merging, joining, transposing large tables, applying PROC SQL lead/rank functions, performing data validation with PROC FREQ, and incorporating error handling, macro variables, and macros for various functional tasks.
Step 1: Set Up Macro Variables for Date Values
%let start_date = '01JAN2023'd;
%let end_date = '31DEC2023'd;
Step 2: Define Macros for Various Functional Tasks
%macro import_data(libname=, dataset=, filepath=);
proc import datafile="&filepath" out=&libname..&dataset dbms=csv replace;
getnames=yes;
run;
%mend import_data;
%macro join_tables(libname=, table1=, table2=, out_table=, join_condition=);
proc sql;
create table &libname..&out_table as
select a.*, b.*
from &libname..&table1 as a
left join &libname..&table2 as b
on &join_condition;
quit;
%mend join_tables;
%macro transpose_table(libname=, in_table=, out_table=, id_var=, var=);
proc transpose data=&libname..&in_table out=&libname..&out_table;
by &id_var;
var &var;
run;
%mend transpose_table;
%macro validate_data(libname=, table=, var=);
proc freq data=&libname..&table;
tables &var / missing;
run;
%mend validate_data;
/* Rank the rows of &table by &var in descending order and store the result,
   with a new column named rank, in &out_table.
   PROC SQL in SAS has no RANK() OVER window function, so PROC RANK is used.
   TIES=LOW gives tied values the lowest rank of their group, which matches
   the numbering of SQL RANK(). */
%macro apply_rank(libname=, table=, var=, out_table=);
proc rank data=&libname..&table out=&libname..&out_table descending ties=low;
var &var;
ranks rank;
run;
%mend apply_rank;
Step 3: Import Large Tables
/* The path is passed without quotes: the macro already wraps it as
   datafile="&filepath", so quoting it here would make the quote characters
   part of the file name. */
%import_data(libname=work, dataset=table1, filepath=/path/to/table1.csv);
%import_data(libname=work, dataset=table2, filepath=/path/to/table2.csv);
Step 4: Merge and Join Tables
%join_tables(libname=work, table1=table1, table2=table2, out_table=merged_table, join_condition=a.id=b.id);
Step 5: Transpose the Data
%transpose_table(libname=work, in_table=merged_table, out_table=transposed_table, id_var=id, var=some_var);
Step 6: Apply PROC SQL Lead/Rank Functions
%apply_rank(libname=work, table=transposed_table, var=some_var, out_table=ranked_table);
Step 7: Validate Data with PROC FREQ
%validate_data(libname=work, table=ranked_table, var=some_var);
Step 8: Error Handling
Ensure error handling by checking the existence of datasets and proper execution of steps.
%macro check_dataset(libname=, dataset=);
%if %sysfunc(exist(&libname..&dataset)) %then %do;
%put NOTE: Dataset &libname..&dataset exists.;
%end;
%else %do;
%put ERROR: Dataset &libname..&dataset does not exist.;
%abort cancel;
%end;
%mend check_dataset;
%check_dataset(libname=work, dataset=ranked_table);
Full Example in One Go
Putting it all together:
%let start_date = '01JAN2023'd;
%let end_date = '31DEC2023'd;
%macro import_data(libname=, dataset=, filepath=);
proc import datafile="&filepath" out=&libname..&dataset dbms=csv replace;
getnames=yes;
run;
%mend import_data;
%macro join_tables(libname=, table1=, table2=, out_table=, join_condition=);
proc sql;
create table &libname..&out_table as
select a.*, b.*
from &libname..&table1 as a
left join &libname..&table2 as b
on &join_condition;
quit;
%mend join_tables;
%macro transpose_table(libname=, in_table=, out_table=, id_var=, var=);
proc transpose data=&libname..&in_table out=&libname..&out_table;
by &id_var;
var &var;
run;
%mend transpose_table;
%macro validate_data(libname=, table=, var=);
proc freq data=&libname..&table;
tables &var / missing;
run;
%mend validate_data;
/* Rank the rows of &table by &var in descending order and store the result,
   with a new column named rank, in &out_table.
   PROC SQL in SAS has no RANK() OVER window function, so PROC RANK is used.
   TIES=LOW gives tied values the lowest rank of their group, which matches
   the numbering of SQL RANK(). */
%macro apply_rank(libname=, table=, var=, out_table=);
proc rank data=&libname..&table out=&libname..&out_table descending ties=low;
var &var;
ranks rank;
run;
%mend apply_rank;
%macro check_dataset(libname=, dataset=);
%if %sysfunc(exist(&libname..&dataset)) %then %do;
%put NOTE: Dataset &libname..&dataset exists.;
%end;
%else %do;
%put ERROR: Dataset &libname..&dataset does not exist.;
%abort cancel;
%end;
%mend check_dataset;
/* Import large tables */
/* The path is passed without quotes: the macro already wraps it as
   datafile="&filepath", so quoting it here would make the quote characters
   part of the file name. */
%import_data(libname=work, dataset=table1, filepath=/path/to/table1.csv);
%import_data(libname=work, dataset=table2, filepath=/path/to/table2.csv);
/* Merge and join tables */
%join_tables(libname=work, table1=table1, table2=table2, out_table=merged_table, join_condition=a.id=b.id);
/* Transpose the data */
%transpose_table(libname=work, in_table=merged_table, out_table=transposed_table, id_var=id, var=some_var);
/* Apply PROC SQL lead/rank functions */
%apply_rank(libname=work, table=transposed_table, var=some_var, out_table=ranked_table);
/* Validate data with PROC FREQ */
%validate_data(libname=work, table=ranked_table, var=some_var);
/* Check the existence of the final dataset */
%check_dataset(libname=work, dataset=ranked_table);
Explanation:
- Macro Variables: Defined for date values.
- Macros for Functional Tasks: Defined for importing data, joining tables, transposing tables, validating data, and applying ranking functions.
- Data Import: Two large tables are imported using PROC IMPORT.
- Data Merge and Join: Tables are merged and joined using PROC SQL.
- Data Transposition: Transposing the merged table.
- Rank Application: Applying the rank function.
- Data Validation: Validating data using PROC FREQ.
- Error Handling: Ensuring datasets exist and handling errors accordingly.
This setup provides a robust framework for handling complex data processing tasks in SAS, incorporating various best practices and functionalities.
Method1
Here is the equivalent PySpark code for the provided SAS script:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rank, col
# Window is required by the rank step further down (Window.orderBy).
from pyspark.sql.window import Window
# Create a SparkSession object
spark = SparkSession.builder.appName("My App").getOrCreate()
# Define the start and end dates
start_date = "2023-01-01"
end_date = "2023-12-31"
# Import the data
table1 = spark.read.csv("/path/to/table1.csv", header=True, inferSchema=True)
table2 = spark.read.csv("/path/to/table2.csv", header=True, inferSchema=True)
# Register the DataFrames as temporary views
table1.createOrReplaceTempView("table1")
table2.createOrReplaceTempView("table2")
# Join the tables
merged_table = spark.sql("SELECT a.*, b.* FROM table1 a LEFT JOIN table2 b ON a.id = b.id")
merged_table.createOrReplaceTempView("merged_table")
# Transpose the data
transposed_table = merged_table.groupBy("id").pivot("some_var").sum()
transposed_table.createOrReplaceTempView("transposed_table")
# Apply the rank function
ranked_table = transposed_table.withColumn("rank", rank().over(Window.orderBy(col("some_var").desc())))
ranked_table.createOrReplaceTempView("ranked_table")
# Validate the data
ranked_table.select("some_var").summary().show()
# Check the existence of the final dataset
if ranked_table.count() > 0:
print("Dataset exists.")
else:
print("Dataset does not exist.")
Here’s a step-by-step explanation of the PySpark code:
- Create a SparkSession object, which is the entry point to programming Spark with the Dataset and DataFrame API.
- Define the start and end dates as strings.
- Import the data from CSV files using the read.csv method, which returns a DataFrame.
- Register the DataFrames as temporary views using the createOrReplaceTempView method, so they can be queried using SQL.
- Join the tables using a SQL query, which is executed using the sql method.
- Transpose the data using the groupBy and pivot methods, which are equivalent to the SAS proc transpose step.
- Apply the rank function using the withColumn and rank methods, which are equivalent to the SAS proc sql step with the rank function.
- Validate the data by computing summary statistics using the summary method, which is equivalent to the SAS proc freq step.
- Check the existence of the final dataset by checking the count of rows using the count method. If the count is greater than 0, the dataset exists; otherwise, it does not exist.
Note that the PySpark code uses the DataFrame API, which is a more concise and expressive way of working with data compared to the SAS script. Additionally, the PySpark code uses SQL queries to perform the join and rank operations, which can be more efficient than using the SAS proc sql step.
Method 2
Here’s the modified PySpark code that translates SAS macros to functions and adds error handling:
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import rank, col
# Create a SparkSession object
spark = SparkSession.builder.appName("My App").getOrCreate()
# Define a function for importing data
def import_data(libname, dataset, filepath):
try:
df = spark.read.csv(filepath, header=True, inferSchema=True)
df.createOrReplaceTempView(f"{libname}_{dataset}")
return df
except Exception as e:
print(f"Error importing data: {e}")
return None
# Define a function for joining tables
def join_tables(libname, table1, table2, out_table, join_condition):
try:
spark.sql(f"""
CREATE TABLE {libname}_{out_table} AS
SELECT a.*, b.*
FROM {libname}_{table1} a
LEFT JOIN {libname}_{table2} b
ON {join_condition}
""")
return spark.table(f"{libname}_{out_table}")
except Exception as e:
print(f"Error joining tables: {e}")
return None
# Define a function for transposing data
def transpose_table(libname, in_table, out_table, id_var, var):
    """Collapse the rows of each ``id_var`` group into a single row.

    Creates the table ``{libname}_{out_table}`` with one row per ``id_var``
    value, where ``var`` holds an array of that group's values.

    Args:
        libname: Prefix shared by the input and output table names.
        in_table: Name (without prefix) of the table to read.
        out_table: Name (without prefix) of the table to create.
        id_var: Column that identifies a group (the BY variable).
        var: Column whose values are gathered per group.

    Returns:
        The created table as a DataFrame, or None if any error occurred
        (the error is printed).
    """
    try:
        # "GROUP BY ... PIVOT (col)" is not valid Spark SQL: PIVOT belongs in
        # the FROM clause and needs an aggregate plus an explicit value list.
        # collect_list turns each group's values into one array column; it
        # drops NULLs and does not guarantee element order.
        spark.sql(f"""
            CREATE TABLE {libname}_{out_table} AS
            SELECT {id_var}, collect_list({var}) AS {var}
            FROM {libname}_{in_table}
            GROUP BY {id_var}
        """)
        return spark.table(f"{libname}_{out_table}")
    except Exception as e:
        print(f"Error transposing data: {e}")
        return None
# Define a function for applying the rank function
def apply_rank(libname, table, var, out_table):
try:
spark.sql(f"""
CREATE TABLE {libname}_{out_table} AS
SELECT *, RANK() OVER (ORDER BY {var} DESC) AS rank
FROM {libname}_{table}
""")
return spark.table(f"{libname}_{out_table}")
except Exception as e:
print(f"Error applying rank function: {e}")
return None
# Define a function for validating data
def validate_data(libname, table, var):
try:
spark.sql(f"""
SELECT {var}, COUNT(*) AS count
FROM {libname}_{table}
GROUP BY {var}
""").show()
except Exception as e:
print(f"Error validating data: {e}")
# Define a function for checking the existence of a dataset
def check_dataset(libname, dataset):
try:
if spark.catalog.tableExists(f"{libname}_{dataset}"):
print("Dataset exists.")
else:
print("Dataset does not exist.")
except Exception as e:
print(f"Error checking dataset existence: {e}")
# Call the functions
table1 = import_data("work", "table1", "/path/to/table1.csv")
table2 = import_data("work", "table2", "/path/to/table2.csv")
merged_table = join_tables("work", "table1", "table2", "merged_table", "a.id = b.id")
transposed_table = transpose_table("work", "merged_table", "transposed_table", "id", "some_var")
ranked_table = apply_rank("work", "transposed_table", "some_var", "ranked_table")
validate_data("work", "ranked_table", "some_var")
check_dataset("work", "ranked_table")
In this modified code, I’ve defined separate functions for each of the SAS macros, and added error handling using try-except blocks. If an error occurs during the execution of a function, the error message is printed and the function returns None.
Method 3
Here’s the modified PySpark code that uses the sql method for the transpose and rank operations, and defines classes, objects, and functions to mimic the SAS macros:
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import rank, col
# Create a SparkSession object
spark = SparkSession.builder.appName("My App").getOrCreate()
# Define the start and end dates
start_date = "2023-01-01"
end_date = "2023-12-31"
# Define a class for importing data
class ImportData:
def __init__(self, libname, dataset, filepath):
self.libname = libname
self.dataset = dataset
self.filepath = filepath
def execute(self):
df = spark.read.csv(self.filepath, header=True, inferSchema=True)
df.createOrReplaceTempView(f"{self.libname}_{self.dataset}")
return df
# Define a class for joining tables
class JoinTables:
def __init__(self, libname, table1, table2, out_table, join_condition):
self.libname = libname
self.table1 = table1
self.table2 = table2
self.out_table = out_table
self.join_condition = join_condition
def execute(self):
spark.sql(f"""
CREATE TABLE {self.libname}_{self.out_table} AS
SELECT a.*, b.*
FROM {self.libname}_{self.table1} a
LEFT JOIN {self.libname}_{self.table2} b
ON {self.join_condition}
""")
return spark.table(f"{self.libname}_{self.out_table}")
# Define a class for transposing data
class TransposeTable:
    """Collapse the rows of each ``id_var`` group into a single row.

    ``execute`` creates the table ``{libname}_{out_table}`` with one row per
    ``id_var`` value, where ``var`` holds an array of that group's values.
    """

    def __init__(self, libname, in_table, out_table, id_var, var):
        self.libname = libname
        self.in_table = in_table
        self.out_table = out_table
        self.id_var = id_var
        self.var = var

    def execute(self):
        """Run the transposition through spark.sql and return the new table."""
        # "GROUP BY ... PIVOT (col)" is not valid Spark SQL: PIVOT belongs in
        # the FROM clause and needs an aggregate plus an explicit value list.
        # collect_list turns each group's values into one array column; it
        # drops NULLs and does not guarantee element order.
        spark.sql(f"""
            CREATE TABLE {self.libname}_{self.out_table} AS
            SELECT {self.id_var}, collect_list({self.var}) AS {self.var}
            FROM {self.libname}_{self.in_table}
            GROUP BY {self.id_var}
        """)
        return spark.table(f"{self.libname}_{self.out_table}")
# Define a class for applying the rank function
class ApplyRank:
def __init__(self, libname, table, var, out_table):
self.libname = libname
self.table = table
self.var = var
self.out_table = out_table
def execute(self):
spark.sql(f"""
CREATE TABLE {self.libname}_{self.out_table} AS
SELECT *, RANK() OVER (ORDER BY {self.var} DESC) AS rank
FROM {self.libname}_{self.table}
""")
return spark.table(f"{self.libname}_{self.out_table}")
# Define a class for validating data
class ValidateData:
def __init__(self, libname, table, var):
self.libname = libname
self.table = table
self.var = var
def execute(self):
spark.sql(f"""
SELECT {self.var}, COUNT(*) AS count
FROM {self.libname}_{self.table}
GROUP BY {self.var}
""").show()
# Define a class for checking the existence of a dataset
class CheckDataset:
def __init__(self, libname, dataset):
self.libname = libname
self.dataset = dataset
def execute(self):
if spark.catalog.tableExists(f"{self.libname}_{self.dataset}"):
print("Dataset exists.")
else:
print("Dataset does not exist.")
# Create instances of the classes and execute the methods
import_data = ImportData("work", "table1", "/path/to/table1.csv")
table1 = import_data.execute()
import_data = ImportData("work", "table2", "/path/to/table2.csv")
table2 = import_data.execute()
join_tables = JoinTables("work", "table1", "table2", "merged_table", "a.id = b.id")
merged_table = join_tables.execute()
transpose_table = TransposeTable("work", "merged_table", "transposed_table", "id", "some_var")
transposed_table = transpose_table.execute()
apply_rank = ApplyRank("work", "transposed_table", "some_var", "ranked_table")
ranked_table = apply_rank.execute()
validate_data = ValidateData("work", "ranked_table", "some_var")
validate_data.execute()
check_dataset = CheckDataset("work", "ranked_table")
check_dataset.execute()
In this modified code, we define classes for each of the SAS macros, and create instances of these classes to execute each step in sequence.
The step-by-step explanation of this code follows.
Here is a step-by-step explanation of the modified PySpark code:
Step 1: Create a SparkSession object
spark = SparkSession.builder.appName("My App").getOrCreate()
This line creates a SparkSession object, which is the entry point to programming Spark with the Dataset and DataFrame API.
Step 2: Define the start and end dates
start_date = "2023-01-01"
end_date = "2023-12-31"
These lines define the start and end dates as strings.
Step 3: Define a class for importing data
class ImportData:
def __init__(self, libname, dataset, filepath):
self.libname = libname
self.dataset = dataset
self.filepath = filepath
def execute(self):
df = spark.read.csv(self.filepath, header=True, inferSchema=True)
df.createOrReplaceTempView(f"{self.libname}_{self.dataset}")
return df
This class defines a method for importing data from a CSV file. The execute method reads the CSV file using the spark.read.csv method, creates a temporary view using the createOrReplaceTempView method, and returns the DataFrame.
Step 4: Define a class for joining tables
class JoinTables:
def __init__(self, libname, table1, table2, out_table, join_condition):
self.libname = libname
self.table1 = table1
self.table2 = table2
self.out_table = out_table
self.join_condition = join_condition
def execute(self):
spark.sql(f"""
CREATE TABLE {self.libname}_{self.out_table} AS
SELECT a.*, b.*
FROM {self.libname}_{self.table1} a
LEFT JOIN {self.libname}_{self.table2} b
ON {self.join_condition}
""")
return spark.table(f"{self.libname}_{self.out_table}")
This class defines a method for joining two tables. The execute method uses the spark.sql method to execute a SQL query that joins the two tables, creates a new table with the joined data, and returns the resulting DataFrame.
Step 5: Define a class for transposing data
class TransposeTable:
    """Collapse the rows of each ``id_var`` group into a single row.

    ``execute`` creates the table ``{libname}_{out_table}`` with one row per
    ``id_var`` value, where ``var`` holds an array of that group's values.
    """

    def __init__(self, libname, in_table, out_table, id_var, var):
        self.libname = libname
        self.in_table = in_table
        self.out_table = out_table
        self.id_var = id_var
        self.var = var

    def execute(self):
        """Run the transposition through spark.sql and return the new table."""
        # "GROUP BY ... PIVOT (col)" is not valid Spark SQL: PIVOT belongs in
        # the FROM clause and needs an aggregate plus an explicit value list.
        # collect_list turns each group's values into one array column; it
        # drops NULLs and does not guarantee element order.
        spark.sql(f"""
            CREATE TABLE {self.libname}_{self.out_table} AS
            SELECT {self.id_var}, collect_list({self.var}) AS {self.var}
            FROM {self.libname}_{self.in_table}
            GROUP BY {self.id_var}
        """)
        return spark.table(f"{self.libname}_{self.out_table}")
This class defines a method for transposing data. The execute method uses the spark.sql method to execute a SQL query that transposes the data, creates a new table with the transposed data, and returns the resulting DataFrame.
Step 6: Define a class for applying the rank function
class ApplyRank:
def __init__(self, libname, table, var, out_table):
self.libname = libname
self.table = table
self.var = var
self.out_table = out_table
def execute(self):
spark.sql(f"""
CREATE TABLE {self.libname}_{self.out_table} AS
SELECT *, RANK() OVER (ORDER BY {self.var} DESC) AS rank
FROM {self.libname}_{self.table}
""")
return spark.table(f"{self.libname}_{self.out_table}")
This class defines a method for applying the rank function. The execute method uses the spark.sql method to execute a SQL query that applies the rank function, creates a new table with the ranked data, and returns the resulting DataFrame.
Step 7: Define a class for validating data
class ValidateData:
def __init__(self, libname, table, var):
self.libname = libname
self.table = table
self.var = var
def execute(self):
spark.sql(f"""
SELECT {self.var}, COUNT(*) AS count
FROM {self.libname}_{self.table}
GROUP BY {self.var}
""").show()
This class defines a method for validating data. The execute method uses the spark.sql method to execute a SQL query that groups the data by the specified variable and counts the number of rows in each group, and displays the results.
Requirement: an approach for dividing a monthly PySpark script into multiple code steps. Each step will be saved in the code column of a control DataFrame and executed sequentially. The script will include error handling and pre-checks to ensure source tables are present or Oracle tables have data before starting each step. The control DataFrame and status DataFrame for each month will be appended to a Hive or Oracle table to maintain historical data.
Here is a detailed approach for dividing a monthly PySpark script into multiple code steps. Each step will be saved in the code column of a control DataFrame and executed sequentially. The script will include error handling and pre-checks to ensure source tables are present or Oracle tables have data before starting each step. The control DataFrame and status DataFrame for each month will be appended to a Hive or Oracle table to maintain historical data.
Step 1: Prepare the Control DataFrame with Code Steps
Sample Monthly PySpark Script
Below is a sample PySpark script that reads data from Hive tables, performs transformations, and writes the output to Hive tables.
Sample PySpark Script
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from datetime import datetime
# Create (or reuse) a Hive-enabled Spark session.
# The builder chain is wrapped in parentheses so that it can span several
# lines; without them each ".method()" line is a syntax error.
spark = (
    SparkSession.builder
    .appName("Monthly PySpark Script")
    .enableHiveSupport()
    .getOrCreate()
)
# Define the current month in MMyyyy format
current_month = datetime.now().strftime("%m%Y")
# Sample code steps to be executed sequentially.
# Each step is Python source text that is later run with exec(). The text is
# kept at column 0, with nested blocks indented, because exec() rejects source
# whose block structure is missing. Plain strings are used instead of
# f-strings: nothing is interpolated, so dict braces need no "{{ }}" escaping.
code_steps = [
    # Step 1: Check if source Hive table exists and has data
    """
source_table = "source_db.source_table"
if not spark.catalog.tableExists(source_table):
    raise ValueError("Source table does not exist.")
df = spark.table(source_table)
if df.count() == 0:
    raise ValueError("Source table is empty.")
""",
    # Step 2: Read data from an Oracle table (Assuming JDBC connection properties are set)
    # The URL uses the "@//host:port/service_name" form; the
    # "@host:port:name" form is interpreted by the thin driver as a SID.
    """
oracle_url = "jdbc:oracle:thin:@//hostname:port/service_name"
oracle_properties = {
    "user": "username",
    "password": "password"
}
oracle_df = spark.read.jdbc(oracle_url, "oracle_schema.oracle_table", properties=oracle_properties)
if oracle_df.count() == 0:
    raise ValueError("Oracle table is empty.")
print("Data read from Oracle table:", oracle_df.count(), "rows")
""",
    # Step 3: Perform a transformation
    """
transformed_df = df.withColumn("new_col", col("existing_col") * 2)
print("Transformation applied:", transformed_df.count(), "rows")
""",
    # Step 4: Join with Oracle data
    """
final_df = transformed_df.join(oracle_df, "join_key")
print("Joined data:", final_df.count(), "rows")
""",
    # Step 5: Write data to the target Hive table
    """
target_table = "target_db.target_table"
final_df.write.mode("overwrite").saveAsTable(target_table)
print("Data written to target table")
""",
]
# Create a Control DataFrame: one row per code step, numbered from 1 and
# tagged with the month period.
from pyspark.sql import Row

control_data = [
    Row(serial_no=number, code=source, month_period=current_month)
    for number, source in enumerate(code_steps, start=1)
]
control_df = spark.createDataFrame(control_data)
control_df.show(truncate=False)
Step 2: Create a Status Table
Create a status table to store the execution status of each code step. This table can be created in Hive for simplicity.
# Create the status table if it does not exist yet.
# status_schema is the column list placed between the parentheses of the DDL.
status_schema = "serial_no INT, status STRING, message STRING, month_period STRING, row_count INT"
spark.sql("CREATE TABLE IF NOT EXISTS control_db.status_table (" + status_schema + ")")
Step 3: Execute Each Code Step with Error Handling and Status Tracking
Execute each code step sequentially, log the status, any error messages, and the row count of the output tables.
# Function to log status
def log_status(serial_no, status, message, row_count=0):
    """Append one execution-status row to control_db.status_table.

    The row is written through the DataFrame API instead of an INSERT
    statement built by string interpolation, so quotes or other SQL syntax
    inside ``message`` are stored as data and cannot break the statement.

    Args:
        serial_no: Number of the code step the row describes.
        status: Outcome label, e.g. "SUCCESS" or "FAILED".
        message: Free-text detail (for failures, the error text).
        row_count: Rows found in the step's output table; 0 when unknown.
    """
    # Column order must match the table definition: insertInto() maps
    # columns by position, not by name.
    row_schema = "serial_no INT, status STRING, message STRING, month_period STRING, row_count INT"
    status_row = [(int(serial_no), status, str(message), current_month, int(row_count))]
    spark.createDataFrame(status_row, schema=row_schema).write.insertInto("control_db.status_table")
# Execute each code snippet
import re

# NOTE(security): exec() runs the stored text with the full privileges of this
# job. Only use it with a control table whose contents are trusted.

# Captures the argument of saveAsTable(...): a double-quoted name, a
# single-quoted name, or the name of a variable holding the table name.
save_target_pattern = re.compile(
    r"""saveAsTable\(\s*(?:"([^"]+)"|'([^']+)'|([A-Za-z_]\w*))\s*\)"""
)

# collect() gives no ordering guarantee, so sort explicitly: later steps use
# variables created by earlier ones.
code_snippets = control_df.select("serial_no", "code").orderBy("serial_no").collect()
for row in code_snippets:
    serial_no = row["serial_no"]
    code_snippet = row["code"]
    try:
        print(f"Executing step {serial_no}...")
        exec(code_snippet)
        # Fetch the row count of the output table if this step writes to a table
        row_count = 0
        target = save_target_pattern.search(code_snippet)
        if target:
            double_quoted, single_quoted, variable_name = target.groups()
            # A variable name is resolved in globals(), where exec() above
            # has just defined it.
            output_table = double_quoted or single_quoted or globals().get(variable_name)
            if output_table:
                row_count = spark.table(output_table).count()
    except Exception as e:
        error_message = str(e).replace("'", "")
        log_status(serial_no, "FAILED", error_message)
        print(f"Error executing step {serial_no}: {error_message}")
        break
    else:
        # Logged outside the try body so that a failure of the logging call
        # itself is not recorded as a failure of the step.
        log_status(serial_no, "SUCCESS", "Executed successfully", row_count)
# Example to display the status table
spark.sql("SELECT * FROM control_db.status_table").show()
Full Example Script
Here’s the complete script:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from datetime import datetime
from pyspark.sql import Row
# Initialize Spark session
# The builder chain is wrapped in parentheses so that it can span several
# lines; without them each ".method()" line is a syntax error.
spark = (
    SparkSession.builder
    .appName("Execute Code Snippets from Control DataFrame")
    .enableHiveSupport()
    .getOrCreate()
)
# Define the current month in MMyyyy format
current_month = datetime.now().strftime("%m%Y")
# Define code snippets
# Each step is Python source text that is later run with exec(). The text is
# kept at column 0, with nested blocks indented, because exec() rejects source
# whose block structure is missing. Plain strings are used instead of
# f-strings: nothing is interpolated, so dict braces need no "{{ }}" escaping.
code_steps = [
    # Step 1: Check if source Hive table exists and has data
    """
source_table = "source_db.source_table"
if not spark.catalog.tableExists(source_table):
    raise ValueError("Source table does not exist.")
df = spark.table(source_table)
if df.count() == 0:
    raise ValueError("Source table is empty.")
""",
    # Step 2: Read data from an Oracle table
    # The URL uses the "@//host:port/service_name" form; the
    # "@host:port:name" form is interpreted by the thin driver as a SID.
    """
oracle_url = "jdbc:oracle:thin:@//hostname:port/service_name"
oracle_properties = {
    "user": "username",
    "password": "password"
}
oracle_df = spark.read.jdbc(oracle_url, "oracle_schema.oracle_table", properties=oracle_properties)
if oracle_df.count() == 0:
    raise ValueError("Oracle table is empty.")
print("Data read from Oracle table:", oracle_df.count(), "rows")
""",
    # Step 3: Perform a transformation
    """
transformed_df = df.withColumn("new_col", col("existing_col") * 2)
print("Transformation applied:", transformed_df.count(), "rows")
""",
    # Step 4: Join with Oracle data
    """
final_df = transformed_df.join(oracle_df, "join_key")
print("Joined data:", final_df.count(), "rows")
""",
    # Step 5: Write data to the target Hive table
    """
target_table = "target_db.target_table"
final_df.write.mode("overwrite").saveAsTable(target_table)
print("Data written to target table")
""",
]
# Create a Control DataFrame: one row per code step, numbered from 1 and
# tagged with the month period.
control_data = [
    Row(serial_no=number, code=source, month_period=current_month)
    for number, source in enumerate(code_steps, start=1)
]
control_df = spark.createDataFrame(control_data)
control_df.show(truncate=False)
# Create the status table if it does not exist yet.
# status_schema is the column list placed between the parentheses of the DDL.
status_schema = "serial_no INT, status STRING, message STRING, month_period STRING, row_count INT"
spark.sql("CREATE TABLE IF NOT EXISTS control_db.status_table (" + status_schema + ")")
# Function to log status
def log_status(serial_no, status, message, row_count=0):
    """Append one execution-status row to control_db.status_table.

    The row is written through the DataFrame API instead of an INSERT
    statement built by string interpolation, so quotes or other SQL syntax
    inside ``message`` are stored as data and cannot break the statement.

    Args:
        serial_no: Number of the code step the row describes.
        status: Outcome label, e.g. "SUCCESS" or "FAILED".
        message: Free-text detail (for failures, the error text).
        row_count: Rows found in the step's output table; 0 when unknown.
    """
    # Column order must match the table definition: insertInto() maps
    # columns by position, not by name.
    row_schema = "serial_no INT, status STRING, message STRING, month_period STRING, row_count INT"
    status_row = [(int(serial_no), status, str(message), current_month, int(row_count))]
    spark.createDataFrame(status_row, schema=row_schema).write.insertInto("control_db.status_table")
# Execute each code snippet
import re

# NOTE(security): exec() runs the stored text with the full privileges of this
# job. Only use it with a control table whose contents are trusted.

# Captures the argument of saveAsTable(...): a double-quoted name, a
# single-quoted name, or the name of a variable holding the table name.
save_target_pattern = re.compile(
    r"""saveAsTable\(\s*(?:"([^"]+)"|'([^']+)'|([A-Za-z_]\w*))\s*\)"""
)

# collect() gives no ordering guarantee, so sort explicitly: later steps use
# variables created by earlier ones.
code_snippets = control_df.select("serial_no", "code").orderBy("serial_no").collect()
for row in code_snippets:
    serial_no = row["serial_no"]
    code_snippet = row["code"]
    try:
        print(f"Executing step {serial_no}...")
        exec(code_snippet)
        # Fetch the row count of the output table if this step writes to a table
        row_count = 0
        target = save_target_pattern.search(code_snippet)
        if target:
            double_quoted, single_quoted, variable_name = target.groups()
            # A variable name is resolved in globals(), where exec() above
            # has just defined it.
            output_table = double_quoted or single_quoted or globals().get(variable_name)
            if output_table:
                row_count = spark.table(output_table).count()
    except Exception as e:
        error_message = str(e).replace("'", "")
        log_status(serial_no, "FAILED", error_message)
        print(f"Error executing step {serial_no}: {error_message}")
        break
    else:
        # Logged outside the try body so that a failure of the logging call
        # itself is not recorded as a failure of the step.
        log_status(serial_no, "SUCCESS", "Executed successfully", row_count)
# Example to display the status table
spark.sql("SELECT * FROM control_db.status_table").show()
To execute dynamic code snippets stored in a DataFrame (or any other source) in PySpark, you can use the exec function. This can be particularly useful when you have a control table that stores PySpark code snippets to be executed sequentially. Below is a complete example of how to achieve this, including error handling, logging, and updating status.
Example: Dynamic Code Execution in PySpark
1. Initialize Spark Session
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime
# Initialize Spark session
# The builder chain is wrapped in parentheses so that it can span several
# lines; without them each ".method()" line is a syntax error.
spark = (
    SparkSession.builder
    .appName("PySpark Dynamic Code Execution Example")
    .enableHiveSupport()
    .getOrCreate()
)
2. Create Sample Control DataFrame
# Sample control DataFrame: each row pairs a serial number with the Python
# snippet to run for that step.
control_columns = ["serial_no", "code_snippet"]
control_data = [
    (1, "df1 = spark.range(0, 10)"),
    (2, "df2 = df1.withColumn('square', F.col('id') ** 2)"),
    (3, "df2.show()"),
]
control_df = spark.createDataFrame(control_data, schema=control_columns)
control_df.show(truncate=False)
3. Create Log and Status DataFrames
# Log table: schema plus an empty DataFrame to append entries to
log_schema = "timestamp STRING, message STRING, level STRING"
log_df = spark.createDataFrame([], schema=log_schema)
# Status table: schema plus an empty DataFrame to append entries to
status_schema = "step INT, status STRING, row_count INT, timestamp STRING"
status_df = spark.createDataFrame([], schema=status_schema)
4. Define Functions for Error Handling and Logging
# Function to log messages
def log(message, level="INFO"):
    """Append a timestamped entry to the module-level ``log_df``.

    Args:
        message: Text of the log entry.
        level: Severity label stored with the entry.
    """
    global log_df
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry_df = spark.createDataFrame([(stamp, message, level)], schema=log_schema)
    # DataFrames are immutable, so "appending" means rebinding the global to
    # the union of the old log and the new one-row DataFrame.
    log_df = log_df.union(entry_df)
# Function to update status
def update_status(step, status, row_count=0):
    """Append a timestamped status entry to the module-level ``status_df``.

    Args:
        step: Serial number of the step the entry describes.
        status: Outcome label stored with the entry.
        row_count: Row count recorded for the step.
    """
    global status_df
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry_df = spark.createDataFrame([(step, status, row_count, stamp)], schema=status_schema)
    # Rebind the global to the union of the old table and the new row.
    status_df = status_df.union(entry_df)
5. Execute Code Snippets Sequentially
# Function to execute code snippets
def execute_code(control_df):
    """Run the snippets of ``control_df`` in ``serial_no`` order.

    Each snippet is executed in the module's global namespace, so names
    created by one step are visible to the following steps. Every step is
    logged and gets a status entry; execution stops at the first failure.

    Args:
        control_df: DataFrame with the columns ``serial_no`` and
            ``code_snippet``.
    """
    import ast

    from pyspark.sql import DataFrame

    # collect() alone gives no ordering guarantee; sort so that steps run in
    # the intended sequence.
    for row in control_df.orderBy("serial_no").collect():
        step = row["serial_no"]
        code = row["code_snippet"]
        try:
            # NOTE(security): exec() runs the stored text with the full
            # privileges of this job. Only use trusted control data.
            exec(code, globals())
            log(f"Step {step} executed successfully", level="INFO")
            # Optionally, get the row count of the DataFrame the step assigned.
            # The snippet is parsed rather than searched for "=", which would
            # also match keyword arguments and "==" comparisons.
            row_count = 0
            statements = ast.parse(code).body
            first = statements[0] if statements else None
            if (
                isinstance(first, ast.Assign)
                and len(first.targets) == 1
                and isinstance(first.targets[0], ast.Name)
            ):
                produced = globals().get(first.targets[0].id)
                if isinstance(produced, DataFrame):
                    row_count = produced.count()
            update_status(step, "SUCCESS", row_count)
        except Exception as e:
            log(f"Step {step} failed: {e}", level="ERROR")
            update_status(step, "FAILED")
            break
# Run every snippet listed in the control DataFrame
execute_code(control_df)
# Show log and status tables, in that order, without truncating long values
for audit_df in (log_df, status_df):
    audit_df.show(truncate=False)
Explanation
- Initialization: A Spark session is initialized, and a sample control DataFrame is created with code snippets.
- Log and Status DataFrames: Empty DataFrames for logs and status are initialized with appropriate schemas.
- Logging Functions: Functions for logging messages and updating the status are defined.
- Execute Code Snippets: The execute_code function iterates over the control DataFrame, executing each code snippet using exec. It logs the execution status and updates the status table.
- Execution and Display: The control DataFrame is executed, and the log and status tables are displayed.
Scheduling the Script
To schedule the script to run automatically, you can use scheduling tools like Apache Airflow, cron jobs, or any other job scheduler that fits your environment. Here’s an example of how you might schedule it using a simple cron job in Linux:
- Create a Python script (e.g., pyspark_script.py) with the above code.
- Add a cron job to execute the script at the desired interval.
# Open the cron table for editing
crontab -e
# Add a line to run the script at the desired schedule (e.g., every day at midnight)
0 0 * * * /path/to/spark/bin/spark-submit /path/to/pyspark_script.py
This setup will run your PySpark script automatically according to the schedule specified in the cron job.
Building an ETL Data Pipeline in PySpark and Using Pandas and Matplotlib for Further Processing
Project Alert: Building an ETL data pipeline in PySpark and using Pandas and Matplotlib for further processing. For deployment we will consider using Bitbucket and Jenkins.
We will build a data pipeline that reads BDL Hive tables in PySpark and executes PySpark scripts for complex transformations on the data via PySpark SQL and the DataFrame API. Some of our sources will be Oracle tables and CSV files stored in a server-specific location. We will read these and join them with our processed BDL data to add more information (variables). Our target tables are BDL Hive tables in Parquet format saved in another schema. We will also consolidate large Spark DataFrames into data that can be consumed as Pandas DataFrames, which will be used for analysis with Pandas and visualisation with Matplotlib. These data can be saved as CSV, Excel, or Oracle tables.
Step 1: Environment Setup
- Spark Session Initialization: Set up Spark with Hive support.
- Dependencies: Ensure you have the necessary libraries installed.
Step 2: Data Extraction
- Hive Tables: Read from Hive.
- Oracle Tables: Use JDBC to read from Oracle.
- CSV Files: Read from local/remote storage.
Step 3: Data Transformation
- PySpark SQL and DataFrame API: Perform transformations.
- Joining Data: Combine data from different sources.
Step 4: Data Loading
- Write to Hive: Save results in Parquet format.
- Convert to Pandas: For further analysis.
- Save as CSV/Excel/Oracle: Export data for other uses.
Step 5: Data Analysis and Visualization
- Pandas for Analysis: Use Pandas DataFrame.
- Matplotlib for Visualization: Create charts and graphs.
Step 6: Deployment with Bitbucket and Jenkins
- Version Control with Bitbucket: Push code to Bitbucket.
- CI/CD with Jenkins: Automate deployment.
Example Code
Spark Session Initialization
from pyspark.sql import SparkSession
# Create (or reuse) a Hive-enabled Spark session.
# The builder chain is wrapped in parentheses so that it can span several
# lines; without them each ".method()" line is a syntax error.
spark = (
    SparkSession.builder
    .appName("ETL Pipeline")
    .enableHiveSupport()
    .getOrCreate()
)
Data Extraction
# Reading from Hive
hive_df = spark.sql("SELECT * FROM bdl_database.bdl_table")
# Reading from Oracle over JDBC; each connection setting is passed as its own option
oracle_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//your_oracle_host:1521/your_oracle_db")
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .option("dbtable", "your_oracle_table")
    .option("user", "your_user")
    .option("password", "your_password")
    .load()
)
# Reading from CSV: first line is the header, column types are inferred
csv_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("path/to/your/csvfile.csv")
)
Data Transformation
# Example Transformation
hive_df_filtered = hive_df.filter(hive_df["column_name"] > 0)
# The join key "id" must be part of the projection; selecting only col1 and
# col2 would make the join below fail because "id" no longer exists.
oracle_df_selected = oracle_df.select("id", "col1", "col2")
csv_df_transformed = csv_df.withColumn("new_col", csv_df["col3"] * 2)
# Joining Data
# Joining on the column name keeps a single "id" column in the result. The
# chain is parenthesised so that it can continue on the next line.
joined_df = (
    hive_df_filtered.join(oracle_df_selected, "id", "inner")
    .join(csv_df_transformed, "id", "inner")
)
Data Loading
# Save to Hive: write the joined result as Parquet files, replacing any
# existing output at that path
hive_writer = joined_df.write.mode("overwrite")
hive_writer.parquet("path/to/hive_table")
# Convert to Pandas for further analysis
pandas_df = joined_df.toPandas()
# Save as CSV and Excel (the index is left out of both files)
pandas_df.to_csv("path/to/save/yourdata.csv", index=False)
pandas_df.to_excel("path/to/save/yourdata.xlsx", index=False)
Data Analysis and Visualization
import matplotlib.pyplot as plt
import pandas as pd
# Pandas DataFrame Analysis: print summary statistics
summary = pandas_df.describe()
print(summary)
# Visualization: histogram of one column on a 10x6 figure
plt.figure(figsize=(10, 6))
axes = pandas_df["your_column"].hist()
axes.set_title("Histogram of Your Column")
axes.set_xlabel("Values")
axes.set_ylabel("Frequency")
plt.show()
Deployment with Bitbucket and Jenkins
- Push Code to Bitbucket:
- Initialize a Git repository.
- Add your code and commit.
- Push to your Bitbucket repository.
- Set Up Jenkins for CI/CD:
- Install Jenkins and necessary plugins.
- Create a new Jenkins job.
- Configure the job to pull from your Bitbucket repository.
- Add build steps to run your PySpark script.
Jenkins Pipeline Script Example
// Declarative pipeline: clone the repository, run the PySpark script, deploy.
pipeline {
    agent any
    stages {
        stage('Clone Repository') {
            steps {
                git 'https://bitbucket.org/your_repo.git'
            }
        }
        stage('Build and Test') {
            steps {
                sh 'spark-submit --master local[2] your_script.py'
            }
        }
        stage('Deploy') {
            steps {
                // Add deployment steps here
                // A steps block must contain at least one step, so a
                // placeholder echo keeps the pipeline valid until then.
                echo 'Add deployment steps here'
            }
        }
    }
}
Simplified Version of Complete Pipeline:-
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd
import matplotlib.pyplot as plt
# Initialize Spark Session
# The builder chain is wrapped in parentheses so that it can span several
# lines; without them each ".method()" line is a syntax error.
spark = (
    SparkSession.builder
    .appName("DataPipeline")
    .enableHiveSupport()
    .getOrCreate()
)
# Read Data from Hive
hive_df = spark.sql("SELECT * FROM bdl_database.bdl_table")
# Read Data from Oracle
oracle_df = spark.read.format("jdbc").options(
    url="jdbc:oracle:thin:@//your_oracle_host:1521/your_oracle_db",
    driver="oracle.jdbc.driver.OracleDriver",
    dbtable="your_oracle_table",
    user="your_user",
    password="your_password"
).load()
# Read CSV Files
csv_df = spark.read.csv("path/to/your/csvfile.csv", header=True, inferSchema=True)
# Perform Transformations
# Example transformation: filter and select specific columns
hive_df_filtered = hive_df.filter(hive_df["column_name"] > 0)
# The join key "id" must be part of the projection; selecting only col1 and
# col2 would make the join below fail because "id" no longer exists.
oracle_df_selected = oracle_df.select("id", "col1", "col2")
csv_df_transformed = csv_df.withColumn("new_col", csv_df["col3"] * 2)
# Join DataFrames
# Joining on the column name keeps a single "id" column. Joining on an
# equality expression keeps both "id" columns, which makes a later
# joined_df["id"] reference ambiguous and gives the Pandas frame duplicate
# column names.
joined_df = hive_df_filtered.join(oracle_df_selected, "id", "inner")
final_df = joined_df.join(csv_df_transformed, "id", "inner")
# Save to Hive in Parquet format
final_df.write.mode("overwrite").parquet("path/to/hive_table")
# Convert to Pandas for further analysis
# toPandas() collects the whole result on the driver, so final_df should
# already be reduced to a size that fits in driver memory.
pandas_df = final_df.toPandas()
# Save Pandas DataFrame as CSV and Excel
pandas_df.to_csv("path/to/save/yourdata.csv", index=False)
pandas_df.to_excel("path/to/save/yourdata.xlsx", index=False)
# Visualize Data using Matplotlib
plt.figure(figsize=(10, 6))
pandas_df["your_column"].hist()
plt.title("Histogram of Your Column")
plt.xlabel("Values")
plt.ylabel("Frequency")
plt.show()
# Further Analysis with Pandas
print(pandas_df.describe())
# Close Spark Session
spark.stop()
Leave a Reply