This post is a collection of handy tricks and snippets.


Passing Parameters When Automating Scripts with Python

Python provides several ways to pass parameters when automating scripts, mimicking SAS macro variables, macro modules, and macro scripting. Here are some common methods:

1. Command-Line Arguments

Use the argparse library to pass command-line arguments to your Python script.

Python

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input_file', help='Input file path')
parser.add_argument('--output_file', help='Output file path')

args = parser.parse_args()

# Use the arguments
input_file = args.input_file
output_file = args.output_file

2. Environment Variables

Set environment variables using the os library and access them in your script.

Python

import os

input_file = os.environ.get('INPUT_FILE')
output_file = os.environ.get('OUTPUT_FILE')

3. Configuration Files

Use configuration files like JSON, YAML, or INI files to store parameters.

Python

import json

with open('config.json') as f:
    config = json.load(f)

input_file = config['input_file']
output_file = config['output_file']

4. Python Variables

Define Python variables at the top of your script or in a separate module.

Python

# config.py
INPUT_FILE = 'input.csv'
OUTPUT_FILE = 'output.csv'
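
The values can then be imported wherever they are needed (assuming the file above is saved as config.py):

# main.py
from config import INPUT_FILE, OUTPUT_FILE

print(INPUT_FILE, OUTPUT_FILE)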

5. Lit Function (Apache Spark)

In Apache Spark, use the lit function to create a literal column with a specified value.

Python

from pyspark.sql.functions import lit

df = spark.read.csv('input.csv')
df = df.withColumn('output_file', lit('output.csv'))

6. Spark Config

Use Spark configuration properties to pass parameters.

Python

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('My App').config('spark.input.file', 'input.csv').getOrCreate()
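
The value can be read back anywhere the SparkSession is available:

input_file = spark.conf.get('spark.input.file')
df = spark.read.csv(input_file, header=True)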

Mimicking SAS Macro Variables

To mimic SAS macro variables, you can use a combination of the above methods. For example:

  • Define a dictionary whose keys are macro variable names and whose values are the parameter values.
  • Use the format method to substitute macro variable values into strings.

Python

macro_vars = {
    'INPUT_FILE': 'input.csv',
    'OUTPUT_FILE': 'output.csv'
}

input_file = macro_vars['INPUT_FILE']
output_file = macro_vars['OUTPUT_FILE']

query = "SELECT * FROM {INPUT_FILE}".format(**macro_vars)

Mimicking SAS Macro Modules

To mimic SAS macro modules, you can create separate Python modules with functions that perform specific tasks. Import these modules into your main script and call the functions as needed.

Python

# module1.py
def read_input_file(input_file):
    # code to read input file
    pass

# module2.py
def process_data(data):
    # code to process data
    pass

# main.py
from module1 import read_input_file
from module2 import process_data

input_file = 'input.csv'
data = read_input_file(input_file)
processed_data = process_data(data)

Mimicking SAS Macro Scripting

To mimic SAS macro scripting, you can use a combination of the above methods. For example:

  • Define a dictionary whose keys are macro variable names and whose values are the parameter values.
  • Use the format method to substitute macro variable values into strings.
  • Create separate Python modules with functions that perform specific tasks.
  • Import these modules into your main script and call the functions as needed.

Python

# config.py
macro_vars = {
    'INPUT_FILE': 'input.csv',
    'OUTPUT_FILE': 'output.csv'
}

# module1.py
def read_input_file(input_file):
    # code to read input file
    pass

# module2.py
def process_data(data):
    # code to process data
    pass

# main.py
from config import macro_vars
from module1 import read_input_file
from module2 import process_data

input_file = macro_vars['INPUT_FILE']
data = read_input_file(input_file)
processed_data = process_data(data)

Mimicking SAS Macro Variables in Python and PySpark

SAS macro variables can be created from tables, rows, or columns, and can also be used to dynamically generate dates. Here’s how you can mimic these functionalities in Python and PySpark:

1. Creating Macro Variables from Tables

SAS:

proc sql;
  select distinct dept into :dept1-:dept100
  from employees;
quit;

Python:

import pandas as pd

# assuming 'employees' is a pandas DataFrame
depts = employees['dept'].unique()
dept_vars = [f'dept{i+1}' for i in range(len(depts))]
for var, dept in zip(dept_vars, depts):
    globals()[var] = dept

PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
employees = spark.read.csv('employees.csv', header=True, inferSchema=True)

depts = employees.select('dept').distinct().collect()
dept_vars = [f'dept{i+1}' for i in range(len(depts))]
for var, dept in zip(dept_vars, depts):
    globals()[var] = dept['dept']

2. Creating Macro Variables from Rows

SAS:

proc sql outobs=2;
  select name, age into :name1-:name2, :age1-:age2
  from employees;
quit;

Python:

import pandas as pd

# assuming 'employees' is a pandas DataFrame with 'name' and 'age' columns
rows = employees[['name', 'age']].head(2).values.tolist()
name_vars = [f'name{i+1}' for i in range(len(rows))]
age_vars = [f'age{i+1}' for i in range(len(rows))]
for var, row in zip(name_vars, rows):
    globals()[var] = row[0]
for var, row in zip(age_vars, rows):
    globals()[var] = row[1]

PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
employees = spark.read.csv('employees.csv', header=True, inferSchema=True)

rows = employees.limit(2).collect()
name_vars = [f'name{i+1}' for i in range(len(rows))]
age_vars = [f'age{i+1}' for i in range(len(rows))]
for var, row in zip(name_vars, rows):
    globals()[var] = row.name
for var, row in zip(age_vars, rows):
    globals()[var] = row.age

3. Creating Macro Variables from Columns

SAS:

proc sql;
  select distinct column1 into :col1-:col100
  from table1;
quit;

Python:

import pandas as pd

# assuming 'table1' is a pandas DataFrame
cols = table1['column1'].unique()
col_vars = [f'col{i+1}' for i in range(len(cols))]
for var, col in zip(col_vars, cols):
    globals()[var] = col

PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
table1 = spark.read.csv('table1.csv', header=True, inferSchema=True)

cols = table1.select('column1').distinct().collect()
col_vars = [f'col{i+1}' for i in range(len(cols))]
for var, col in zip(col_vars, cols):
    globals()[var] = col['column1']

4. Dynamic Date Values

SAS:

%let today = %sysfunc(date());
%let yesterday = %sysfunc(intnx(day, &today, -1));

Python:

from datetime import date, timedelta

today = date.today()
yesterday = today - timedelta(days=1)

PySpark:

from pyspark.sql.functions import current_date, date_sub

today = current_date()
yesterday = date_sub(today, 1)
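
Note that current_date() and date_sub() return Column expressions rather than Python values, so they belong inside DataFrame operations; a minimal usage sketch (the events DataFrame and event_date column are assumptions):

from pyspark.sql.functions import col, current_date, date_sub

# Keep only yesterday's records
yesterday_df = events.filter(col('event_date') == date_sub(current_date(), 1))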

collect() and foreach()

collect() and foreach() are two commonly used methods in Apache Spark, particularly in PySpark. Here’s how they can be used in automation:

collect()

The collect() method retrieves all the elements of a DataFrame or an RDD (Resilient Distributed Dataset) as a list on the driver. It is useful when you need to operate on a result set in plain Python; avoid it on large datasets, since everything is pulled into driver memory.

Example:

Python

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = spark.read.csv('data.csv', header=True, inferSchema=True)

# Collect the data as a list
data_list = data.collect()

# Perform some operation on the list
for row in data_list:
    print(row)

In automation, collect() can be used to:

  • Retrieve data from a DataFrame or RDD and perform some operation on it.
  • Validate data by checking for specific values or patterns (see the sketch after this list).
  • Transform data by applying some function to each element.
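
For example, a quick validation pass over collected rows (the amount column is only an assumption for illustration):

# Flag bad records after pulling a small result set to the driver
bad_rows = [row for row in data.collect() if row['amount'] < 0]
if bad_rows:
    print(f"Found {len(bad_rows)} rows with negative amounts")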

foreach()

The foreach() method is used to apply a function to each element of a DataFrame or an RDD. This method is useful when you need to perform some operation on each element of the dataset.

Example:

Python

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = spark.read.csv('data.csv', header=True, inferSchema=True)

# Define a function to apply to each element
def print_row(row):
    print(row)

# Apply the function to each element
data.foreach(print_row)
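
Keep in mind that the function passed to foreach() runs on the executors, so the print output above appears in the executor logs rather than the driver console. For side effects such as pushing rows to an external system, foreachPartition() is often a better fit, since it lets you set up one connection per partition; a hedged sketch:

def push_partition(rows):
    # e.g., open one connection per partition here (hypothetical sink)
    for row in rows:
        pass  # send each row to the external system

data.foreachPartition(push_partition)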

In automation, foreach() can be used to:

  • Perform some operation on each element of a dataset.
  • Validate data by checking for specific values or patterns.
  • Transform data by applying some function to each element.

Use cases in automation

Here are some use cases for collect() and foreach() in automation:

  • Data validation: Use collect() to retrieve data from a DataFrame or RDD and then validate it by checking for specific values or patterns.
  • Data transformation: Use foreach() to apply a function to each element of a DataFrame or RDD and transform the data.
  • Reporting: Use collect() to retrieve data from a DataFrame or RDD and then generate reports based on the data.
  • Alerting: Use foreach() to apply a function to each element of a DataFrame or RDD and send alerts based on specific conditions.

In summary, collect() and foreach() are powerful methods in PySpark that can be used in automation to retrieve data, perform operations on each element, and transform data.

Common operations, optimizations, and scenarios encountered in data engineering and data analysis workflows

1. Python Automation Snippets

Reading Configurations

Use a JSON or YAML file to manage configurations dynamically:

import json

# Load configuration
with open("config.json", "r") as file:
    config = json.load(file)

# Access configurations
input_path = config["input_path"]
output_path = config["output_path"]

Dynamic Function Execution

Automatically execute functions based on a mapping dictionary:

def task_a():
    print("Executing Task A")

def task_b():
    print("Executing Task B")

tasks = {"task_a": task_a, "task_b": task_b}

# Dynamically execute based on input
task_name = "task_a"
tasks[task_name]()
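
A small guard avoids a raw KeyError when the task name comes from user input (a minimal sketch reusing the tasks mapping above):

task = tasks.get(task_name)
if task is None:
    raise ValueError(f"Unknown task: {task_name}")
task()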

Parallel Processing

Use concurrent.futures for simple parallel processing:

from concurrent.futures import ThreadPoolExecutor

def process_data(partition):
    # Your processing logic
    return partition * 2

data = [1, 2, 3, 4, 5]
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(process_data, data))

print(results)
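
ThreadPoolExecutor suits I/O-bound work; for CPU-bound functions, a ProcessPoolExecutor sidesteps the GIL (the mapped function and its arguments must be picklable):

from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(process_data, data))
    print(results)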

File Monitoring

Automatically detect new files in a directory and process them:

import os
import time

directory = "/path/to/watch"

def process_file(file):
    print(f"Processing file: {file}")

processed_files = set()

while True:
    files = set(os.listdir(directory))
    new_files = files - processed_files
    for file in new_files:
        process_file(file)
        processed_files.add(file)
    time.sleep(10)

2. PySpark Automation Snippets

Dynamic Column Selection

Generate a dynamic list of columns based on conditions:

from pyspark.sql import functions as F

columns = [col for col in df.columns if col.startswith("metric")]
df.select(columns).show()

Dynamic SQL Query

Build and execute dynamic SQL queries:

query_template = """
    SELECT {columns}
    FROM {table}
    WHERE {conditions}
"""
columns = "id, name"
table = "users"
conditions = "age > 30"

query = query_template.format(columns=columns, table=table, conditions=conditions)
result_df = spark.sql(query)
result_df.show()
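
If users is a DataFrame rather than a table already registered in the catalog, register it as a temporary view first (users_df is a hypothetical DataFrame):

users_df.createOrReplaceTempView("users")
result_df = spark.sql(query)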

Automated Partition Handling

Write data to Hive with dynamic partitioning:

df.write \
  .partitionBy("year", "month") \
  .mode("overwrite") \
  .saveAsTable("database.table")

Broadcast Variables

Broadcast a small dataset to optimize joins:

from pyspark.sql.functions import broadcast

small_df = spark.table("small_table")
large_df = spark.table("large_table")

result = large_df.join(broadcast(small_df), "key")
result.show()

3. General Automation Patterns

Error Handling and Logging

Centralized error logging:

import logging

logging.basicConfig(level=logging.INFO, filename="app.log", filemode="a",
                    format="%(asctime)s - %(levelname)s - %(message)s")

try:
    result = 10 / 0
except ZeroDivisionError as e:
    logging.error(f"Error occurred: {e}")

Metadata-Driven Automation

Control job execution through metadata tables:

metadata = spark.sql("SELECT * FROM job_metadata WHERE status = 'active'")

for row in metadata.collect():
    query = row["query"]
    table_name = row["output_table"]
    df = spark.sql(query)
    df.write.mode("overwrite").saveAsTable(table_name)

Dynamic DataFrame Transformations

Apply transformations dynamically based on conditions:

transformations = {
    "uppercase": lambda col: F.upper(F.col(col)),
    "add_prefix": lambda col: F.concat(F.lit("prefix_"), F.col(col)),
}

transformation = "uppercase"
column = "name"

df = df.withColumn(column, transformations[transformation](column))
df.show()

4. Optimizations

Avoid Data Skew

Add salting for skewed joins:

# Salt the larger/skewed DataFrame; the other side must carry matching salt
# values (see the fuller sketch below) or the join keys will no longer line up.
df1 = df1.withColumn("salt", (F.rand() * 10).cast("int"))
joined_df = df1.join(df2, (df1.key == df2.key) & (df1.salt == df2.salt))
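
A fuller sketch of the salting pattern, assuming df1 is the skewed side, df2 is the smaller side, and a salt range of 0-9 (all names are illustrative):

from pyspark.sql import functions as F

# Random salt on the skewed side spreads hot keys across more partitions
df1 = df1.withColumn("salt", (F.rand() * 10).cast("int"))

# Replicate the smaller side once per salt value so every key still matches
salts = spark.range(10).withColumnRenamed("id", "salt")
df2_salted = df2.crossJoin(salts)

joined_df = df1.join(
    df2_salted,
    (df1.key == df2_salted.key) & (df1.salt == df2_salted.salt)
)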

Reduce Shuffling

Use reduceByKey instead of groupByKey:

rdd = sc.parallelize([(1, 2), (1, 3), (2, 4)])
result = rdd.reduceByKey(lambda x, y: x + y).collect()
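
For comparison, the groupByKey version shuffles every individual value before summing, whereas reduceByKey combines values within each partition first:

# Same result, heavier shuffle
result_gbk = rdd.groupByKey().mapValues(lambda values: sum(values)).collect()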

5. Automation for ETL

Dynamic ETL Framework

Use a configuration table to control ETL:

metadata = spark.sql("SELECT * FROM etl_metadata WHERE status = 'active'")
for row in metadata.collect():
    source = row["source_table"]
    target = row["target_table"]
    query = row["query"]
    df = spark.sql(query)
    df.write.mode("overwrite").saveAsTable(target)

Automated Logging

Log job statuses automatically:

from datetime import datetime

def log_status(job_name, status, message=None):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] Job: {job_name} | Status: {status} | Message: {message}")

log_status("ETL_Job", "SUCCESS")

6. Schedule tasks with the schedule library:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)  # run job every 10 minutes

while True:
    schedule.run_pending()
    time.sleep(1)

7. Use argparse for command-line arguments:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--input_file", help="Input file path")
args = parser.parse_args()

print(args.input_file)

8. Automate file operations with shutil:

import shutil

shutil.copyfile("source.txt", "destination.txt")
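
A couple of related shutil helpers (the paths are placeholders):

shutil.copytree("source_dir", "backup_dir")          # copy an entire directory tree
shutil.move("destination.txt", "archive/dest.txt")   # move or rename a file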

9. Dynamic File Processing with Metadata

Use metadata to control file processing logic dynamically:

file_metadata = [
    {"file_path": "/data/sales_2023.csv", "delimiter": ",", "table_name": "sales_data"},
    {"file_path": "/data/users_2023.txt", "delimiter": "t", "table_name": "user_data"}
]

for meta in file_metadata:
    file_path = meta["file_path"]
    delimiter = meta["delimiter"]
    table_name = meta["table_name"]
    
    df = spark.read.option("delimiter", delimiter).csv(file_path, header=True)
    df.write.mode("overwrite").saveAsTable(table_name)

10. Dynamic Transformation Using Configurations

Apply transformations based on metadata-driven configurations:

transformation_rules = [
    {"column": "price", "operation": "multiply", "value": 1.1},
    {"column": "quantity", "operation": "add", "value": 5}
]

for rule in transformation_rules:
    column = rule["column"]
    operation = rule["operation"]
    value = rule["value"]

    if operation == "multiply":
        df = df.withColumn(column, F.col(column) * value)
    elif operation == "add":
        df = df.withColumn(column, F.col(column) + value)

11. Conditional Data Validation and Logging

Validate data dynamically and log issues:

validation_rules = [
    {"column": "age", "condition": "age > 0", "message": "Invalid age"},
    {"column": "salary", "condition": "salary > 0", "message": "Invalid salary"}
]

for rule in validation_rules:
    column = rule["column"]
    condition = rule["condition"]
    message = rule["message"]

    invalid_rows = df.filter(f"NOT ({condition})")
    invalid_count = invalid_rows.count()
    if invalid_count > 0:
        print(f"{message}: {invalid_count} invalid rows in column '{column}'")
        invalid_rows.write.mode("append").csv(f"/logs/{column}_errors.csv")

12. Metadata-Driven Job Scheduling

Control job execution using a metadata table:

jobs_metadata = spark.sql("SELECT * FROM job_schedule WHERE status = 'active'")

for job in jobs_metadata.collect():
    job_name = job["job_name"]
    script_path = job["script_path"]

    print(f"Running Job: {job_name}")
    exec(open(script_path).read())

13. Exception Handling with Retry Logic

Implement retry logic for error handling in transformations:

import time

MAX_RETRIES = 3
RETRY_DELAY = 10  # seconds

def run_with_retry(func, *args, **kwargs):
    for attempt in range(MAX_RETRIES):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if attempt < MAX_RETRIES - 1:
                print(f"Retrying after error: {e}")
                time.sleep(RETRY_DELAY)
            else:
                print(f"Failed after {MAX_RETRIES} retries: {e}")
                raise

# Example usage
run_with_retry(spark.sql, "SELECT * FROM table")

14. Efficient Grouped Operations

Process each group with custom logic (mapGroups is part of the Scala/Java Dataset API; a rough PySpark equivalent keys the rows with the RDD API, groups them, and applies a function per group):

from pyspark.sql import Row

def custom_group_processing(key, rows):
    # Sum the 'value' field per group (the 'group' and 'value' columns are illustrative)
    total = sum(row.value for row in rows)
    return [Row(group=key, total=total)]

(df.rdd
   .map(lambda x: (x.group, x))                       # key each row by its group
   .groupByKey()                                      # -> (key, iterable of rows)
   .flatMap(lambda kv: custom_group_processing(*kv))
   .toDF()
   .show())

15. Dynamic Partitioning and Bucketing

Dynamically partition and bucket tables based on metadata:

partition_metadata = {
    "sales_data": {"partition_by": "region", "buckets": 10},
    "user_data": {"partition_by": "country", "buckets": 5}
}

for table, meta in partition_metadata.items():
    partition_by = meta["partition_by"]
    buckets = meta["buckets"]
    df = spark.table(table)
    df.write.partitionBy(partition_by).bucketBy(buckets, "id").saveAsTable(f"{table}_optimized")

16. Dynamic Broadcast Join Based on Metadata

Use metadata to decide whether to broadcast a table:

join_metadata = {
    "broadcast": True,
    "small_table": "reference_data",
    "large_table": "transaction_data",
    "join_key": "id"
}

small_table = spark.table(join_metadata["small_table"])
large_table = spark.table(join_metadata["large_table"])

if join_metadata["broadcast"]:
    result = large_table.join(F.broadcast(small_table), join_metadata["join_key"])
else:
    result = large_table.join(small_table, join_metadata["join_key"])

result.show()

17. Caching Strategy Automation

Dynamically cache tables based on usage frequency:

cache_metadata = {
    "frequent_tables": ["table1", "table2"],
    "occasional_tables": ["table3"]
}

for table in cache_metadata["frequent_tables"]:
    spark.table(table).cache().count()  # Trigger caching
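
When memory is tight, the same metadata can also drive releasing caches once a job finishes (a minimal sketch; unpersist() is a no-op if the table was never cached):

for table in cache_metadata["frequent_tables"]:
    spark.table(table).unpersist()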

18. Schema Validation and Enforcement

Validate schema dynamically:

expected_schema = ["id", "name", "age"]
actual_schema = df.columns

missing_columns = set(expected_schema) - set(actual_schema)
if missing_columns:
    print(f"Missing columns: {missing_columns}")
else:
    print("Schema validation passed")

19. Dynamic Aggregations

Apply aggregations dynamically based on metadata:

aggregation_rules = [
    {"column": "sales", "agg_func": "sum"},
    {"column": "quantity", "agg_func": "avg"}
]

agg_exprs = [getattr(F, rule["agg_func"])(rule["column"]).alias(rule["column"]) for rule in aggregation_rules]
df.groupBy("region").agg(*agg_exprs).show()

20. Send emails with smtplib:

import smtplib
from email.mime.text import MIMEText

msg = MIMEText("Hello, world!")
msg['Subject'] = "Test Email"
msg['From'] = "your_email@gmail.com"
msg['To'] = "recipient_email@gmail.com"

server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls()
server.login("your_email@gmail.com", "your_password")
server.sendmail("your_email@gmail.com", "recipient_email@gmail.com", msg.as_string())
server.quit()
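
Hard-coding credentials is best avoided; pulling the password from an environment variable keeps it out of the script (SMTP_PASSWORD is a placeholder name):

import os

password = os.environ.get("SMTP_PASSWORD")
server.login("your_email@gmail.com", password)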
