• Regular expressions (regex) are a powerful tool for matching patterns in text. Python’s re module provides functions and tools for working with regular expressions. Here’s a complete tutorial on using regex in Python.

    1. Importing the re Module

    To use regular expressions in Python, you need to import the re module:

    import re

    2. Basic Functions

    re.search()

    Searches for a pattern within a string and returns a match object if found.

    pattern = r"d+"  # Matches one or more digits
    text = "The year is 2024"
    match = re.search(pattern, text)
    if match:
        print(f"Match found: {match.group()}")  # Output: Match found: 2024
    

    re.match()

    Matches a pattern at the beginning of a string.

    pattern = r"d+"  # Matches one or more digits
    text = "2024 is the year"
    match = re.match(pattern, text)
    if match:
        print(f"Match found: {match.group()}")  # Output: Match found: 2024
    

    re.findall()

    Finds all non-overlapping matches of a pattern in a string.

    pattern = r"d+"  # Matches one or more digits
    text = "The years are 2024, 2025, and 2026"
    matches = re.findall(pattern, text)
    print(f"Matches found: {matches}")  # Output: Matches found: ['2024', '2025', '2026']
    

    re.finditer()

    Returns an iterator yielding match objects for all non-overlapping matches.

    pattern = r"d+"  # Matches one or more digits
    text = "The years are 2024, 2025, and 2026"
    matches = re.finditer(pattern, text)
    for match in matches:
        print(f"Match found: {match.group()}")  # Output: 2024, 2025, 2026
    

    re.sub()

    Replaces occurrences of a pattern with a specified string.

    pattern = r"d+"  # Matches one or more digits
    text = "The years are 2024, 2025, and 2026"
    new_text = re.sub(pattern, "YEAR", text)
    print(new_text)  # Output: The years are YEAR, YEAR, and YEAR
    

    3. Special Characters

    • .: Matches any character except a newline.
    • ^: Matches the start of the string.
    • $: Matches the end of the string.
    • *: Matches 0 or more repetitions of the preceding element.
    • +: Matches 1 or more repetitions of the preceding element.
    • ?: Matches 0 or 1 repetition of the preceding element.
    • {m,n}: Matches from m to n repetitions of the preceding element.
    • []: Matches any one of the characters inside the brackets.
    • |: Matches either the pattern before or the pattern after the |.
    • () : Groups patterns and captures matches.
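
    A quick illustration of a few of these metacharacters (a minimal sketch; the sample strings are made up for demonstration):

    import re

    text = "color colour colouur"
    print(re.findall(r"colou?r", text))        # '?' -> ['color', 'colour']
    print(re.findall(r"colou{0,2}r", text))    # '{m,n}' -> ['color', 'colour', 'colouur']
    print(re.search(r"^color", text).group())  # '^' anchors to the start -> 'color'
    print(re.findall(r"cat|dog", "a cat and a dog"))  # '|' -> ['cat', 'dog']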

    4. Escaping Special Characters

    To match a literal special character, escape it with a backslash (\).

    pattern = r"$100"  # Matches the string "$100"
    text = "The price is $100"
    match = re.search(pattern, text)
    if match:
        print(f"Match found: {match.group()}")  # Output: Match found: $100
    

    5. Character Classes

    • \d: Matches any digit (equivalent to [0-9]).
    • \D: Matches any non-digit.
    • \w: Matches any alphanumeric character (equivalent to [a-zA-Z0-9_]).
    • \W: Matches any non-alphanumeric character.
    • \s: Matches any whitespace character (spaces, tabs, newlines).
    • \S: Matches any non-whitespace character.
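
    A short demonstration of these classes on a made-up string (sketch):

    import re

    text = "Order #42 shipped on 2024-06-27"
    print(re.findall(r"\d+", text))   # Output: ['42', '2024', '06', '27']
    print(re.findall(r"\w+", text))   # Output: ['Order', '42', 'shipped', 'on', '2024', '06', '27']
    print(re.sub(r"\s+", "_", text))  # Output: Order_#42_shipped_on_2024-06-27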

    6. Groups and Capturing

    You can use parentheses to create groups and capture parts of the match.

    pattern = r"(d{4})-(d{2})-(d{2})"  # Matches dates in the format YYYY-MM-DD
    text = "Today's date is 2024-06-27"
    match = re.search(pattern, text)
    if match:
        year, month, day = match.groups()
        print(f"Year: {year}, Month: {month}, Day: {day}")  # Output: Year: 2024, Month: 06, Day: 27
    

    7. Named Groups

    Named groups allow you to assign a name to a capturing group for easier access.

    pattern = r"(?P<year>d{4})-(?P<month>d{2})-(?P<day>d{2})"  # Matches dates in the format YYYY-MM-DD
    text = "Today's date is 2024-06-27"
    match = re.search(pattern, text)
    if match:
        print(f"Year: {match.group('year')}, Month: {match.group('month')}, Day: {match.group('day')}")  # Output: Year: 2024, Month: 06, Day: 27
    

    8. Lookahead and Lookbehind

    Lookaheads and lookbehinds are assertions that allow you to match a pattern only if it is (or isn’t) followed or preceded by another pattern.

    Positive Lookahead

    pattern = r"d+(?= dollars)"  # Matches digits followed by "dollars"
    text = "The price is 100 dollars"
    match = re.search(pattern, text)
    if match:
        print(f"Match found: {match.group()}")  # Output: Match found: 100
    

    Negative Lookahead

    pattern = r"d+(?! dollars)"  # Matches digits not followed by "dollars"
    text = "The price is 100 dollars or 200 euros"
    matches = re.findall(pattern, text)
    print(f"Matches found: {matches}")  # Output: Matches found: ['200']
    

    Positive Lookbehind

    pattern = r"(?<=$)d+"  # Matches digits preceded by a dollar sign
    text = "The price is $100"
    match = re.search(pattern, text)
    if match:
        print(f"Match found: {match.group()}")  # Output: Match found: 100
    

    Negative Lookbehind

    pattern = r"(?<!$)d+"  # Matches digits not preceded by a dollar sign
    text = "The price is 100 dollars or $200"
    matches = re.findall(pattern, text)
    print(f"Matches found: {matches}")  # Output: Matches found: ['100']
    

    9. Compiling Regular Expressions

    For efficiency, especially when using the same regex multiple times, you can compile a regular expression.

    pattern = re.compile(r"d{4}-d{2}-d{2}")  # Compile the regex pattern
    text = "The dates are 2024-06-27 and 2025-07-28"
    
    # Use the compiled pattern
    matches = pattern.findall(text)
    print(f"Matches found: {matches}")  # Output: Matches found: ['2024-06-27', '2025-07-28']
    

    10. Flags

    You can use flags to modify the behavior of regex functions. Common flags include:

    • re.IGNORECASE (re.I): Ignore case.
    • re.MULTILINE (re.M): Multi-line matching, affects ^ and $.
    • re.DOTALL (re.S): Dot matches all characters, including newline.
    pattern = r"^hello"
    text = "Hellonhello"
    
    # Without flag
    matches = re.findall(pattern, text)
    print(f"Matches without flag: {matches}")  # Output: Matches without flag: ['hello']
    
    # With IGNORECASE and MULTILINE flags
    matches = re.findall(pattern, text, re.IGNORECASE | re.MULTILINE)
    print(f"Matches with flags: {matches}")  # Output: Matches with flags: ['Hello', 'hello']
    

    Example: Searching for Emails in a Database

    Suppose you have a database with user data, and you want to extract all email addresses.

    import re
    
    # Sample data representing rows in a database
    data = [
        "Alice, alice@example.com, 123-456-7890",
        "Bob, bob123@gmail.com, 987-654-3210",
        "Charlie, charlie123@company.org, 555-555-5555",
        "Invalid data, no email here, 000-000-0000"
    ]
    
    # Regex pattern for matching email addresses
    email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    
    emails = []
    
    for entry in data:
        found_emails = re.findall(email_pattern, entry)
        emails.extend(found_emails)
    
    print("Extracted Emails:")
    for email in emails:
        print(email)
    

    Example: Searching for Code Snippets in a Code Repository

    Suppose you have a repository with Python files, and you want to find all instances of function definitions.

    import re
    import os
    
    # Directory containing Python files
    code_directory = 'path_to_code_repository'
    
    # Regex pattern for matching function definitions in Python
    function_pattern = r'def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\(.*?\)\s*:'
    
    functions = []
    
    # Walk through the directory and process each Python file
    for root, dirs, files in os.walk(code_directory):
        for file in files:
            if file.endswith('.py'):
                file_path = os.path.join(root, file)
                with open(file_path, 'r') as f:
                    file_content = f.read()
                    found_functions = re.findall(function_pattern, file_content)
                    for func in found_functions:
                        functions.append((file, func))
    
    print("Found Functions:")
    for file, func in functions:
        print(f"Function '{func}' found in file '{file}'")
  • 1. Exploratory Data Analysis (EDA) with Pandas in Banking – Converted to PySpark

    While searching for a free Pandas project on Google, I found this link: Exploratory Data Analysis (EDA) with Pandas in Banking. I have tried to convert that Python script into a PySpark one.

    First, let’s handle the initial steps of downloading and extracting the data:

    In a PySpark context, we’ll assume the data is already available locally after the above steps.

    Importing Libraries

    import pandas as pd
    import matplotlib.pyplot as plt
    import numpy as np

    %matplotlib inline
    plt.rcParams["figure.figsize"] = (8, 6)

    import warnings
    warnings.filterwarnings('ignore')

    Equivalent imports in PySpark:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import *
    import matplotlib.pyplot as plt
    import numpy as np

    # Initialize Spark session
    spark = SparkSession.builder.appName("EDA_PySpark_Banking").getOrCreate()

    Loading Data

    Pandas:

    df = pd.read_csv('bank-additional/bank-additional-full.csv', sep=';')
    df.head(5)

    PySpark:

    df = spark.read.csv('bank-additional/bank-additional-full.csv', sep=';', header=True, inferSchema=True)
    df.show(5)

    Initial Exploration

    Columns

    Pandas:

    df.columns

    PySpark:

    df.columns

    Info

    Pandas:

    print(df.info())

    PySpark:

    df.printSchema()

    Describe

    Pandas:

    df.describe()
    df.describe(include=["object"])

    PySpark:

    df.describe().show()
    df.select([countDistinct(c).alias(c) for c in df.columns]).show()
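
    describe(include=["object"]) has no single direct equivalent; a closer sketch is to run describe() over only the string-typed columns (PySpark reports count, min, and max for strings):

    string_cols = [c for c, t in df.dtypes if t == "string"]
    df.select(string_cols).describe().show()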

    Value Counts

    Pandas:

    df["y"].value_counts()
    df["marital"].value_counts(normalize=True)

    PySpark:

    df.groupBy("y").count().show()
    df.groupBy("marital").count().withColumn("normalize", col("count") / df.count()).show()

    Sorting

    Pandas:

    df.sort_values(by="duration", ascending=False).head()
    df.sort_values(by=["age", "duration"], ascending=[True, False]).head()

    PySpark:

    df.orderBy(col("duration").desc()).show(5)
    df.orderBy(col("age").asc(), col("duration").desc()).show(5)

    Applying Functions

    Pandas:

    df.apply(np.max)
    d = {"no": 0, "yes": 1}
    df["y"] = df["y"].map(d)

    PySpark:

    # PySpark does not have an apply method; use select with max for each column
    df.select([max(c).alias(c) for c in df.columns]).show()

    # Mapping values
    df = df.withColumn("y", when(col("y") == "yes", 1).otherwise(0))

    Further Analysis

    Pandas:

    print("Share of attracted clients =", '{:.1%}'.format(df["y"].mean()))
    df[df["y"] == 1].mean()
    acd = round(df[df["y"] == 1]["duration"].mean(), 2)
    acd_in_min = acd // 60
    print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")
    print("Average age of attracted clients =", int(df[(df["y"] == 1) & (df["marital"] == "single")]["age"].mean()), "years")
    df[-1:]

    PySpark:

    df.groupBy().agg(mean("y")).show()
    df.filter(df["y"] == 1).agg(*[mean(c).alias(c) for c in df.columns]).show()
    acd = df.filter(df["y"] == 1).agg(round(mean("duration"), 2)).collect()[0][0]
    acd_in_min = acd // 60
    print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")
    avg_age = df.filter((df["y"] == 1) & (df["marital"] == "single")).agg(mean("age")).collect()[0][0]
    print("Average age of attracted clients =", int(avg_age), "years")
    df.orderBy(desc("age")).limit(1).show()

    Crosstab and Pivot Table

    Pandas:

    pd.crosstab(df["y"], df["marital"])
    pd.crosstab(df["y"], df["marital"], normalize='index')
    df.pivot_table(["age", "duration"], ["job"], aggfunc="mean").head(10)

    PySpark:

    df.groupBy("y", "marital").count().show()
    df.groupBy("y", "marital").count().withColumn("normalize", col("count") / sum("count").over(Window.partitionBy("y"))).show()
    df.groupBy("job").agg(mean("age"), mean("duration")).show()

    Plots

    Pandas:

    pd.plotting.scatter_matrix(df[["age", "duration", "campaign"]], figsize=(15, 15), diagonal="kde")
    plt.show()

    df["age"].hist()
    df.hist(color="k", bins=30, figsize=(15, 10))
    plt.show()

    df.boxplot(column="age", by="marital")
    plt.show()

    df.boxplot(column="age", by=["marital", "housing"], figsize=(20, 20))
    plt.show()

    PySpark:

    # PySpark does not support direct plotting; use matplotlib for plotting
    # Convert to Pandas for plotting
    pandas_df = df.select("age", "duration", "campaign").toPandas()
    pd.plotting.scatter_matrix(pandas_df, figsize=(15, 15), diagonal="kde")
    plt.show()

    df.select("age").toPandas().hist()
    df.toPandas().hist(color="k", bins=30, figsize=(15, 10))
    plt.show()

    df.select("age", "marital").toPandas().boxplot(by="marital")
    plt.show()

    df.select("age", "marital", "housing").toPandas().boxplot(by=["marital", "housing"], figsize=(20, 20))
    plt.show()

    Combining Code

    Here is the combined code:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import *
    from pyspark.sql.window import Window
    import matplotlib.pyplot as plt
    import pandas as pd
    import numpy as np

    # Initialize Spark session
    spark = SparkSession.builder.appName("EDA_PySpark_Banking").getOrCreate()

    # Load the data
    df = spark.read.csv('bank-additional/bank-additional-full.csv', sep=';', header=True, inferSchema=True)

    # Initial exploration
    df.printSchema()
    df.describe().show()

    df.groupBy("y").count().show()
    df.groupBy("marital").count().withColumn("normalize", col("count") / df.count()).show()

    df.orderBy(col("duration").desc()).show(5)
    df.orderBy(col("age").asc(), col("duration").desc()).show(5)

    df.select([max(c).alias(c) for c in df.columns]).show()
    df = df.withColumn("y", when(col("y") == "yes", 1).otherwise(0))

    df.groupBy().agg(mean("y")).show()
    df.filter(df["y"] == 1).agg(*[mean(c).alias(c) for c in df.columns]).show()

    acd = df.filter(df["y"] == 1).agg(round(mean("duration"), 2)).collect()[0][0]
    acd_in_min = acd // 60
    print("Average call duration for attracted clients =", acd_in_min, "min", int(acd) % 60, "sec")

    avg_age = df.filter((df["y"] == 1) & (df["marital"] == "single")).agg(mean("age")).collect()[0][0]
    print("Average age of attracted clients =", int(avg_age), "years")
    df.orderBy(desc("age")).limit(1).show()

    df.groupBy("y", "marital").count().show()
    df.groupBy("y", "marital").count().withColumn("normalize", col("count") / sum("count").over(Window.partitionBy("y"))).show()
    df.groupBy("job").agg(mean("age"), mean("duration")).show()

    # Plots
    pandas_df = df.select("age", "duration", "campaign").toPandas()
    pd.plotting.scatter_matrix(pandas_df, figsize=(15, 15), diagonal="kde")
    plt.show()

    df.select("age").toPandas().hist()
    df.toPandas().hist(color="k", bins=30, figsize=(15, 10))
    plt.show()

    df.select("age", "marital").toPandas().boxplot(by="marital")
    plt.show()

    df.select("age", "marital", "housing").toPandas().boxplot(by=["marital", "housing"], figsize=(20, 20))
    plt.show()


    2. Dynamic List of Variables Creation for ETL Jobs

    Let us create one or more dynamic lists of variables and save them in a dictionary, array, or other data structure for repeated use in PySpark projects, especially for ETL jobs. The variable names are dynamic, for example Month_202401 to Month_202312 for 24 months (take these 24 months backdated from, or progressing with, the current month). Later we will use this dictionary to create 24 CSV files by filtering on year and month, and we will also use it to create arrays. We will create custom Excel files for the 24 year-month combinations, where the column names are also built from those combinations (such as xyz_2404_04 to xyz_2404_12, abc_xyz_2404_0405 to abc_xyz_2405_1201). This requirement is specifically for PySpark-based ETL projects.

    Creating dynamic lists of variables and saving them in a dictionary or array for further use in PySpark ETL jobs is an excellent idea to streamline and automate the process. Here’s a step-by-step guide to achieve this in a PySpark-based project.

    Step 1: Generate Dynamic Variable Names

    First, we need to generate dynamic variable names for the past 24 months from the current month. We’ll store these variable names in a dictionary for easy access.

    from datetime import datetime, timedelta
    import pandas as pd
    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession
    import calendar
    
    # Initialize SparkSession
    spark = SparkSession.builder \
        .appName("DynamicVariablesETL") \
        .getOrCreate()
    
    # Get current date
    current_date = datetime.now()
    
    # Create a dictionary to hold variable names
    dynamic_vars = {}
    
    # Generate variable names for the past 24 months
    for i in range(24):
        date = current_date - timedelta(days=i*30)
        year_month = date.strftime('%Y%m')
        dynamic_vars[f'Month_{year_month}'] = None
    
    # Print the generated variable names
    print(dynamic_vars)
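
    Note that stepping back in 30-day increments is only an approximation and can occasionally skip or repeat a calendar month. A minimal sketch that walks back exact calendar months instead (plain Python, reusing current_date from above):

    # Walk back exactly 24 calendar months, most recent first
    dynamic_vars = {}
    year, month = current_date.year, current_date.month
    for _ in range(24):
        dynamic_vars[f"Month_{year}{month:02d}"] = None
        month -= 1
        if month == 0:
            month, year = 12, year - 1

    print(list(dynamic_vars))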
    

    Step 2: Create CSV Files by Filtering on Year and Month

    Next, we’ll use the dynamic variable names to filter data and create CSV files. For simplicity, let’s assume we have a sample DataFrame to filter.

    # Sample DataFrame
    data = {
        'date': [datetime.now() - timedelta(days=i*15) for i in range(50)],
        'value': [i for i in range(50)]
    }
    
    df = pd.DataFrame(data)
    spark_df = spark.createDataFrame(df)
    
    # Function to create CSV files based on dynamic variables
    def create_csv_files(spark_df, dynamic_vars):
        for var in dynamic_vars.keys():
            year_month = var.split('_')[1]
            year = int(year_month[:4])
            month = int(year_month[4:])
    
            filtered_df = spark_df.filter((F.year("date") == year) & (F.month("date") == month))
    
            if filtered_df.count() > 0:
                filtered_df.write.csv(f"/path/to/output/{var}.csv")
    
    create_csv_files(spark_df, dynamic_vars)
    

    Step 3: Create Arrays Based on Dynamic Dictionary

    We can also create arrays based on the dynamic dictionary.

    # Create arrays from dynamic dictionary
    dynamic_arrays = [f'Month_{date.strftime("%Y%m")}' for date in [current_date - timedelta(days=i*30) for i in range(24)]]
    
    print(dynamic_arrays)
    

    Step 4: Create Custom Excel Files

    Lastly, we can create custom Excel files with columns named based on the dynamic dictionary.

    import openpyxl
    from openpyxl.utils.dataframe import dataframe_to_rows
    
    # Function to create custom Excel files
    def create_excel_files(dynamic_vars):
        for var in dynamic_vars.keys():
            year_month = var.split('_')[1]
            year = year_month[:4]
            month = year_month[4:]
    
            # Create a DataFrame with custom columns
            data = {
                f'xyz_{year}_{month}': [i for i in range(10)],
                f'abc_xyz_{year}_{month}{month}': [i for i in range(10)]
            }
    
            df = pd.DataFrame(data)
    
            # Create an Excel file (the context manager saves and closes the workbook)
            file_name = f'/path/to/output/{var}.xlsx'
            with pd.ExcelWriter(file_name, engine='openpyxl') as writer:
                df.to_excel(writer, sheet_name='Sheet1', index=False)
    
    create_excel_files(dynamic_vars)
    

    Combining It All Together

    Here is the complete script that combines all the steps above.

    from datetime import datetime, timedelta
    import pandas as pd
    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession
    import calendar
    
    # Initialize SparkSession
    spark = SparkSession.builder \
        .appName("DynamicVariablesETL") \
        .getOrCreate()
    
    # Get current date
    current_date = datetime.now()
    
    # Create a dictionary to hold variable names
    dynamic_vars = {}
    
    # Generate variable names for the past 24 months
    for i in range(24):
        date = current_date - timedelta(days=i*30)
        year_month = date.strftime('%Y%m')
        dynamic_vars[f'Month_{year_month}'] = None
    
    # Sample DataFrame
    data = {
        'date': [datetime.now() - timedelta(days=i*15) for i in range(50)],
        'value': [i for i in range(50)]
    }
    
    df = pd.DataFrame(data)
    spark_df = spark.createDataFrame(df)
    
    # Function to create CSV files based on dynamic variables
    def create_csv_files(spark_df, dynamic_vars):
        for var in dynamic_vars.keys():
            year_month = var.split('_')[1]
            year = int(year_month[:4])
            month = int(year_month[4:])
    
            filtered_df = spark_df.filter((F.year("date") == year) & (F.month("date") == month))
    
            if filtered_df.count() > 0:
                filtered_df.write.csv(f"/path/to/output/{var}.csv")
    
    create_csv_files(spark_df, dynamic_vars)
    
    # Create arrays from dynamic dictionary
    dynamic_arrays = [f'Month_{date.strftime("%Y%m")}' for date in [current_date - timedelta(days=i*30) for i in range(24)]]
    
    # Function to create custom Excel files
    def create_excel_files(dynamic_vars):
        for var in dynamic_vars.keys():
            year_month = var.split('_')[1]
            year = year_month[:4]
            month = year_month[4:]
    
            # Create a DataFrame with custom columns
            data = {
                f'xyz_{year}_{month}': [i for i in range(10)],
                f'abc_xyz_{year}_{month}{month}': [i for i in range(10)]
            }
    
            df = pd.DataFrame(data)
    
            # Create an Excel file (the context manager saves and closes the workbook)
            file_name = f'/path/to/output/{var}.xlsx'
            with pd.ExcelWriter(file_name, engine='openpyxl') as writer:
                df.to_excel(writer, sheet_name='Sheet1', index=False)
    
    create_excel_files(dynamic_vars)
    

    This script demonstrates how to generate dynamic variable names, filter data to create CSV files, and create custom Excel files with dynamically generated columns. You can customize the paths and logic as needed for your specific ETL job in PySpark.


    3. SAS Project to PySpark Migration – merging, joining, transposing, lead/rank functions, data validation with PROC FREQ, error handling, macro variables, and macros

    Let us create a comprehensive SAS project that involves merging, joining, transposing large tables, applying PROC SQL lead/rank functions, performing data validation with PROC FREQ, and incorporating error handling, macro variables, and macros for various functional tasks.

    Step 1: Set Up Macro Variables for Date Values

    %let start_date = '01JAN2023'd;
    %let end_date = '31DEC2023'd;
    

    Step 2: Define Macros for Various Functional Tasks

    %macro import_data(libname=, dataset=, filepath=);
        proc import datafile="&filepath" out=&libname..&dataset dbms=csv replace;
        getnames=yes;
        run;
    %mend import_data;
    
    %macro join_tables(libname=, table1=, table2=, out_table=, join_condition=);
        proc sql;
            create table &libname..&out_table as
            select a.*, b.*
            from &libname..&table1 as a
            left join &libname..&table2 as b
            on &join_condition;
        quit;
    %mend join_tables;
    
    %macro transpose_table(libname=, in_table=, out_table=, id_var=, var=);
        proc transpose data=&libname..&in_table out=&libname..&out_table;
        by &id_var;
        var &var;
        run;
    %mend transpose_table;
    
    %macro validate_data(libname=, table=, var=);
        proc freq data=&libname..&table;
        tables &var / missing;
        run;
    %mend validate_data;
    
    %macro apply_rank(libname=, table=, var=, out_table=);
        proc sql;
            create table &libname..&out_table as
            select *, rank() over (order by &var desc) as rank
            from &libname..&table;
        quit;
    %mend apply_rank;
    

    Step 3: Import Large Tables

    %import_data(libname=work, dataset=table1, filepath='/path/to/table1.csv');
    %import_data(libname=work, dataset=table2, filepath='/path/to/table2.csv');
    

    Step 4: Merge and Join Tables

    %join_tables(libname=work, table1=table1, table2=table2, out_table=merged_table, join_condition=a.id=b.id);
    

    Step 5: Transpose the Data

    %transpose_table(libname=work, in_table=merged_table, out_table=transposed_table, id_var=id, var=some_var);
    

    Step 6: Apply PROC SQL Lead/Rank Functions

    %apply_rank(libname=work, table=transposed_table, var=some_var, out_table=ranked_table);
    

    Step 7: Validate Data with PROC FREQ

    %validate_data(libname=work, table=ranked_table, var=some_var);
    

    Step 8: Error Handling

    Ensure error handling by checking the existence of datasets and proper execution of steps.

    %macro check_dataset(libname=, dataset=);
        %if %sysfunc(exist(&libname..&dataset)) %then %do;
            %put NOTE: Dataset &libname..&dataset exists.;
        %end;
        %else %do;
            %put ERROR: Dataset &libname..&dataset does not exist.;
            %abort cancel;
        %end;
    %mend check_dataset;
    
    %check_dataset(libname=work, dataset=ranked_table);
    

    Full Example in One Go

    Putting it all together:

    %let start_date = '01JAN2023'd;
    %let end_date = '31DEC2023'd;
    
    %macro import_data(libname=, dataset=, filepath=);
        proc import datafile="&filepath" out=&libname..&dataset dbms=csv replace;
        getnames=yes;
        run;
    %mend import_data;
    
    %macro join_tables(libname=, table1=, table2=, out_table=, join_condition=);
        proc sql;
            create table &libname..&out_table as
            select a.*, b.*
            from &libname..&table1 as a
            left join &libname..&table2 as b
            on &join_condition;
        quit;
    %mend join_tables;
    
    %macro transpose_table(libname=, in_table=, out_table=, id_var=, var=);
        proc transpose data=&libname..&in_table out=&libname..&out_table;
        by &id_var;
        var &var;
        run;
    %mend transpose_table;
    
    %macro validate_data(libname=, table=, var=);
        proc freq data=&libname..&table;
        tables &var / missing;
        run;
    %mend validate_data;
    
    %macro apply_rank(libname=, table=, var=, out_table=);
        proc sql;
            create table &libname..&out_table as
            select *, rank() over (order by &var desc) as rank
            from &libname..&table;
        quit;
    %mend apply_rank;
    
    %macro check_dataset(libname=, dataset=);
        %if %sysfunc(exist(&libname..&dataset)) %then %do;
            %put NOTE: Dataset &libname..&dataset exists.;
        %end;
        %else %do;
            %put ERROR: Dataset &libname..&dataset does not exist.;
            %abort cancel;
        %end;
    %mend check_dataset;
    
    /* Import large tables */
    %import_data(libname=work, dataset=table1, filepath='/path/to/table1.csv');
    %import_data(libname=work, dataset=table2, filepath='/path/to/table2.csv');
    
    /* Merge and join tables */
    %join_tables(libname=work, table1=table1, table2=table2, out_table=merged_table, join_condition=a.id=b.id);
    
    /* Transpose the data */
    %transpose_table(libname=work, in_table=merged_table, out_table=transposed_table, id_var=id, var=some_var);
    
    /* Apply PROC SQL lead/rank functions */
    %apply_rank(libname=work, table=transposed_table, var=some_var, out_table=ranked_table);
    
    /* Validate data with PROC FREQ */
    %validate_data(libname=work, table=ranked_table, var=some_var);
    
    /* Check the existence of the final dataset */
    %check_dataset(libname=work, dataset=ranked_table);
    

    Explanation:

    1. Macro Variables: Defined for date values.
    2. Macros for Functional Tasks: Defined for importing data, joining tables, transposing tables, validating data, and applying ranking functions.
    3. Data Import: Two large tables are imported using PROC IMPORT.
    4. Data Merge and Join: Tables are merged and joined using PROC SQL.
    5. Data Transposition: Transposing the merged table.
    6. Rank Application: Applying the rank function.
    7. Data Validation: Validating data using PROC FREQ.
    8. Error Handling: Ensuring datasets exist and handling errors accordingly.

    This setup provides a robust framework for handling complex data processing tasks in SAS, incorporating various best practices and functionalities.

    Method1

    Here is the equivalent PySpark code for the provided SAS script:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import rank, col
    from pyspark.sql.window import Window
    
    # Create a SparkSession object
    spark = SparkSession.builder.appName("My App").getOrCreate()
    
    # Define the start and end dates
    start_date = "2023-01-01"
    end_date = "2023-12-31"
    
    # Import the data
    table1 = spark.read.csv("/path/to/table1.csv", header=True, inferSchema=True)
    table2 = spark.read.csv("/path/to/table2.csv", header=True, inferSchema=True)
    
    # Register the DataFrames as temporary views
    table1.createOrReplaceTempView("table1")
    table2.createOrReplaceTempView("table2")
    
    # Join the tables
    merged_table = spark.sql("SELECT a.*, b.* FROM table1 a LEFT JOIN table2 b ON a.id = b.id")
    merged_table.createOrReplaceTempView("merged_table")
    
    # Transpose the data
    transposed_table = merged_table.groupBy("id").pivot("some_var").sum()
    transposed_table.createOrReplaceTempView("transposed_table")
    
    # Apply the rank function
    ranked_table = transposed_table.withColumn("rank", rank().over(Window.orderBy(col("some_var").desc())))
    ranked_table.createOrReplaceTempView("ranked_table")
    
    # Validate the data
    ranked_table.select("some_var").summary().show()
    
    # Check the existence of the final dataset
    if ranked_table.count() > 0:
        print("Dataset exists.")
    else:
        print("Dataset does not exist.")

    Here’s a step-by-step explanation of the PySpark code:

    1. Create a SparkSession object, which is the entry point to programming Spark with the Dataset and DataFrame API.
    2. Define the start and end dates as strings.
    3. Import the data from CSV files using the read.csv method, which returns a DataFrame.
    4. Register the DataFrames as temporary views using the createOrReplaceTempView method, so they can be queried using SQL.
    5. Join the tables using a SQL query, which is executed using the sql method.
    6. Transpose the data using the groupBy and pivot methods, which are equivalent to the SAS proc transpose step.
    7. Apply the rank function using the withColumn and rank methods, which are equivalent to the SAS proc sql step with the rank function.
    8. Validate the data by computing summary statistics using the summary method, which is equivalent to the SAS proc freq step.
    9. Check the existence of the final dataset by checking the count of rows using the count method. If the count is greater than 0, the dataset exists; otherwise, it does not exist.

    Note that the PySpark code uses the DataFrame API, which is a more concise and expressive way of working with data compared to the SAS script. Additionally, the PySpark code uses SQL queries to perform the join and rank operations, which can be more efficient than using the SAS proc sql step.

    Method 2

    Here’s the modified PySpark code that translates SAS macros to functions and adds error handling:

    Python

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import rank, col
    
    # Create a SparkSession object
    spark = SparkSession.builder.appName("My App").getOrCreate()
    
    # Define a function for importing data
    def import_data(libname, dataset, filepath):
        try:
            df = spark.read.csv(filepath, header=True, inferSchema=True)
            df.createOrReplaceTempView(f"{libname}_{dataset}")
            return df
        except Exception as e:
            print(f"Error importing data: {e}")
            return None
    
    # Define a function for joining tables
    def join_tables(libname, table1, table2, out_table, join_condition):
        try:
            spark.sql(f"""
                CREATE TABLE {libname}_{out_table} AS
                SELECT a.*, b.*
                FROM {libname}_{table1} a
                LEFT JOIN {libname}_{table2} b
                ON {join_condition}
            """)
            return spark.table(f"{libname}_{out_table}")
        except Exception as e:
            print(f"Error joining tables: {e}")
            return None
    
    # Define a function for transposing data
    def transpose_table(libname, in_table, out_table, id_var, var):
        try:
            spark.sql(f"""
                CREATE TABLE {libname}_{out_table} AS
                SELECT {id_var}, {var}
                FROM {libname}_{in_table}
                GROUP BY {id_var}
                PIVOT ({var})
            """)
            return spark.table(f"{libname}_{out_table}")
        except Exception as e:
            print(f"Error transposing data: {e}")
            return None
    
    # Define a function for applying the rank function
    def apply_rank(libname, table, var, out_table):
        try:
            spark.sql(f"""
                CREATE TABLE {libname}_{out_table} AS
                SELECT *, RANK() OVER (ORDER BY {var} DESC) AS rank
                FROM {libname}_{table}
            """)
            return spark.table(f"{libname}_{out_table}")
        except Exception as e:
            print(f"Error applying rank function: {e}")
            return None
    
    # Define a function for validating data
    def validate_data(libname, table, var):
        try:
            spark.sql(f"""
                SELECT {var}, COUNT(*) AS count
                FROM {libname}_{table}
                GROUP BY {var}
            """).show()
        except Exception as e:
            print(f"Error validating data: {e}")
    
    # Define a function for checking the existence of a dataset
    def check_dataset(libname, dataset):
        try:
            if spark.catalog.tableExists(f"{libname}_{dataset}"):
                print("Dataset exists.")
            else:
                print("Dataset does not exist.")
        except Exception as e:
            print(f"Error checking dataset existence: {e}")
    
    # Call the functions
    table1 = import_data("work", "table1", "/path/to/table1.csv")
    table2 = import_data("work", "table2", "/path/to/table2.csv")
    
    merged_table = join_tables("work", "table1", "table2", "merged_table", "a.id = b.id")
    
    transposed_table = transpose_table("work", "merged_table", "transposed_table", "id", "some_var")
    
    ranked_table = apply_rank("work", "transposed_table", "some_var", "ranked_table")
    
    validate_data("work", "ranked_table", "some_var")
    
    check_dataset("work", "ranked_table")

    In this modified code, I’ve defined separate functions for each of the SAS macros, and added error handling using try-except blocks. If an error occurs during the execution of a function, the error message is printed and the function returns None.

    Method 3

    Here’s the modified PySpark code that uses the sql method for the transpose and rank operations, and defines classes, objects, and functions to mimic the SAS macros:

    Python

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import rank, col
    
    # Create a SparkSession object
    spark = SparkSession.builder.appName("My App").getOrCreate()
    
    # Define the start and end dates
    start_date = "2023-01-01"
    end_date = "2023-12-31"
    
    # Define a class for importing data
    class ImportData:
        def __init__(self, libname, dataset, filepath):
            self.libname = libname
            self.dataset = dataset
            self.filepath = filepath
    
        def execute(self):
            df = spark.read.csv(self.filepath, header=True, inferSchema=True)
            df.createOrReplaceTempView(f"{self.libname}_{self.dataset}")
            return df
    
    # Define a class for joining tables
    class JoinTables:
        def __init__(self, libname, table1, table2, out_table, join_condition):
            self.libname = libname
            self.table1 = table1
            self.table2 = table2
            self.out_table = out_table
            self.join_condition = join_condition
    
        def execute(self):
            spark.sql(f"""
                CREATE TABLE {self.libname}_{self.out_table} AS
                SELECT a.*, b.*
                FROM {self.libname}_{self.table1} a
                LEFT JOIN {self.libname}_{self.table2} b
                ON {self.join_condition}
            """)
            return spark.table(f"{self.libname}_{self.out_table}")
    
    # Define a class for transposing data
    class TransposeTable:
        def __init__(self, libname, in_table, out_table, id_var, var):
            self.libname = libname
            self.in_table = in_table
            self.out_table = out_table
            self.id_var = id_var
            self.var = var
    
        def execute(self):
            spark.sql(f"""
                CREATE TABLE {self.libname}_{self.out_table} AS
                SELECT {self.id_var}, {self.var}
                FROM {self.libname}_{self.in_table}
                GROUP BY {self.id_var}
                PIVOT ({self.var})
            """)
            return spark.table(f"{self.libname}_{self.out_table}")
    
    # Define a class for applying the rank function
    class ApplyRank:
        def __init__(self, libname, table, var, out_table):
            self.libname = libname
            self.table = table
            self.var = var
            self.out_table = out_table
    
        def execute(self):
            spark.sql(f"""
                CREATE TABLE {self.libname}_{self.out_table} AS
                SELECT *, RANK() OVER (ORDER BY {self.var} DESC) AS rank
                FROM {self.libname}_{self.table}
            """)
            return spark.table(f"{self.libname}_{self.out_table}")
    
    # Define a class for validating data
    class ValidateData:
        def __init__(self, libname, table, var):
            self.libname = libname
            self.table = table
            self.var = var
    
        def execute(self):
            spark.sql(f"""
                SELECT {self.var}, COUNT(*) AS count
                FROM {self.libname}_{self.table}
                GROUP BY {self.var}
            """).show()
    
    # Define a class for checking the existence of a dataset
    class CheckDataset:
        def __init__(self, libname, dataset):
            self.libname = libname
            self.dataset = dataset
    
        def execute(self):
            if spark.catalog.tableExists(f"{self.libname}_{self.dataset}"):
                print("Dataset exists.")
            else:
                print("Dataset does not exist.")
    
    # Create instances of the classes and execute the methods
    import_data = ImportData("work", "table1", "/path/to/table1.csv")
    table1 = import_data.execute()
    
    import_data = ImportData("work", "table2", "/path/to/table2.csv")
    table2 = import_data.execute()
    
    join_tables = JoinTables("work", "table1", "table2", "merged_table", "a.id = b.id")
    merged_table = join_tables.execute()
    
    transpose_table = TransposeTable("work", "merged_table", "transposed_table", "id", "some_var")
    transposed_table = transpose_table.execute()
    
    apply_rank = ApplyRank("work", "transposed_table", "some_var", "ranked_table")
    ranked_table = apply_rank.execute()
    
    validate_data = ValidateData("work", "ranked_table", "some_var")
    validate_data.execute()
    
    check_dataset = CheckDataset("work", "ranked_table")
    check_dataset.execute()

    In this modified code, we define classes for each of the SAS macros and create instances of these classes to execute each step of the pipeline in sequence.


    Here is a step-by-step explanation of the modified PySpark code:

    Step 1: Create a SparkSession object

    spark = SparkSession.builder.appName("My App").getOrCreate()

    This line creates a SparkSession object, which is the entry point to programming Spark with the Dataset and DataFrame API.

    Step 2: Define the start and end dates

    start_date = "2023-01-01"
    end_date = "2023-12-31"

    These lines define the start and end dates as strings.

    Step 3: Define a class for importing data

    class ImportData:
        def __init__(self, libname, dataset, filepath):
            self.libname = libname
            self.dataset = dataset
            self.filepath = filepath
    
        def execute(self):
            df = spark.read.csv(self.filepath, header=True, inferSchema=True)
            df.createOrReplaceTempView(f"{self.libname}_{self.dataset}")
            return df

    This class defines a method for importing data from a CSV file. The execute method reads the CSV file using the spark.read.csv method, creates a temporary view using the createOrReplaceTempView method, and returns the DataFrame.

    Step 4: Define a class for joining tables

    class JoinTables:
        def __init__(self, libname, table1, table2, out_table, join_condition):
            self.libname = libname
            self.table1 = table1
            self.table2 = table2
            self.out_table = out_table
            self.join_condition = join_condition
    
        def execute(self):
            spark.sql(f"""
                CREATE TABLE {self.libname}_{self.out_table} AS
                SELECT a.*, b.*
                FROM {self.libname}_{self.table1} a
                LEFT JOIN {self.libname}_{self.table2} b
                ON {self.join_condition}
            """)
            return spark.table(f"{self.libname}_{self.out_table}")

    This class defines a method for joining two tables. The execute method uses the spark.sql method to execute a SQL query that joins the two tables, creates a new table with the joined data, and returns the resulting DataFrame.

    Step 5: Define a class for transposing data

    class TransposeTable:
        def __init__(self, libname, in_table, out_table, id_var, var):
            self.libname = libname
            self.in_table = in_table
            self.out_table = out_table
            self.id_var = id_var
            self.var = var
    
        def execute(self):
            spark.sql(f"""
                CREATE TABLE {self.libname}_{self.out_table} AS
                SELECT {self.id_var}, {self.var}
                FROM {self.libname}_{self.in_table}
                GROUP BY {self.id_var}
                PIVOT ({self.var})""")
            return spark.table(f"{self.libname}_{self.out_table}")

    This class defines a method for transposing data. The execute method uses the spark.sql method to execute a SQL query that transposes the data, creates a new table with the transposed data, and returns the resulting DataFrame.

    Step 6: Define a class for applying the rank function

    class ApplyRank:
        def __init__(self, libname, table, var, out_table):
            self.libname = libname
            self.table = table
            self.var = var
            self.out_table = out_table
    
        def execute(self):
            spark.sql(f"""
                CREATE TABLE {self.libname}_{self.out_table} AS
                SELECT *, RANK() OVER (ORDER BY {self.var} DESC) AS rank
                FROM {self.libname}_{self.table}
            """)
            return spark.table(f"{self.libname}_{self.out_table}")

    This class defines a method for applying the rank function. The execute method uses the spark.sql method to execute a SQL query that applies the rank function, creates a new table with the ranked data, and returns the resulting DataFrame.

    Step 7: Define a class for validating data

    class ValidateData:
        def __init__(self, libname, table, var):
            self.libname = libname
            self.table = table
            self.var = var
    
        def execute(self):
            spark.sql(f"""
                SELECT {self.var}, COUNT(*) AS count
                FROM {self.libname}_{self.table}
                GROUP BY {self.var}
            """).show()

    This class defines a method for validating data. The execute method uses the spark.sql method to execute a SQL query that groups the data by the specified variable and counts the number of rows in each group, and displays the results.


    Here is a detailed approach for dividing a monthly PySpark script into multiple code steps. Each step will be saved in the code column of a control DataFrame and executed sequentially. The script will include error handling and pre-checks to ensure source tables are present or Oracle tables have data before starting each step. The control DataFrame and status DataFrame for each month will be appended to a Hive or Oracle table to maintain historical data.

    Step 1: Prepare the Control DataFrame with Code Steps

    Sample Monthly PySpark Script

    Below is a sample PySpark script that reads data from Hive tables, performs transformations, and writes the output to Hive tables.

    Sample PySpark Script

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from datetime import datetime
    
    spark = SparkSession.builder \
        .appName("Monthly PySpark Script") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Define the current month in MMyyyy format
    current_month = datetime.now().strftime("%m%Y")
    
    # Sample code steps to be executed sequentially
    code_steps = [
        # Step 1: Check if source Hive table exists and has data
        f"""
        source_table = "source_db.source_table"
        if not spark.catalog.tableExists(source_table):
            raise ValueError("Source table does not exist.")
        df = spark.table(source_table)
        if df.count() == 0:
            raise ValueError("Source table is empty.")
        """,
    
        # Step 2: Read data from an Oracle table (Assuming JDBC connection properties are set)
        f"""
        oracle_url = "jdbc:oracle:thin:@hostname:port:service_name"
        oracle_properties = {{
            "user": "username",
            "password": "password"
        }}
        oracle_df = spark.read.jdbc(oracle_url, "oracle_schema.oracle_table",   properties=oracle_properties)
        if oracle_df.count() == 0:
            raise ValueError("Oracle table is empty.")
        print("Data read from Oracle table:", oracle_df.count(), "rows")
        """,
    
        # Step 3: Perform a transformation
        f"""
        transformed_df = df.withColumn("new_col", col("existing_col") * 2)
        print("Transformation applied:", transformed_df.count(), "rows")
        """,
    
        # Step 4: Join with Oracle data
        f"""
        final_df = transformed_df.join(oracle_df, "join_key")
        print("Joined data:", final_df.count(), "rows")
        """,
    
        # Step 5: Write data to the target Hive table
        f"""
        target_table = "target_db.target_table"
        final_df.write.mode("overwrite").saveAsTable(target_table)
        print("Data written to target table")
        """
    ]
    
    # Create a Control DataFrame
    from pyspark.sql import Row
    control_data = [Row(serial_no=i+1, code=step, month_period=current_month) for i, step in enumerate(code_steps)]
    control_df = spark.createDataFrame(control_data)
    control_df.show(truncate=False)
    

    Step 2: Create a Status Table

    Create a status table to store the execution status of each code step. This table can be created in Hive for simplicity.

    # Create a status DataFrame
    status_schema = "serial_no INT, status STRING, message STRING, month_period STRING, row_count INT"
    spark.sql(f"CREATE TABLE IF NOT EXISTS control_db.status_table ({status_schema})")
    

    Step 3: Execute Each Code Step with Error Handling and Status Tracking

    Execute each code step sequentially, log the status, any error messages, and the row count of the output tables.

    # Function to log status
    def log_status(serial_no, status, message, row_count=0):
        spark.sql(f"""
            INSERT INTO control_db.status_table
            VALUES ({serial_no}, '{status}', '{message}', '{current_month}', {row_count})
        """)

    # Execute each code snippet
    code_snippets = control_df.select("serial_no", "code").collect()

    for row in code_snippets:
        serial_no = row["serial_no"]
        code_snippet = row["code"]

        try:
            print(f"Executing step {serial_no}...")
            exec(code_snippet)

            # Fetch the row count of the output table if this step writes to a table
            if "write.mode" in code_snippet:
                output_table = code_snippet.split('"')[1]
                row_count = spark.table(output_table).count()
            else:
                row_count = 0

            log_status(serial_no, "SUCCESS", "Executed successfully", row_count)
        except Exception as e:
            error_message = str(e).replace("'", "")
            log_status(serial_no, "FAILED", error_message)
            print(f"Error executing step {serial_no}: {error_message}")
            break

    # Example to display the status table
    spark.sql("SELECT * FROM control_db.status_table").show()

    Full Example Script

    Here’s the complete script:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from datetime import datetime
    from pyspark.sql import Row

    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("Execute Code Snippets from Control DataFrame") \
        .enableHiveSupport() \
        .getOrCreate()

    # Define the current month in MMyyyy format
    current_month = datetime.now().strftime("%m%Y")

    # Define code snippets
    code_steps = [
        # Step 1: Check if source Hive table exists and has data
        f"""
        source_table = "source_db.source_table"
        if not spark.catalog.tableExists(source_table):
            raise ValueError("Source table does not exist.")
        df = spark.table(source_table)
        if df.count() == 0:
            raise ValueError("Source table is empty.")
        """,

        # Step 2: Read data from an Oracle table
        f"""
        oracle_url = "jdbc:oracle:thin:@hostname:port:service_name"
        oracle_properties = {{
            "user": "username",
            "password": "password"
        }}
        oracle_df = spark.read.jdbc(oracle_url, "oracle_schema.oracle_table", properties=oracle_properties)
        if oracle_df.count() == 0:
            raise ValueError("Oracle table is empty.")
        print("Data read from Oracle table:", oracle_df.count(), "rows")
        """,

        # Step 3: Perform a transformation
        f"""
        transformed_df = df.withColumn("new_col", col("existing_col") * 2)
        print("Transformation applied:", transformed_df.count(), "rows")
        """,

        # Step 4: Join with Oracle data
        f"""
        final_df = transformed_df.join(oracle_df, "join_key")
        print("Joined data:", final_df.count(), "rows")
        """,

        # Step 5: Write data to the target Hive table
        f"""
        target_table = "target_db.target_table"
        final_df.write.mode("overwrite").saveAsTable(target_table)
        print("Data written to target table")
        """
    ]

    # Create a Control DataFrame
    control_data = [Row(serial_no=i+1, code=step, month_period=current_month) for i, step in enumerate(code_steps)]
    control_df = spark.createDataFrame(control_data)
    control_df.show(truncate=False)

    # Create a status table
    status_schema = "serial_no INT, status STRING, message STRING, month_period STRING, row_count INT"
    spark.sql(f"CREATE TABLE IF NOT EXISTS control_db.status_table ({status_schema})")

    # Function to log status
    def log_status(serial_no, status, message, row_count=0):
        spark.sql(f"""
            INSERT INTO control_db.status_table
            VALUES ({serial_no}, '{status}', '{message}', '{current_month}', {row_count})
        """)

    # Execute each code snippet
    code_snippets = control_df.select("serial_no", "code").collect()

    for row in code_snippets:
        serial_no = row["serial_no"]
        code_snippet = row["code"]

        try:
            print(f"Executing step {serial_no}...")
            exec(code_snippet)

            # Fetch the row count of the output table if this step writes to a table
            if "write.mode" in code_snippet:
                output_table = code_snippet.split('"')[1]
                row_count = spark.table(output_table).count()
            else:
                row_count = 0

            log_status(serial_no, "SUCCESS", "Executed successfully", row_count)
        except Exception as e:
            error_message = str(e).replace("'", "")
            log_status(serial_no, "FAILED", error_message)
            print(f"Error executing step {serial_no}: {error_message}")
            break

    # Example to display the status table
    spark.sql("SELECT * FROM control_db.status_table").show()

    To execute dynamic code snippets stored in a DataFrame (or any other source) in PySpark, you can use the exec function. This can be particularly useful when you have a control table that stores PySpark code snippets to be executed sequentially. Below is a complete example of how to achieve this, including error handling, logging, and updating status.

    Example: Dynamic Code Execution in PySpark

    1. Initialize Spark Session

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from datetime import datetime

    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("PySpark Dynamic Code Execution Example") \
        .enableHiveSupport() \
        .getOrCreate()

    2. Create Sample Control DataFrame

    # Sample control DataFrame
    control_data = [
        (1, "df1 = spark.range(0, 10)"),
        (2, "df2 = df1.withColumn('square', F.col('id') ** 2)"),
        (3, "df2.show()")
    ]
    control_columns = ["serial_no", "code_snippet"]

    control_df = spark.createDataFrame(control_data, control_columns)
    control_df.show(truncate=False)

    3. Create Log and Status DataFrames

    # Define schema for log and status tables
    log_schema = "timestamp STRING, message STRING, level STRING"
    status_schema = "step INT, status STRING, row_count INT, timestamp STRING"

    # Initialize empty log and status DataFrames
    log_df = spark.createDataFrame([], schema=log_schema)
    status_df = spark.createDataFrame([], schema=status_schema)

    4. Define Functions for Error Handling and Logging

    # Function to log messages
    def log(message, level="INFO"):
        global log_df
        log_entry = [(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), message, level)]
        log_df = log_df.union(spark.createDataFrame(log_entry, schema=log_schema))

    # Function to update status
    def update_status(step, status, row_count=0):
        global status_df
        status_entry = [(step, status, row_count, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))]
        status_df = status_df.union(spark.createDataFrame(status_entry, schema=status_schema))

    5. Execute Code Snippets Sequentially

    # Function to execute code snippets
    def execute_code(control_df):
        for row in control_df.collect():
            step = row["serial_no"]
            code = row["code_snippet"]
            try:
                # Execute the code snippet
                exec(code, globals())
                log(f"Step {step} executed successfully", level="INFO")
                # Optionally, get the row count of a DataFrame if the code produces one
                row_count = eval(code.split('=')[0].strip()).count() if '=' in code else 0
                update_status(step, "SUCCESS", row_count)
            except Exception as e:
                log(f"Step {step} failed: {e}", level="ERROR")
                update_status(step, "FAILED")
                break

    # Execute the control DataFrame
    execute_code(control_df)

    # Show log and status tables
    log_df.show(truncate=False)
    status_df.show(truncate=False)

    Explanation

    1. Initialization: A Spark session is initialized, and a sample control DataFrame is created with code snippets.
    2. Log and Status DataFrames: Empty DataFrames for logs and status are initialized with appropriate schemas.
    3. Logging Functions: Functions for logging messages and updating the status are defined.
    4. Execute Code Snippets: The execute_code function iterates over the control DataFrame, executing each code snippet using exec. It logs the execution status and updates the status table.
    5. Execution and Display: The control DataFrame is executed, and the log and status tables are displayed.

    Scheduling the Script

    To schedule the script to run automatically, you can use scheduling tools like Apache Airflow, cron jobs, or any other job scheduler that fits your environment. Here’s an example of how you might schedule it using a simple cron job in Linux:

    1. Create a Python script (e.g., pyspark_script.py) with the above code.
    2. Add a cron job to execute the script at the desired interval.
    # Open the cron table for editing
    crontab -e
    
    # Add a line to run the script at the desired schedule (e.g., every day at midnight)
    0 0 * * * /path/to/spark/bin/spark-submit /path/to/pyspark_script.py
    

    This setup will run your PySpark script automatically according to the schedule specified in the cron job.
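
    If your environment uses Apache Airflow instead of cron, the same spark-submit command can be wrapped in a DAG. The sketch below is a minimal example assuming Airflow 2.x with the BashOperator; the paths and schedule are placeholders.

    # Minimal Airflow DAG sketch (assumes Airflow 2.x); paths and schedule are placeholders
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="pyspark_dynamic_code_execution",
        start_date=datetime(2024, 1, 1),
        schedule_interval="0 0 * * *",  # daily at midnight, matching the cron example above
        catchup=False,
    ) as dag:
        run_pyspark_job = BashOperator(
            task_id="run_pyspark_script",
            bash_command="/path/to/spark/bin/spark-submit /path/to/pyspark_script.py",
        )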


    Building an ETL Data Pipeline in PySpark and Using Pandas and Matplotlib for Further Processing

    Project Alert: Building an ETL data pipeline in PySpark and using Pandas and Matplotlib for further processing. For deployment we will consider using Bitbucket and Jenkins.

    We will build a data pipeline that reads BDL Hive tables in PySpark and executes PySpark scripts for complex transformations on the data via PySpark SQL and the DataFrame API. Some of our sources will be Oracle tables and CSV files stored in a server-specific location. We will read these and join them with our processed BDL data to add more information (variables). Our target tables are BDL Hive tables in Parquet format saved in another schema. We will also consolidate large Spark DataFrames into smaller datasets to be consumed as Pandas DataFrames, which will be used for analysis with Pandas and visualization with Matplotlib. These data can be saved as CSV, Excel, or Oracle tables.

    Step 1: Environment Setup

    1. Spark Session Initialization: Set up Spark with Hive support.
    2. Dependencies: Ensure you have the necessary libraries installed.

    Step 2: Data Extraction

    1. Hive Tables: Read from Hive.
    2. Oracle Tables: Use JDBC to read from Oracle.
    3. CSV Files: Read from local/remote storage.

    Step 3: Data Transformation

    1. PySpark SQL and DataFrame API: Perform transformations.
    2. Joining Data: Combine data from different sources.

    Step 4: Data Loading

    1. Write to Hive: Save results in Parquet format.
    2. Convert to Pandas: For further analysis.
    3. Save as CSV/Excel/Oracle: Export data for other uses.

    Step 5: Data Analysis and Visualization

    1. Pandas for Analysis: Use Pandas DataFrame.
    2. Matplotlib for Visualization: Create charts and graphs.

    Step 6: Deployment with Bitbucket and Jenkins

    1. Version Control with Bitbucket: Push code to Bitbucket.
    2. CI/CD with Jenkins: Automate deployment.

    Example Code

    Spark Session Initialization

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("ETL Pipeline") \
        .enableHiveSupport() \
        .getOrCreate()

    Data Extraction

    # Reading from Hive
    hive_df = spark.sql("SELECT * FROM bdl_database.bdl_table")

    # Reading from Oracle
    oracle_df = spark.read.format("jdbc").options(
        url="jdbc:oracle:thin:@//your_oracle_host:1521/your_oracle_db",
        driver="oracle.jdbc.driver.OracleDriver",
        dbtable="your_oracle_table",
        user="your_user",
        password="your_password"
    ).load()

    # Reading from CSV
    csv_df = spark.read.csv("path/to/your/csvfile.csv", header=True, inferSchema=True)

    Data Transformation

    # Example Transformation
    hive_df_filtered = hive_df.filter(hive_df["column_name"] > 0)
    oracle_df_selected = oracle_df.select("col1", "col2")
    csv_df_transformed = csv_df.withColumn("new_col", csv_df["col3"] * 2)

    # Joining Data
    joined_df = hive_df_filtered.join(oracle_df_selected, "id", "inner") \
        .join(csv_df_transformed, "id", "inner")

    Data Loading

    # Save to Hive
    joined_df.write.mode("overwrite").parquet("path/to/hive_table")

    # Convert to Pandas for further analysis
    pandas_df = joined_df.toPandas()

    # Save as CSV and Excel
    pandas_df.to_csv("path/to/save/yourdata.csv", index=False)
    pandas_df.to_excel("path/to/save/yourdata.xlsx", index=False)
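
    The project brief also lists Oracle tables as a possible target. A minimal, hedged sketch of writing the consolidated data back over JDBC is shown below; the connection details and target table name are placeholders, and the Oracle JDBC driver must be available on the Spark classpath.

    # Write the joined DataFrame to an Oracle table over JDBC (placeholder connection details)
    joined_df.write.format("jdbc").options(
        url="jdbc:oracle:thin:@//your_oracle_host:1521/your_oracle_db",
        driver="oracle.jdbc.driver.OracleDriver",
        dbtable="your_target_table",
        user="your_user",
        password="your_password"
    ).mode("append").save()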

    Data Analysis and Visualization

    import matplotlib.pyplot as plt
    import pandas as pd

    # Pandas DataFrame Analysis
    print(pandas_df.describe())

    # Visualization
    plt.figure(figsize=(10, 6))
    pandas_df["your_column"].hist()
    plt.title("Histogram of Your Column")
    plt.xlabel("Values")
    plt.ylabel("Frequency")
    plt.show()

    Deployment with Bitbucket and Jenkins

    1. Push Code to Bitbucket:
      • Initialize a Git repository.
      • Add your code and commit.
      • Push to your Bitbucket repository.
    2. Set Up Jenkins for CI/CD:
      • Install Jenkins and necessary plugins.
      • Create a new Jenkins job.
      • Configure the job to pull from your Bitbucket repository.
      • Add build steps to run your PySpark script.

    Jenkins Pipeline Script Example

    pipeline {
        agent any

        stages {
            stage('Clone Repository') {
                steps {
                    git 'https://bitbucket.org/your_repo.git'
                }
            }
            stage('Build and Test') {
                steps {
                    sh 'spark-submit --master local[2] your_script.py'
                }
            }
            stage('Deploy') {
                steps {
                    // Add deployment steps here
                }
            }
        }
    }


    Simplified Version of the Complete Pipeline

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import *
    import pandas as pd
    import matplotlib.pyplot as plt

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("DataPipeline") \
        .enableHiveSupport() \
        .getOrCreate()

    # Read Data from Hive
    hive_df = spark.sql("SELECT * FROM bdl_database.bdl_table")

    # Read Data from Oracle
    oracle_df = spark.read.format("jdbc").options(
        url="jdbc:oracle:thin:@//your_oracle_host:1521/your_oracle_db",
        driver="oracle.jdbc.driver.OracleDriver",
        dbtable="your_oracle_table",
        user="your_user",
        password="your_password"
    ).load()

    # Read CSV Files
    csv_df = spark.read.csv("path/to/your/csvfile.csv", header=True, inferSchema=True)

    # Perform Transformations
    # Example transformation: filter and select specific columns
    hive_df_filtered = hive_df.filter(hive_df["column_name"] > 0)
    oracle_df_selected = oracle_df.select("col1", "col2")
    csv_df_transformed = csv_df.withColumn("new_col", csv_df["col3"] * 2)

    # Join DataFrames
    joined_df = hive_df_filtered.join(oracle_df_selected, hive_df_filtered["id"] == oracle_df_selected["id"], "inner")
    final_df = joined_df.join(csv_df_transformed, joined_df["id"] == csv_df_transformed["id"], "inner")

    # Save to Hive in Parquet format
    final_df.write.mode("overwrite").parquet("path/to/hive_table")

    # Convert to Pandas for further analysis
    pandas_df = final_df.toPandas()

    # Save Pandas DataFrame as CSV and Excel
    pandas_df.to_csv("path/to/save/yourdata.csv", index=False)
    pandas_df.to_excel("path/to/save/yourdata.xlsx", index=False)

    # Visualize Data using Matplotlib
    plt.figure(figsize=(10, 6))
    pandas_df["your_column"].hist()
    plt.title("Histogram of Your Column")
    plt.xlabel("Values")
    plt.ylabel("Frequency")
    plt.show()

    # Further Analysis with Pandas
    print(pandas_df.describe())

    # Close Spark Session
    spark.stop()
  • String manipulation is a common task in data processing. PySpark provides a variety of built-in functions for manipulating string columns in DataFrames. Below, we explore some of the most useful string manipulation functions and demonstrate how to use them with examples.

    Common String Manipulation Functions

    1. concat: Concatenates multiple string columns into one.
    2. substring: Extracts a substring from a string column.
    3. length: Computes the length of a string column.
    4. trim / ltrim / rtrim: Trims whitespace from strings.
    5. upper / lower: Converts strings to upper or lower case.
    6. regexp_replace: Replaces substrings matching a regex pattern.
    7. regexp_extract: Extracts substrings matching a regex pattern.
    8. split: Splits a string column based on a delimiter.
    9. replace: Replaces all occurrences of a substring with another substring.
    10. translate: Replaces characters in a string based on a mapping.

    Example Usage

    1. Concatenation

    Syntax:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import concat, lit

    # Initialize Spark session
    spark = SparkSession.builder.appName("string_manipulation").getOrCreate()

    # Sample data
    data = [("John", "Doe"), ("Jane", "Smith")]
    columns = ["first_name", "last_name"]

    # Create DataFrame
    df = spark.createDataFrame(data, columns)

    # Concatenate first and last names
    df = df.withColumn("full_name", concat(df.first_name, lit(" "), df.last_name))

    # Show result
    df.show()

    2. Substring Extraction

    Syntax:

    from pyspark.sql.functions import substring

    # Extract first three characters of first_name
    df = df.withColumn("initials", substring(df.first_name, 1, 3))

    # Show result
    df.show()

    3. Length Calculation

    Syntax:

    from pyspark.sql.functions import length
    
    # Calculate length of first_name
    df = df.withColumn("name_length", length(df.first_name))
    
    # Show result
    df.show()

    In PySpark, the length() function counts all characters, including:

    • Leading/trailing spaces
    • Special characters (!@#$%^, etc.)
    • Unicode characters
    • Numbers

    ✅ Example: Count Characters with Spaces and Specials

    from pyspark.sql.functions import col, length
    
    df = spark.createDataFrame([
        (" Alice ",),     # Leading/trailing space
        ("A$ha",),        # Special character
        ("  Bob  ",),     # More spaces
        ("Émilie",),      # Unicode
        ("",),            # Empty string
        (None,)           # NULL value
    ], ["first_name"])
    
    df_with_len = df.withColumn("name_length", length(col("first_name")))
    df_with_len.show(truncate=False)
    

    ✅ Output:

    +-----------+------------+
    |first_name |name_length |
    +-----------+------------+
    | Alice     |7           |
    |A$ha       |4           |
    |  Bob      |7           |
    |Émilie     |6           |
    |           |0           |
    |null       |null        |
    +-----------+------------+
    

    🧠 Key Notes:

    • length() is character count, not byte count.
    • It includes all types of characters (whitespace, special, Unicode).
    • Use trim() if you want to remove spaces before counting.

    ⚠️ Compare With Trim:

    from pyspark.sql.functions import trim
    
    df_trimmed = df.withColumn("trimmed_length", length(trim(col("first_name"))))
    df_trimmed.show()
    

    4. Trimming

    Syntax:

    from pyspark.sql.functions import trim, ltrim, rtrim

    # Trim whitespace from first_name
    df = df.withColumn("trimmed_name", trim(df.first_name))

    # Show result
    df.show()

    5. Case Conversion

    Syntax:

    from pyspark.sql.functions import upper, lower

    # Convert first_name to upper case
    df = df.withColumn("upper_name", upper(df.first_name))

    # Convert last_name to lower case
    df = df.withColumn("lower_name", lower(df.last_name))

    # Show result
    df.show()

    6. Regex Replace

    Syntax:

    from pyspark.sql.functions import regexp_replace
    
    # Replace all digits with asterisks
    df = df.withColumn("masked_name", regexp_replace(df.first_name, "d", "*"))
    
    # Show result
    df.show()
    

    7. Regex Extract

    Syntax:

    from pyspark.sql.functions import regexp_extract

    # Extract digits from first_name
    df = df.withColumn("extracted_digits", regexp_extract(df.first_name, "(d+)", 1))

    # Show result
    df.show()

    8. Split

    from pyspark.sql.functions import split
    
    # Split full_name into first and last name
    df = df.withColumn("split_name", split(df.full_name, " "))
    
    # Show result
    df.select("split_name").show(truncate=False)
    

    9. Replace

    from pyspark.sql.functions import expr
    
    # Replace 'Doe' with 'Smith' in last_name
    df = df.withColumn("updated_last_name", expr("replace(last_name, 'Doe', 'Smith')"))
    
    # Show result
    df.show()
    

    10. Translate

    Syntax:

    from pyspark.sql.functions import translate

    # Translate 'o' to '0' in last_name
    df = df.withColumn("translated_last_name", translate(df.last_name, "o", "0"))

    # Show result
    df.show()

    Comprehensive Example: Combining Multiple String Manipulations

    Let’s create a project that combines multiple string manipulation operations on a DataFrame.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import concat, lit, substring, length, trim, upper, lower, regexp_replace, regexp_extract, split, expr, translate

    # Initialize Spark session
    spark = SparkSession.builder.appName("string_manipulation").getOrCreate()

    # Sample data
    data = [("John123", "Doe456"), ("Jane789", "Smith012")]
    columns = ["first_name", "last_name"]

    # Create DataFrame
    df = spark.createDataFrame(data, columns)

    # Perform various string manipulations
    df = df.withColumn("full_name", concat(df.first_name, lit(" "), df.last_name))
    df = df.withColumn("initials", substring(df.first_name, 1, 3))
    df = df.withColumn("name_length", length(df.first_name))
    df = df.withColumn("trimmed_name", trim(df.first_name))
    df = df.withColumn("upper_name", upper(df.first_name))
    df = df.withColumn("lower_name", lower(df.last_name))
    df = df.withColumn("masked_name", regexp_replace(df.first_name, "d", "*"))
    df = df.withColumn("extracted_digits", regexp_extract(df.first_name, "(d+)", 1))
    df = df.withColumn("split_name", split(df.full_name, " "))
    df = df.withColumn("updated_last_name", expr("replace(last_name, 'Doe456', 'Smith')"))
    df = df.withColumn("translated_last_name", translate(df.last_name, "456", "789"))

    # Show result
    df.show(truncate=False)

    Explanation

    1. Concatenation: Combines the first_name and last_name with a space.
    2. Substring Extraction: Extracts the first three characters of first_name.
    3. Length Calculation: Computes the length of first_name.
    4. Trimming: Trims any whitespace around first_name.
    5. Case Conversion: Converts first_name to upper case and last_name to lower case.
    6. Regex Replace: Replaces all digits in first_name with asterisks.
    7. Regex Extract: Extracts digits from first_name.
    8. Split: Splits the full_name into a list of first_name and last_name.
    9. Replace: Replaces ‘Doe456’ with ‘Smith’ in last_name.
    10. Translate: Translates ‘456’ to ‘789’ in last_name.

    These examples demonstrate the versatility of string manipulation functions in PySpark, allowing for complex transformations and processing of text data.

    Here’s a table summarizing common string manipulation functions in PySpark SQL, including regular expressions (regex):

    | Function | Description | Example (Input: "Hello World") |
    |---|---|---|
    | initcap(str) | Converts the first letter of each word to uppercase | "Hello World" -> "Hello World" (already capitalized words remain unchanged) |
    | lower(str) | Converts all characters to lowercase | "Hello World" -> "hello world" |
    | upper(str) | Converts all characters to uppercase | "Hello World" -> "HELLO WORLD" |
    | lpad(str, length, pad_string) | Pads a string to the left with a specified string | "Hello World", 15, "-" -> "----Hello World" |
    | rpad(str, length, pad_string) | Pads a string to the right with a specified string | "Hello World", 15, "-" -> "Hello World----" |
    | ltrim(str) | Removes leading whitespace characters | " Hello World" -> "Hello World" |
    | rtrim(str) | Removes trailing whitespace characters | "Hello World " -> "Hello World" |
    | trim(str) | Removes leading and trailing whitespace characters | " Hello World " -> "Hello World" |
    | length(str) | Returns the length (number of characters) of a string | "Hello World" -> 11 |
    | substr(str, pos, len) | Extracts a substring from a string | "Hello World", 7, 5 -> "World" |
    | instr(str, substr) | Returns the starting position of a substring within a string (0 if not found) | "Hello World", "World" -> 7 |
    | concat(str1, str2, ...) | Concatenates (joins) multiple strings | "Hello", " ", "World" -> "Hello World" |
    | concat_ws(sep, str1, str2, ...) | Concatenates strings with a specified separator | ",", "Hello", "World" -> "Hello,World" |
    | regexp_extract(str, pattern, idx) | Extracts a pattern (regular expression) from a string | "Hello_World", "_[^_]+", 0 -> "_World" |
    | regexp_replace(str, pattern, replacement) | Replaces a pattern (regular expression) in a string with a replacement string | "Hello_World", "_", " " -> "Hello World" |
    | split(str, delimiter) | Splits a string into an array based on a delimiter | "Hello,World,How", "," -> ["Hello", "World", "How"] |
    | array_join(arr, sep) | Joins the elements of an array into a string with a specified separator | ["Hello", "World", "How"], "," -> "Hello,World,How" |
    | soundex(str) | Returns the Soundex code of a string (phonetic equivalent) | "Hello" -> "H400" |
    | translate(str, matching, replace) | Replaces characters in a string based on a character-by-character mapping (characters mapped to nothing are removed) | "Hello World", "e", "" -> "Hllo World" |
    | overlay(str, replace, pos, len) | Replaces a substring within a string with another | "Hello World", "there", 7, 5 -> "Hello there" |
    | reverse(str) | Reverses the order of characters in a string | "Hello World" -> "dlroW olleH" |
    | instrc(str, substr) | Returns the starting position of a substring, case-insensitive (0 if not found); note that this is not a built-in Spark SQL function | "Hello World", "world" -> 7 |
    | levenshtein(str1, str2) | Calculates the Levenshtein distance (minimum number of edits) between two strings | "Hello", "Jello" -> 1 |
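
    As a quick illustration of a few functions from this table, the sketch below assumes a SparkSession named spark is already available; the sample data is made up for demonstration.

    from pyspark.sql.functions import initcap, lpad, concat_ws, levenshtein, lit, col

    demo_df = spark.createDataFrame([("hello world", "Jello")], ["text", "other"])

    demo_df.select(
        initcap(col("text")).alias("initcap_text"),                 # "Hello World"
        lpad(col("text"), 15, "-").alias("padded_text"),            # "----hello world"
        concat_ws(",", col("text"), col("other")).alias("joined"),  # "hello world,Jello"
        levenshtein(lit("Hello"), col("other")).alias("distance")   # 1
    ).show(truncate=False)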

    Summary in Detail

    A guide on how to perform common string manipulation tasks in PySpark:

    concat: Concatenates two or more strings.

    Syntax: concat(col1, col2, ..., colN)

    Example:

    from pyspark.sql.functions import concat, col
    
    df = df.withColumn("Full Name", concat(col("First Name"), lit(" "), col("Last Name")))

    substr / substring: Extracts a substring from a string.

    Syntax: substring(col, start, length)

    Example:

    from pyspark.sql.functions import substring, col
    
    df = df.withColumn("First Name", substring(col("Name"), 1, 4))

    split: Splits a string into an array of substrings.

    Syntax: split(col, pattern)

    Example:

    from pyspark.sql.functions import split, col
    
    df = df.withColumn("Address Parts", split(col("Address"), " "))

    regexp_extract: Extracts a substring using a regular expression.

    Syntax: regexp_extract(col, pattern, group)

    Example:

    from pyspark.sql.functions import regexp_extract, col
    
    df = df.withColumn("Phone Number", regexp_extract(col("Contact Info"), r"\d{3}-\d{3}-\d{4}", 0))

    translate: Replaces specified characters in a string.

    Syntax: translate(col, matching, replace)

    Example:

    from pyspark.sql.functions import translate, col
    
    df = df.withColumn("Clean Name", translate(col("Name"), "aeiou", "AEIOU"))

    trim: Removes leading and trailing whitespace from a string.

    Syntax: trim(col)

    Example:

    from pyspark.sql.functions import trim, col
    
    df = df.withColumn("Clean Address", trim(col("Address")))

    lower: Converts a string to lowercase.

    Syntax: lower(col)

    Example:

    from pyspark.sql.functions import lower, col
    
    df = df.withColumn("Lower Name", lower(col("Name")))

    upper: Converts a string to uppercase.

    Syntax: upper(col)

    Example:

    from pyspark.sql.functions import upper, col
    
    df = df.withColumn("Upper Name", upper(col("Name")))

    String Data Cleaning in PySpark

    Here are some common string data cleaning functions in PySpark, along with their syntax and examples:

    trim: Removes leading and trailing whitespace from a string.

    Syntax: trim(col)

    Example:

    from pyspark.sql.functions import trim, col
    
    df = df.withColumn("Clean Address", trim(col("Address")))

    regexp_replace: Replaces substrings matching a regular expression.

    Syntax: regexp_replace(col, pattern, replacement)

    Example:

    from pyspark.sql.functions import regexp_replace, col
    
    df = df.withColumn("Clean Name", regexp_replace(col("Name"), "[^a-zA-Z]", ""))

    replace: Replaces specified characters or substrings in a string. A replace() column function only exists in pyspark.sql.functions from Spark 3.5 onward, so a portable approach is to call the Spark SQL replace function through expr, as in the earlier example.

    Syntax: expr("replace(column, search, replacement)")

    Example:

    from pyspark.sql.functions import expr
    
    df = df.withColumn("Clean Address", expr("replace(Address, ' ', '')"))

    remove_accents: PySpark has no built-in remove_accents function; a common approach is a small UDF based on Python's unicodedata module.

    Example:

    import unicodedata
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import StringType
    
    def strip_accents(text):
        if text is None:
            return None
        return "".join(c for c in unicodedata.normalize("NFD", text) if unicodedata.category(c) != "Mn")
    
    remove_accents = udf(strip_accents, StringType())
    
    df = df.withColumn("Clean Name", remove_accents(col("Name")))

    standardize: There is also no built-in standardize function; standardizing a string (removing punctuation and converting to lowercase) can be composed from lower and regexp_replace.

    Example:

    from pyspark.sql.functions import lower, regexp_replace, col
    
    df = df.withColumn("Standardized Name", lower(regexp_replace(col("Name"), r"[^\w\s]", "")))


  • ✅ What is a DataFrame in PySpark?

    A DataFrame in PySpark is a distributed collection of data organized into named columns, similar to a table in a relational database or a Pandas DataFrame.

    It is built on top of RDDs and provides:

    • Schema awareness (column names & data types)
    • High-level APIs (like select, groupBy, join)
    • Query optimization via Catalyst engine
    • Integration with SQL, Hive, Delta, Parquet, etc.

    📊 DataFrame = RDD + Schema

    Under the hood:

    DataFrame = RDD[Row] + Schema
    

    So while RDD is just distributed records (no structure), DataFrame adds:

    • Schema: like a table
    • Optimizations: Catalyst query planner
    • Better performance: due to SQL-style planning

    📎 Example: From RDD to DataFrame

    from pyspark.sql import Row
    rdd = spark.sparkContext.parallelize([Row(name="Alice", age=30), Row(name="Bob", age=25)])
    
    # Convert to DataFrame
    df = spark.createDataFrame(rdd)
    
    df.printSchema()
    df.show()
    

    📌 Output:

    root
     |-- name: string
     |-- age: long
    +-----+---+
    |name |age|
    +-----+---+
    |Alice| 30|
    |Bob  | 25|
    +-----+---+
    

    🆚 DataFrame vs RDD

    | Feature      | RDD                              | DataFrame                            |
    |--------------|----------------------------------|--------------------------------------|
    | Structure    | Unstructured                     | Structured (schema: columns & types) |
    | APIs         | Low-level (map, filter, reduce)  | High-level (select, groupBy, SQL)    |
    | Performance  | Slower                           | Faster (optimized via Catalyst)      |
    | Optimization | Manual                           | Automatic (Catalyst + Tungsten)      |
    | Ease of Use  | More code                        | Concise SQL-like operations          |
    | Use Cases    | Fine-grained control, custom ops | Most ETL, analytics, SQL workloads   |

    🧠 When to Use What?

    • Use RDD if:
      • You need fine-grained control (custom partitioning, complex transformations)
      • You’re doing low-level transformations or working with unstructured data
    • Use DataFrame if:
      • You want performance & ease
      • You’re working with structured/semi-structured data (JSON, CSV, Parquet, Hive)
      • You want to write SQL-like logic
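
    A small sketch of moving between the two APIs, reusing the people DataFrame built from Rows earlier in this section:

    # DataFrame API: concise and optimized by Catalyst
    df.filter(df["age"] > 28).select("name").show()

    # Drop down to the RDD API for fine-grained, low-level control
    names_rdd = df.rdd.map(lambda row: row["name"].upper())
    print(names_rdd.collect())

    # And back again: an RDD of Rows can be converted into a DataFrame
    df2 = spark.createDataFrame(df.rdd)
    df2.printSchema()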

    🚀 Bonus: PySpark SQL + DataFrame

    DataFrames integrate with SQL:

    df.createOrReplaceTempView("people")
    spark.sql("SELECT name FROM people WHERE age > 28").show()
    

    ✅ Summary

    • PySpark DataFrame is the go-to API for structured big data processing.
    • It’s built on RDDs, but adds structure and optimization.
    • Immutable and distributed just like RDDs, but more powerful for most real-world use cases.

    In PySpark, DataFrames are immutable, just like RDDs.


    ✅ What Does Immutable Mean?

    Once a DataFrame is created, it cannot be changed in-place. Any operation you perform returns a new DataFrame with the transformation applied.


    🔁 Example:

    df = spark.read.csv("data.csv", header=True)
    
    # filter does not change df, it returns a new DataFrame
    df_filtered = df.filter(df["price"] > 100)
    
    # df is still the original
    df.show()
    df_filtered.show()
    

    Here:

    • df is unchanged.
    • df_filtered is a new DataFrame with a subset of rows.

    🧠 Why Are DataFrames Immutable?

    • Fault tolerance: Spark builds a lineage graph, so if a node fails, Spark can recompute lost partitions.
    • Optimizations: Catalyst optimizer rewrites queries knowing transformations don’t mutate the source.
    • Parallelism: Safe to operate across clusters without race conditions.
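
    One way to see this lineage in practice is to inspect the query plan Spark builds from the chain of transformations; a brief sketch (the file and column names are placeholders):

    df = spark.read.csv("data.csv", header=True, inferSchema=True)

    # Each transformation returns a new DataFrame; nothing is mutated in place
    df_filtered = df.filter(df["price"] > 100)

    # explain() prints the logical and physical plans derived from the lineage,
    # which is what Catalyst optimizes and what Spark would use to recompute lost partitions
    df_filtered.explain(True)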

    🔄 But You Can Reassign

    While the DataFrame object is immutable, you can reassign the variable:

    df = df.filter(df["price"] > 100)  # now df points to the new result
    

    But this is just Python variable reassignment, not mutation.


    ✅ Summary

    | Concept           | Mutable? | Notes                             |
    |-------------------|----------|-----------------------------------|
    | RDD               | ❌ No    | Immutable                         |
    | DataFrame         | ❌ No    | Immutable, optimized via Catalyst |
    | Variable (Python) | ✅ Yes   | Can point to a new DataFrame      |


  • Let us go through the project requirements:

    1. Create one or multiple dynamic lists of variables and save them in a dictionary, array, or another data structure for repeated use in Python. The variable names are dynamic, for example Month_202401 to Month_202312 for 24 months (take these 24 months backdated, or as per the current month's progression).

    2. Later we will use this dictionary to create 24 CSV files by filtering on year and month.

    3. We will also use it to create arrays based on this dynamic dictionary.

    4. We will create custom Excel files for the 24 year-month combinations, where column names also follow those combinations (such as xyz_2404_04 to xyz_2404_12, or abc_xyz_2404_0405 to abc_xyz_2405_1201).

    Various Versions to Achieve It:

    import datetime
    import pandas as pd
    import os
    
    # Step 1: Generate month names for the next 24 months
    def generate_month_names():
        current_date = datetime.datetime.now()
        month_names = []
    
        for i in range(24):
            new_date = current_date + datetime.timedelta(days=30 * i)
            month_name = new_date.strftime("Month_%Y%m")
            month_names.append(month_name)
    
        return month_names
    
    # Step 2: Create variables and store them in a dictionary
    def create_variables(month_names):
        variables = {}
        for month in month_names:
            # Create some dummy data for demonstration
            data = pd.DataFrame({
                'Name': ['Alice', 'Bob', 'Charlie'],
                'Age': [25, 30, 35],
                'Month': [month] * 3
            })
            variables[month] = data
    
        return variables
    
    # Step 3: Create CSV files from the dictionary
    def create_csv_files(variables, output_directory):
        if not os.path.exists(output_directory):
            os.makedirs(output_directory)
    
        for month, data in variables.items():
            file_name = f"{output_directory}/{month}.csv"
            data.to_csv(file_name, index=False)
            print(f"Saved {file_name}")
    
    # Step 4: Create an Excel file with dynamic columns
    def create_excel_file(variables, excel_file_name):
        with pd.ExcelWriter(excel_file_name) as writer:
            for month, data in variables.items():
                data.to_excel(writer, sheet_name=month, index=False)
            print(f"Saved {excel_file_name}")
    
    # Main execution
    if __name__ == "__main__":
        # Generate month names
        month_names = generate_month_names()
    
        # Create variables
        dynamic_vars = create_variables(month_names)
    
        # Directory to save CSV files
        output_directory = "output_csv_files"
    
        # Create CSV files
        create_csv_files(dynamic_vars, output_directory)
    
        # Excel file name
        excel_file_name = "dynamic_month_data.xlsx"
    
        # Create Excel file
        create_excel_file(dynamic_vars, excel_file_name)
    
    import datetime
    import pandas as pd
    import os
    from dateutil.relativedelta import relativedelta
    
    # Step 1: Generate month names for the next 24 months
    def generate_month_names():
        current_date = datetime.datetime.now()
        month_names = []
    
        for i in range(24):
            new_date = current_date + relativedelta(months=i)
            month_name = new_date.strftime("Month_%Y%m")
            month_names.append(month_name)
    
        return month_names
    
    # Step 2: Create example data and store in a dictionary
    def create_variables(month_names):
        variables = {}
        for month in month_names:
            # Create some example data for demonstration
            data = pd.DataFrame({
                'Name': ['Alice', 'Bob', 'Charlie'],
                'Age': [25, 30, 35],
                'Month': [month] * 3
            })
            variables[month] = data
    
        return variables
    
    # Step 3: Create CSV files from the dictionary
    def create_csv_files(variables, output_directory):
        if not os.path.exists(output_directory):
            os.makedirs(output_directory)
    
        for month, data in variables.items():
            file_name = f"{output_directory}/{month}.csv"
            data.to_csv(file_name, index=False)
            print(f"Saved {file_name}")
    
    # Step 4: Create arrays based on the data stored in the dictionary
    def create_arrays(variables):
        arrays = {}
        for month, data in variables.items():
            arrays[month] = data.values
        return arrays
    
    # Step 5: Create custom Excel files for 24-month combinations
    def create_custom_excel_files(variables, excel_file_name):
        with pd.ExcelWriter(excel_file_name) as writer:
            for i, (month, data) in enumerate(variables.items()):
                # Generate custom column names
                custom_columns = [f"xyz_{month}_{j+1:02d}" for j in range(len(data.columns))]
                custom_data = data.copy()
                custom_data.columns = custom_columns
                custom_data.to_excel(writer, sheet_name=month, index=False)
    
            # Example of creating more complex column names and data combination
            for i in range(1, 25):
                month_name = f"abc_xyz_{variables[list(variables.keys())[i-1]].iloc[0]['Month']}_{variables[list(variables.keys())[i%24]].iloc[0]['Month']}"
                complex_data = pd.concat([variables[list(variables.keys())[i-1]], variables[list(variables.keys())[i%24]]], axis=1)
                complex_data.columns = [f"{month_name}_{col}" for col in complex_data.columns]
                complex_data.to_excel(writer, sheet_name=month_name, index=False)
    
            print(f"Saved {excel_file_name}")
    
    # Main execution
    if __name__ == "__main__":
        # Generate month names
        month_names = generate_month_names()
    
        # Create example data and store in a dictionary
        dynamic_vars = create_variables(month_names)
    
        # Directory to save CSV files
        output_directory = "output_csv_files"
    
        # Create CSV files
        create_csv_files(dynamic_vars, output_directory)
    
        # Create arrays based on the data stored in the dictionary
        arrays = create_arrays(dynamic_vars)
        print("Created arrays based on the dictionary.")
    
        # Excel file name
        excel_file_name = "dynamic_month_data.xlsx"
    
        # Create custom Excel files
        create_custom_excel_files(dynamic_vars, excel_file_name)
    
    from datetime import date, timedelta
    import csv
    
    def generate_month_names(months):
      """
      Generates a list of dynamic variable names representing month names
      based on the provided number of months in the past.
    
      Args:
          months (int): The number of months to consider (including the current month).
    
      Returns:
          list: A list of strings representing dynamic month variable names.
      """
      current_date = date.today()
      month_names = []
      for i in range(months):
        month = current_date - timedelta(days=i * 31)  # Adjust for approximate month length
        month_name = f"Month_{month.year}{month.month:02d}"
        month_names.append(month_name)
      return month_names
    
    def create_data_structure(month_names):
      """
      Creates a dictionary where keys are dynamic month names and values are
      empty lists (to be populated with data later).
    
      Args:
          month_names (list): A list of dynamic month variable names.
    
      Returns:
          dict: A dictionary with dynamic month names as keys and empty lists as values.
      """
      data_structure = {}
      for month_name in month_names:
        data_structure[month_name] = []
      return data_structure
    
    def create_csv_files(data_structure):
      """
      Creates CSV files for each month in the data structure with appropriate column names.
    
      Args:
          data_structure (dict): A dictionary with dynamic month names as keys and lists as values.
      """
      for month_name, data in data_structure.items():
        ym = month_name.split("_")[1]          # e.g. "202401"
        year, month = ym[:4], ym[4:]
        # Sample data for illustration. Replace with your actual data
        data.append(["col1", "col2", "col3"])
        filename = f"{month_name}.csv"
        with open(filename, "w", newline="") as csvfile:
          writer = csv.writer(csvfile)
          writer.writerow([f"xyz_{year}{month}_{d:02d}" for d in range(4, 13)])  # Sample column names
          writer.writerows(data)
    
    def create_excel_files(data_structure):
      """
      Creates custom Excel files (using a library like openpyxl) for each month,
      with dynamic column names based on year and month combinations.
    
      **Note:** This requires an external library like openpyxl for Excel manipulation.
    
      Args:
          data_structure (dict): A dictionary with dynamic month names as keys and lists as values.
      """
      # Replace this with your Excel file creation logic using a library like openpyxl
      pass
    
    # Example Usage
    months = 24
    month_names = generate_month_names(months)
    data_structure = create_data_structure(month_names)
    
    # Populate data_structure with your actual data for each month
    
    create_csv_files(data_structure)
    create_excel_files(data_structure)  # Requires an external library
    
    print("Data structures and files created successfully!")
    
  • Temporary functions allow users to define functions that are session-specific and used to encapsulate reusable logic within a database session. While both PL/SQL and Spark SQL support the concept of user-defined functions, their implementation and usage differ significantly.

    Temporary Functions in PL/SQL

    PL/SQL, primarily used with Oracle databases, allows you to create temporary or anonymous functions within a block of code. These functions are often used to perform specific tasks within a session and are not stored in the database schema permanently.

    Example: Temporary Function in PL/SQL

    Here’s an example of how to create and use a temporary function within a PL/SQL block:

    DECLARE
        -- Define a local function
        FUNCTION concatenate(first_name IN VARCHAR2, last_name IN VARCHAR2) RETURN VARCHAR2 IS
        BEGIN
            RETURN first_name || '-' || last_name;
        END concatenate;
    
    BEGIN
        -- Use the local function
        DBMS_OUTPUT.PUT_LINE(concatenate('Alice', 'Smith'));
        DBMS_OUTPUT.PUT_LINE(concatenate('Bob', 'Johnson'));
    END;
    /
    

    In this example:

    • A temporary function concatenate is defined within a PL/SQL block.
    • This function concatenates two strings with a hyphen.
    • The function is used within the same block to print concatenated names.

    Temporary Functions in Spark SQL

    Spark SQL does not support defining temporary functions directly within SQL code as PL/SQL does. Instead, Spark SQL uses user-defined functions (UDFs) registered through the Spark API (e.g., in Python or Scala). These UDFs can be registered for the duration of a Spark session and used within SQL queries.

    Example: Temporary Function in Spark SQL (Python)

    Here’s how you define and use a UDF in Spark SQL:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    # Initialize Spark session
    spark = SparkSession.builder.appName("Temporary Functions").getOrCreate()
    
    # Sample data
    data = [
        ("Alice", "Smith"),
        ("Bob", "Johnson"),
        ("Charlie", "Williams")
    ]
    
    columns = ["first_name", "last_name"]
    
    # Create DataFrame
    df = spark.createDataFrame(data, columns)
    
    # Register the DataFrame as a temporary view
    df.createOrReplaceTempView("people")
    
    # Define the UDF
    def concatenate(first, last):
        return f"{first}-{last}"
    
    # Register the UDF as a temporary function
    spark.udf.register("concatenate", concatenate, StringType())
    
    # Use the temporary function in a Spark SQL query
    result_df = spark.sql("""
    SELECT first_name, last_name, concatenate(first_name, last_name) AS full_name
    FROM people
    """)
    
    # Show the result
    result_df.show()
    

    In this example:

    • A Spark session is initialized, and a sample DataFrame is created and registered as a temporary view.
    • A UDF concatenate is defined in Python to concatenate two strings with a hyphen.
    • The UDF is registered with the Spark session and can be used as a temporary function in SQL queries.

    Comparison

    Defining Temporary Functions

    • PL/SQL:
      • Functions can be defined directly within a PL/SQL block.
      • Functions are local to the block and not stored permanently.
    • Spark SQL:
      • Functions (UDFs) are defined using the Spark API (e.g., in Python, Scala).
      • UDFs must be registered with the Spark session to be used in SQL queries.
      • Functions are session-specific but not directly written in SQL.

    Usage Scope

    • PL/SQL:
      • Temporary functions are used within the scope of the PL/SQL block in which they are defined.
    • Spark SQL:
      • UDFs are registered for the Spark session and can be used across multiple SQL queries within that session.

    Language and Flexibility

    • PL/SQL:
      • Functions are written in PL/SQL, which is closely integrated with Oracle databases.
      • Provides strong support for procedural logic.
    • Spark SQL:
      • UDFs can be written in various languages supported by Spark (e.g., Python, Scala).
      • Provides flexibility to use different programming languages and integrate complex logic from external libraries.

    While both PL/SQL and Spark SQL support the concept of temporary or session-specific functions, their implementation and usage differ. PL/SQL allows for the direct definition of temporary functions within blocks of code, providing a tightly integrated procedural approach. In contrast, Spark SQL relies on user-defined functions (UDFs) registered through the Spark API, offering flexibility and language diversity but requiring an additional step to register and use these functions in SQL queries.
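
    As a side note, Spark also supports vectorized pandas UDFs, which are registered for the session in the same way but usually outperform row-at-a-time UDFs. The sketch below reuses the people temporary view from the example above and assumes pandas and pyarrow are installed.

    import pandas as pd
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import StringType

    # Vectorized equivalent of the concatenate UDF defined earlier
    @pandas_udf(StringType())
    def concatenate_vec(first: pd.Series, last: pd.Series) -> pd.Series:
        return first + "-" + last

    # Register it as a session-scoped function for use in Spark SQL
    spark.udf.register("concatenate_vec", concatenate_vec)

    spark.sql("""
    SELECT first_name, last_name, concatenate_vec(first_name, last_name) AS full_name
    FROM people
    """).show()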

  • Apache Spark, including PySpark, automatically optimizes job execution by breaking it down into stages and tasks based on data dependencies. This process is facilitated by Spark’s Directed Acyclic Graph (DAG) Scheduler, which helps in optimizing the execution plan for efficiency. Let’s break this down with a detailed example and accompanying numbers to illustrate the process.

    In Apache Spark, including PySpark, the Directed Acyclic Graph (DAG) is a fundamental concept that represents the sequence of computations that need to be performed on data. Understanding how Spark breaks down a job into stages and tasks within this DAG is crucial for grasping how Spark optimizes and executes jobs.

    Understanding DAG in PySpark

    A DAG in Spark is a logical representation of the operations to be executed. It is built when an action (like collect, save, count, etc.) is called on an RDD, DataFrame, or Dataset. The DAG contains all the transformations (like map, filter, groupBy, etc.) that Spark must perform on the data.

    Breakdown of a Job into Stages and Tasks

    1. Logical Plan:
      • When you define transformations, Spark constructs a logical plan. This plan captures the sequence of transformations on the data.
    2. Physical Plan:
      • Spark’s Catalyst optimizer converts the logical plan into a physical plan, which specifies how the operations will be executed.
    3. DAG of Stages:
      • The physical plan is divided into stages. Each stage consists of tasks that can be executed in parallel. Stages are determined by shuffle boundaries (i.e., points where data needs to be redistributed across the cluster).
    4. Tasks within Stages:
      • Each stage is further divided into tasks. A task is the smallest unit of work and is executed on a single partition of the data. Tasks within a stage are executed in parallel across the cluster.
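
    Building on this breakdown, a small sketch shows how to observe these pieces from PySpark itself: the number of partitions roughly corresponds to the number of tasks in a stage, and explain() reveals the Exchange (shuffle) operators that mark stage boundaries. The file path is a placeholder.

    df = spark.read.csv("hdfs:///path/to/input/*.csv", header=True, inferSchema=True)

    # Number of partitions ~ number of tasks Spark will launch for this stage
    print(df.rdd.getNumPartitions())

    grouped = df.groupBy("user_id").count()

    # The physical plan shows Exchange operators, which correspond to shuffle/stage boundaries
    grouped.explain()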

    Example: Complex ETL Pipeline

    Scenario: Process a large dataset of user activity logs to compute user session statistics, filter erroneous data, and generate aggregated reports.

    Steps in the Pipeline:

    1. Data Ingestion:
      • Read raw logs from HDFS.
    2. Data Cleaning:
      • Filter out records with missing or malformed fields.
    3. Sessionization:
      • Group records by user and time interval to form sessions.
    4. Aggregation:
      • Compute session statistics such as total time spent and number of pages visited.

    PySpark Code Example:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, window, sum as _sum

    # Initialize Spark session
    spark = SparkSession.builder.appName("Complex ETL Pipeline").getOrCreate()

    # Data Ingestion
    df = spark.read.csv("hdfs:///path/to/input/*.csv", header=True, inferSchema=True)

    # Data Cleaning
    df_cleaned = df.filter(col("user_id").isNotNull() & col("timestamp").isNotNull())

    # Sessionization
    df_sessions = df_cleaned.groupBy("user_id", window("timestamp", "30 minutes")).agg({"page": "count"})

    # Aggregation
    df_aggregated = df_sessions.groupBy("user_id").agg(
        _sum("count(page)").alias("total_pages"),
        _sum("session_duration").alias("total_duration")
    )

    # Write results to HDFS
    df_aggregated.write.parquet("hdfs:///path/to/output/")

    How PySpark Optimizes Job Execution:

    1. Logical Plan:
      • When transformations are applied (e.g., filter, groupBy), Spark builds a logical plan. This plan is an abstract representation of the operations to be performed.
    2. Physical Plan:
      • Spark’s Catalyst optimizer converts the logical plan into a physical plan. This plan is optimized for execution by selecting the most efficient strategies for operations.
    3. DAG Scheduler:
      • The physical plan is translated into a DAG of stages and tasks. Each stage consists of tasks that can be executed in parallel.

    Breaking Down into Stages and Tasks:

    • Stage 1:
      • Tasks: Reading data from HDFS and filtering (Data Cleaning).
      • Operations: df.filter(...)
      • Number of Tasks: Equal to the number of HDFS blocks (e.g., 100 blocks -> 100 tasks).
      • Memory Usage: Depends on the size of the data being read and filtered.
    • Stage 2:
      • Tasks: Grouping data by user and session window (Sessionization).
      • Operations: df_sessions = df_cleaned.groupBy("user_id", window(...))
      • Number of Tasks: Determined by the number of unique keys (users and time intervals) and partitioning strategy (e.g., 200 partitions).
      • Memory Usage: Depends on the intermediate shuffle data size.
    • Stage 3:
      • Tasks: Aggregating session data (Aggregation).
      • Operations: df_aggregated = df_sessions.groupBy("user_id").agg(...)
      • Number of Tasks: Again determined by the number of unique keys (users) and partitioning strategy (e.g., 50 partitions).
      • Memory Usage: Depends on the size of the aggregated data.
    • Stage 4:
      • Tasks: Writing results to HDFS.
      • Operations: df_aggregated.write.parquet(...)
      • Number of Tasks: Typically determined by the number of output partitions (e.g., 50 tasks).
      • Memory Usage: Depends on the size of the data being written.

    Example with Numbers:

    Let’s assume we have a dataset of 1 TB, distributed across 100 HDFS blocks.

    1. Stage 1 (Data Cleaning):
      • Tasks: 100 (one per HDFS block)
      • Data Read: 1 TB (distributed across tasks)
      • Intermediate Data: 900 GB (after filtering out 10% erroneous data)
    2. Stage 2 (Sessionization):
      • Tasks: 200 (partitioned by user and time interval)
      • Intermediate Data: 900 GB
      • Shuffled Data: 900 GB (data shuffled across tasks for grouping)
    3. Stage 3 (Aggregation):
      • Tasks: 50 (grouped by user)
      • Intermediate Data: 50 GB (assuming significant aggregation)
      • Shuffled Data: 50 GB
    4. Stage 4 (Write to HDFS):
      • Tasks: 50 (one per output partition)
      • Output Data: 50 GB (final aggregated data)

    Memory Management:

    • Executor Memory: Configured based on the size of the data and the number of tasks. E.g., each executor allocated 8 GB memory.
    • Caching and Persistence: Intermediate results (e.g., df_cleaned and df_sessions) can be cached in memory to avoid recomputation.
    • Shuffle Memory: Managed dynamically to balance between storage and execution needs.
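
    A hedged sketch of how these settings are commonly supplied when building the session; the numbers are placeholders rather than tuned recommendations.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("Complex ETL Pipeline") \
        .config("spark.executor.memory", "8g") \
        .config("spark.executor.cores", "4") \
        .config("spark.sql.shuffle.partitions", "200") \
        .getOrCreate()

    # Caching keeps an intermediate result in memory so later stages reuse it
    # instead of recomputing the whole lineage from the source data
    df_cleaned = spark.range(0, 1000).filter("id % 10 != 0")
    df_cleaned.cache()
    print(df_cleaned.count())  # the first action materializes the cache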

    Execution Log Example:

    INFO DAGScheduler: Final stage: ResultStage 4 (parquet at ETL_Pipeline.py:24)
    INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 3, ShuffleMapStage 2, ShuffleMapStage 1)
    INFO DAGScheduler: Missing parents: List(ShuffleMapStage 3, ShuffleMapStage 2, ShuffleMapStage 1)
    INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[4] at filter at ETL_Pipeline.py:10), which has no missing parents
    INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.1 KB, free 2004.6 MB)
    INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on executor_1 (size: 4.1 KB, free: 2004.6 MB)
    INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on executor_2 (size: 4.1 KB, free: 2004.6 MB)
    INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on executor_3 (size: 4.1 KB, free: 2004.6 MB)
    INFO DAGScheduler: Submitting 100 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[4] at filter at ETL_Pipeline.py:10) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
    INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, executor 1, partition 0, PROCESS_LOCAL, 7987 bytes)
    INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, executor 2, partition 1, PROCESS_LOCAL, 7987 bytes)
    INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 2, executor 3, partition 2, PROCESS_LOCAL, 7987 bytes)
    ...
    INFO BlockManagerInfo: Added rdd_4_0 in memory on executor_1 (size: 45.6 MB, free: 1958.6 MB)
    INFO BlockManagerInfo: Added rdd_4_1 in memory on executor_2 (size: 45.6 MB, free: 1958.6 MB)
    INFO BlockManagerInfo: Added rdd_4_2 in memory on executor_3 (size: 45.6 MB, free: 1958.6 MB)
    ...
    INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 2 (MapPartitionsRDD[7] at groupBy at ETL_Pipeline.py:14) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
    INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 100, executor 1, partition 0, PROCESS_LOCAL, 7987 bytes)
    ...
    INFO BlockManagerInfo: Added rdd_7_0 in memory on executor_1 (size: 10.2 MB, free: 1948.4 MB)
    INFO BlockManagerInfo: Added rdd_7_1 in memory on executor_2 (size: 10.2 MB, free: 1948.4 MB)
    INFO BlockManagerInfo: Added rdd_7_2 in memory on executor_3 (size: 10.2 MB, free: 1948.4 MB)
    ...
    INFO DAGScheduler: Submitting 50 missing tasks from ResultStage 4 (ShuffledRDD[10] at write at ETL_Pipeline.py:24) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
    INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 300, executor 1, partition 0, PROCESS_LOCAL, 7987 bytes)
    ...
    INFO BlockManagerInfo: Removed broadcast_0_piece0 on executor_1 in memory (size: 4.1 KB, free: 1958.6 MB)
    INFO BlockManagerInfo: Removed broadcast_0_piece0 on executor_2 in memory (size: 4.1 KB, free: 1958.6 MB)
    INFO BlockManagerInfo: Removed broadcast_0_piece0 on executor_3 in memory (size: 4.1 KB, free: 1958.6 MB)
    

    PySpark optimizes job execution by breaking it down into stages and tasks based on data dependencies. The DAG Scheduler orchestrates the process, ensuring efficient memory management and parallel execution. By caching intermediate results and leveraging in-memory computation, PySpark minimizes disk I/O and improves overall performance. This approach contrasts with the more disk-intensive Hadoop MapReduce, highlighting PySpark’s advantages in handling complex data pipelines.

  • Explaining a typical PySpark execution log

    A typical PySpark execution log provides detailed information about the various stages and tasks of a Spark job. These logs are essential for debugging and optimizing Spark applications. Here’s a step-by-step explanation of what you might see in a typical PySpark execution log:

    Step 1: Spark Context Initialization

    When you start a PySpark application, the first entries in the log are related to initializing the SparkContext and SparkSession.

    INFO SparkContext: Running Spark version 3.1.2
    INFO SparkContext: Submitted application: Example App
    INFO ResourceUtils: ==============================================================
    INFO ResourceUtils: No custom resources configured for spark.driver.
    INFO SparkContext: Added JAR file:/path/to/pyspark-shell

    Step 2: Application and Job Start

    The log will show details about the Spark application and the start of a job.

    INFO SparkContext: Starting job: show at ExampleApp.py:25
    INFO DAGScheduler: Got job 0 (show at ExampleApp.py:25) with 1 output partitions
    INFO DAGScheduler: Final stage: ResultStage 0 (show at ExampleApp.py:25)
    INFO DAGScheduler: Parents of final stage: List()
    INFO DAGScheduler: Missing parents: List()

    Step 3: Stage and Task Scheduling

    Next, the log will include details about the stages and tasks that Spark schedules for execution.

    INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at show at ExampleApp.py:25), which has no missing parents
    INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.5 KB, free 365.2 MB)
    INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.1 KB, free 365.2 MB)
    INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.1:49919 (size: 2.1 KB, free: 366.3 MB)
    INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1039
    INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at show at ExampleApp.py:25) (first 15 tasks are for partitions Vector(0))
    INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.1.1, executor driver, partition 0, PROCESS_LOCAL, 4735 bytes)
    INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

    Step 4: Task Execution

    Each task’s execution will be logged, including any shuffling and data read/write operations.

    INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.1:49919 (size: 2.1 KB, free: 366.3 MB)
    INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1380 bytes result sent to driver
    INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 213 ms on 192.168.1.1 (executor driver) (1/1)
    INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    INFO DAGScheduler: ResultStage 0 (show at ExampleApp.py:25) finished in 0.254 s
    INFO DAGScheduler: Job 0 is finished. 1/1 tasks completed.

    Step 5: Stage and Job Completion

    Logs will indicate the completion of stages and jobs, including the time taken for execution.

    INFO DAGScheduler: Job 0 finished: show at ExampleApp.py:25, took 0.270467 s
    INFO SparkContext: Starting job: count at ExampleApp.py:27
    INFO DAGScheduler: Registering RDD 4 (count at ExampleApp.py:27)
    INFO DAGScheduler: Got job 1 (count at ExampleApp.py:27) with 1 output partitions
    INFO DAGScheduler: Final stage: ResultStage 1 (count at ExampleApp.py:27)
    INFO DAGScheduler: Parents of final stage: List()
    INFO DAGScheduler: Missing parents: List()
    INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at count at ExampleApp.py:27), which has no missing parents
    INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.4 KB, free 365.2 MB)
    INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 365.2 MB)
    INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.1:49919 (size: 2.7 KB, free: 366.3 MB)
    INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
    INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at count at ExampleApp.py:27) (first 15 tasks are for partitions Vector(0))
    INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
    INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, 192.168.1.1, executor driver, partition 0, PROCESS_LOCAL, 4735 bytes)
    INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
    INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.1:49919 (size: 2.7 KB, free: 366.3 MB)
    INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1380 bytes result sent to driver
    INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 221 ms on 192.168.1.1 (executor driver) (1/1)
    INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
    INFO DAGScheduler: ResultStage 1 (count at ExampleApp.py:27) finished in 0.273 s
    INFO DAGScheduler: Job 1 finished: count at ExampleApp.py:27, took 0.277596 s

    Step 6: Resource Cleanup

    After job completion, Spark will clean up the resources and shut down the session.

    INFO SparkUI: Stopped Spark web UI at http://192.168.1.1:4040
    INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    INFO MemoryStore: MemoryStore cleared
    INFO BlockManager: BlockManager stopped
    INFO BlockManagerMaster: BlockManagerMaster stopped
    INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    INFO SparkContext: Successfully stopped SparkContext
    INFO ShutdownHookManager: Shutdown hook called
    INFO ShutdownHookManager: Deleting directory /tmp/spark-12a26f8f-d833-439a-ae3a-6c9ed6c29c67

    Summary of Key Log Entries

    1. Initialization:
      • Information about Spark version, application submission, and resource configuration.
    2. Job Start:
      • Details of job submission, including the number of output partitions and stages.
    3. Stage and Task Scheduling:
      • Logs about stage submission, task scheduling, and resource allocation.
    4. Task Execution:
      • Logs on task start, execution, data shuffling, and completion.
    5. Stage and Job Completion:
      • Logs indicating the completion of stages and jobs with execution time.
    6. Resource Cleanup:
      • Information on stopping the Spark context, cleaning up resources, and shutting down Spark.

    Example PySpark Log Analysis

    Here’s a breakdown of an example PySpark log entry:

    INFO DAGScheduler: Got job 0 (show at ExampleApp.py:25) with 1 output partitions
    • INFO: Log level.
    • DAGScheduler: Component generating the log.
    • Got job 0 (show at ExampleApp.py:25): Job ID and action.
    • with 1 output partitions: Number of output partitions for the job.

    By understanding these log entries, you can monitor the execution of your PySpark jobs, identify performance bottlenecks, and debug errors efficiently.
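
    If these INFO entries are missing from (or flooding) your console, the driver’s log verbosity can be adjusted at runtime. Below is a minimal sketch using SparkContext.setLogLevel; the application name and the small demo job are placeholders:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("LogLevelDemo").getOrCreate()

    # Valid levels include ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF.
    spark.sparkContext.setLogLevel("INFO")

    # Any job run after this point emits DAGScheduler/TaskSetManager entries
    # like the ones analyzed above.
    spark.range(10).count()

    spark.stop()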

    Let’s delve deeper into the key components and the execution process of a PySpark job, explaining each step with a real-life example and corresponding log entries.

    Real-Life Example: Word Count Application

    We’ll use a word count application as our example. This application reads a text file, counts the occurrences of each word, and outputs the result.

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession

    # Initialize SparkConf and SparkContext
    conf = SparkConf().setAppName("Word Count").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Create SparkSession
    spark = SparkSession(sc)

    # Read text file into RDD
    text_file = sc.textFile("path/to/textfile.txt")

    # Split lines into words, map each word to a (word, 1) pair, and reduce by key to count occurrences
    words = text_file.flatMap(lambda line: line.split(" "))
    word_pairs = words.map(lambda word: (word, 1))
    word_counts = word_pairs.reduceByKey(lambda a, b: a + b)

    # Collect results
    results = word_counts.collect()

    # Print results
    for word, count in results:
        print(f"{word}: {count}")

    # Stop the SparkContext
    sc.stop()

    Explanation with Log Entries

    1. Initialization:

    Code:
    conf = SparkConf().setAppName("Word Count").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)
    Log Entries:
    INFO SparkContext: Running Spark version 3.1.2
    INFO SparkContext: Submitted application: Word Count
    INFO ResourceUtils: ==============================================================
    INFO ResourceUtils: No custom resources configured for spark.driver.
    INFO SparkContext: Added JAR file:/path/to/pyspark-shell
    INFO Utils: Successfully started service 'SparkUI' on port 4040.
    INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.1:4040
    • Explanation: Initializes the SparkConf and SparkContext. A SparkSession is created using the existing SparkContext. The Spark UI is started on port 4040.

    2. Resource Allocation:

    Spark allocates resources for the application, such as memory and CPUs specified in the configuration (local[*] means using all available cores).

    Log Entries:
    INFO SparkContext: Added JAR file:/path/to/pyspark-shell
    INFO MemoryStore: MemoryStore started with capacity 366.3 MB
    INFO DiskBlockManager: Created local directory at /tmp/blockmgr-123456
    INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.1, 49919)
    INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.1:49919 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.1, 49919)
    INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.1, 49919)
    • Explanation: Registers block managers and allocates memory and disk space for storing RDD partitions.
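
    The memory figures reported by the MemoryStore and BlockManager come from the application’s configuration. As a sketch (the property values below are illustrative, not recommendations), resources can be requested through SparkConf before the context is created:

    from pyspark import SparkConf, SparkContext

    conf = (
        SparkConf()
        .setAppName("Word Count")
        .setMaster("local[*]")                # use all local cores
        .set("spark.executor.memory", "2g")   # illustrative value
        .set("spark.driver.memory", "2g")     # illustrative value; in practice this is
                                              # usually passed to spark-submit, because the
                                              # driver JVM is already running at this point
    )
    sc = SparkContext(conf=conf)

    print(sc.getConf().get("spark.driver.memory"))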

    3. Task Creation by the Driver for Jobs:

    Code:
    text_file = sc.textFile("path/to/textfile.txt")
    Log Entries:
    INFO SparkContext: Starting job: textFile at WordCount.py:12
    INFO DAGScheduler: Got job 0 (textFile at WordCount.py:12) with 1 output partitions
    INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at textFile at WordCount.py:12), which has no missing parents
    INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.5 KB, free 365.2 MB)
    INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1039
    • Explanation: textFile defines an RDD over the input file, which is split into partitions. The driver turns this lineage into a job and creates a stage of tasks, one per partition.
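
    To check how many partitions (and therefore how many tasks per stage) Spark created for this input, inspect the RDD directly. A minimal sketch, reusing the illustrative file path from the example:

    # The number of partitions determines the number of tasks in the stage.
    text_file = sc.textFile("path/to/textfile.txt")
    print(text_file.getNumPartitions())

    # A minimum partition count can be requested when reading the file.
    text_file = sc.textFile("path/to/textfile.txt", minPartitions=4)
    print(text_file.getNumPartitions())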

    4. DAG Scheduler Working:

    The DAG (Directed Acyclic Graph) scheduler divides the job into stages based on shuffle boundaries.

    Log Entries:
    INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at textFile at WordCount.py:12), which has no missing parents
    INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[2] at flatMap at WordCount.py:14), which has no missing parents
    INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[3] at map at WordCount.py:15), which has no missing parents
    INFO DAGScheduler: Submitting ResultStage 3 (ShuffledRDD[4] at reduceByKey at WordCount.py:16), which has no missing parents
    • Explanation: Submits stages in the DAG to be executed. Here, stages correspond to reading the text file, performing flatMap, map, and reduceByKey operations.
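
    The lineage the DAG scheduler works from (including the shuffle introduced by reduceByKey) can be printed with toDebugString. A minimal sketch using the word_counts RDD from the example; in PySpark the method returns bytes, so it is decoded before printing:

    # Indentation in the output marks shuffle (stage) boundaries.
    print(word_counts.toDebugString().decode("utf-8"))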

    5. Broadcast:

    Broadcast variables are used to efficiently distribute large read-only data to all worker nodes.

    Log Entries:
    INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.5 KB, free 365.2 MB)
    INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.1:49919 (size: 2.1 KB, free: 366.3 MB)
    INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
    
    • Explanation: Broadcasts the variable to all executors. In this example, broadcasting might occur if there are configuration settings or large data structures needed by all tasks.
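
    Explicit broadcast variables use the same mechanism that appears in these log lines. A minimal sketch, assuming a hypothetical stop-word set that every task needs:

    # Ship a read-only lookup structure to every executor once,
    # instead of serializing it with every task.
    stop_words = sc.broadcast({"the", "a", "an", "and"})

    filtered_words = words.filter(lambda w: w.lower() not in stop_words.value)
    print(filtered_words.take(5))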

    6. Stage Creation:

    Stages are created for different transformations. Each stage contains tasks that are executed in parallel.

    Log Entries:
    INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at textFile at WordCount.py:12)
    INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[2] at flatMap at WordCount.py:14)
    INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[3] at map at WordCount.py:15)
    INFO DAGScheduler: Submitting ResultStage 3 (ShuffledRDD[4] at reduceByKey at WordCount.py:16)
    • Explanation: In these illustrative logs each transformation is shown being submitted as its own stage. In practice, narrow transformations such as flatMap and map are pipelined into a single stage, while reduceByKey introduces a shuffle boundary and therefore a new stage.

    7. Task Execution:

    Tasks within each stage are sent to executors for parallel execution.

    Log Entries:
    INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.1.1, executor driver, partition 0, PROCESS_LOCAL, 4735 bytes)
    INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.1:49919 (size: 2.1 KB, free: 366.3 MB)
    INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1380 bytes result sent to driver
    INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 213 ms on 192.168.1.1 (executor driver) (1/1)
    INFO DAGScheduler: ResultStage 0 (MapPartitionsRDD[1] at textFile at WordCount.py:12) finished in 0.254 s
    INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    • Explanation: Each task processes its partition of data. The executor runs the task and returns the result to the driver.
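
    The (1/1) in these entries shows a single task per stage because the input produced one partition. As a sketch (the partition counts are illustrative), parallelism can be increased by asking for more partitions, particularly for the shuffle stage:

    words = text_file.flatMap(lambda line: line.split(" "))
    word_pairs = words.map(lambda word: (word, 1))

    # numPartitions controls how many reduce tasks the shuffle stage gets.
    word_counts = word_pairs.reduceByKey(lambda a, b: a + b, numPartitions=4)
    print(word_counts.getNumPartitions())  # 4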

    8. Next Job Initiation:

    Once the current job is complete, the next job starts if there are further actions.

    Code:
    results = word_counts.collect()
    Log Entries:
    INFO SparkContext: Starting job: collect at WordCount.py:18
    INFO DAGScheduler: Got job 1 (collect at WordCount.py:18) with 2 output partitions
    INFO DAGScheduler: Final stage: ResultStage 4 (collect at WordCount.py:18)
    INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
    INFO DAGScheduler: Missing parents: List(ShuffleMapStage 3)
    INFO DAGScheduler: Submitting ShuffleMapStage 3 (MapPartitionsRDD[3] at map at WordCount.py:15) with 2 output partitions
    INFO DAGScheduler: Submitting ResultStage 4 (ShuffledRDD[4] at reduceByKey at WordCount.py:16) with 1 output partitions
    • Explanation: The next job is initiated, here the collect action, and Spark plans the stages and tasks accordingly.
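
    Each action submits its own job, which is why the logs show job 0 followed by job 1. A minimal sketch reusing word_counts; cache() is added only so the second action does not recompute the whole lineage:

    word_counts.cache()                   # keep results in memory across actions

    distinct_words = word_counts.count()  # action -> one job
    results = word_counts.collect()       # action -> another job

    print(distinct_words, len(results))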

  • Yes, DataFrames in PySpark are lazily evaluated, similar to RDDs. Lazy evaluation is a key feature of Spark’s processing model, which helps optimize the execution of transformations and actions on large datasets.

    What is Lazy Evaluation?

    Lazy evaluation means that Spark does not immediately execute the transformations you apply to a DataFrame. Instead, it builds a logical plan of the transformations, which is only executed when an action is called. This allows Spark to optimize the entire data processing workflow before executing it.
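
    One way to observe this is to print the plan before triggering anything. A minimal sketch, assuming a CSV file with an age column at an illustrative path; explain() shows the plan Spark has recorded for the filter without executing it:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("Lazy Plan Demo").getOrCreate()

    df = spark.read.csv("hdfs:///path/to/data.csv", header=True, inferSchema=True)

    # The filter has not run yet; Spark has only recorded the transformation.
    adults = df.filter(df["age"] > 21)

    # explain(True) prints the parsed, analyzed, optimized, and physical plans.
    adults.explain(True)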

    How Lazy Evaluation Works in Spark DataFrames

    1. Transformations:
      • Operations like select, filter, groupBy, and join are considered transformations.
      • These transformations are not executed immediately. Spark keeps track of the operations and builds a logical execution plan.
    2. Actions:
      • Operations like show, collect, count, write, and take are actions.
      • When an action is called, Spark’s Catalyst optimizer converts the logical plan into a physical execution plan, applying various optimizations.
    3. Optimization:
      • The logical plan is optimized by the Catalyst optimizer. This includes:
        • Predicate pushdown: Moving filters closer to the data source.
        • Column pruning: Selecting only the necessary columns.
        • Join optimization: Reordering joins for efficiency.
        • Aggregation pushdown: Pushing aggregations closer to the data source.
    4. Execution:
      • Once the optimized plan is ready, the DAG scheduler breaks it into stages and tasks.
      • Tasks are then distributed across the cluster and executed by the worker nodes.
      • Results are collected and returned to the driver program.

    Example to Illustrate Lazy Evaluation

    Here is an example to demonstrate lazy evaluation with DataFrames in Spark:

    from pyspark.sql import SparkSession

    # Initialize Spark session
    spark = SparkSession.builder.appName("Lazy Evaluation Example").getOrCreate()

    # Load data from a CSV file into a DataFrame
    df = spark.read.csv("hdfs:///path/to/data.csv", header=True, inferSchema=True)

    # Apply transformations (these are lazily evaluated)
    filtered_df = df.filter(df['age'] > 21)
    selected_df = filtered_df.select('name', 'age')
    grouped_df = selected_df.groupBy('age').count()

    # The above transformations are not executed yet.

    # Trigger an action
    result = grouped_df.collect()

    # The above action triggers the execution of all the previous transformations.

    # Stop Spark session
    spark.stop()

    In this example:

    1. Transformations: filter, select, and groupBy are transformations. Spark records these operations but does not execute them immediately.
    2. Action: collect is an action that triggers the execution of the recorded transformations.
    3. Execution: When collect is called, Spark’s optimizer generates an optimized physical plan, and the execution plan is executed.

    Benefits of Lazy Evaluation

    1. Optimization: Allows Spark to optimize the entire workflow, resulting in more efficient execution.
    2. Fault Tolerance: Facilitates recomputation in case of failures, as the logical plan is preserved.
    3. Efficiency: Reduces unnecessary data movement and computation by applying optimizations like predicate pushdown and column pruning.

    Summary

    • Lazy Evaluation: Spark DataFrames are lazily evaluated, meaning transformations are not executed until an action is called.
    • Transformations vs. Actions: Transformations build a logical plan, while actions trigger the execution of that plan.
    • Optimization: Spark’s Catalyst optimizer optimizes the logical plan before execution, leading to efficient data processing.

    This lazy evaluation mechanism is a powerful feature of Spark, enabling it to handle large-scale data processing tasks efficiently.

  • Big Data Lake: Data Storage

    HDFS is a scalable storage solution designed to handle massive datasets across clusters of machines. Hive tables provide a structured approach for querying and analyzing data stored in HDFS. Understanding how these components work together is essential for effectively managing data in your BDL ecosystem.

    HDFS – Hadoop Distributed File System

    • Scalable storage for large datasets
    • Distributes data across clusters of machines
    • Offers fault tolerance: data loss is minimized if a node fails
    • Stores data in blocks (typically 128 MB)
    • Manages data through NameNode (metadata) and DataNodes (storage)

    HDFS is the foundation for storing large datasets within a Big Data Lake. It leverages a distributed architecture, where data is split into blocks and stored across multiple machines (nodes) in a cluster. This approach ensures scalability and fault tolerance. Even if a node fails, the data can be reconstructed from replicas stored on other nodes. HDFS manages data through two key components:

    • NameNode: Stores the metadata (location information) of all data blocks in the cluster.
    • DataNode: Stores the actual data blocks on the machines in the cluster.
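
    From PySpark, HDFS data is addressed by path, and the NameNode resolves which DataNodes hold each block. A minimal sketch with a hypothetical NameNode address and file path:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("HDFS Read Demo").getOrCreate()

    # The host, port, and path below are placeholders.
    df = spark.read.text("hdfs://namenode-host:8020/data/raw/events.txt")

    # Each HDFS block typically becomes at least one input partition.
    print(df.rdd.getNumPartitions())
    df.show(5, truncate=False)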

    HDFS – Benefits

    • Scalability: Easily scales to accommodate growing data volumes by adding more nodes to the cluster.
    • Fault Tolerance: Minimizes data loss by replicating data blocks across multiple nodes.
    • Cost-effective: Leverages commodity hardware, making it a cost-efficient storage solution for big data.
    • Highly Available: Ensures continuous data access even if a node fails.

    HDFS offers several advantages for storing data in a Big Data Lake:

    • Scalability: As your data volume grows, you can easily add more nodes to the cluster, allowing HDFS to scale seamlessly.
    • Fault Tolerance: Data replication ensures that even if a node fails, the data remains accessible and recoverable.
    • Cost-effective: HDFS utilizes commodity hardware, making it a cost-efficient solution for storing large datasets compared to traditional storage options.
    • High Availability: By replicating data, HDFS ensures that data is continuously available for access and analysis, even during node failures.

    Hive Tables

    Hive tables provide a structured schema and an SQL-like interface for accessing data stored in HDFS. They don’t store the data themselves; they act as a metadata layer pointing to the actual data location in HDFS.

    • Imposes structure on data stored in HDFS
    • Provides a SQL-like interface for querying data
    • Enables efficient data analysis using familiar syntax
    • Supports various data formats (text, ORC, Parquet)
    • Abstracts the underlying storage details of HDFS from users

    While HDFS offers a scalable storage solution, the data itself remains in a raw, unstructured format. Hive tables address this by providing a layer of structure on top of data stored in HDFS. Hive tables resemble traditional database tables with rows and columns, allowing users to query the data using a familiar SQL-like syntax. This simplifies data analysis and exploration within the Big Data Lake. Additionally, Hive tables support various data formats like text, ORC, and Parquet, offering flexibility and optimization benefits. Importantly, Hive abstracts the complexities of HDFS from users, allowing them to focus on querying and analyzing data without needing to manage the underlying storage details.
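
    From PySpark, a Hive-enabled SparkSession lets you query these tables with SQL while the rows themselves stay in HDFS. A minimal sketch with a hypothetical database and table name, assuming the deployment has a Hive metastore configured:

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("Hive Table Demo")
        .enableHiveSupport()   # read table metadata from the Hive metastore
        .getOrCreate()
    )

    # The table is only metadata; the underlying files live in HDFS.
    totals = spark.sql(
        "SELECT region, SUM(amount) AS total FROM sales_db.sales GROUP BY region"
    )
    totals.show()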

    HDFS vs Hive Tables

    • HDFS:
      • Stores raw, unstructured data
      • Scalable and fault-tolerant storage
      • Manages data through NameNode and DataNodes
    • Hive Tables:
      • Imposes structure on data in HDFS
      • Provides SQL-like interface for querying data
