This post is a collection of handy tricks and snippets.
Passing Parameters When Automating Scripts with Python
Python provides several ways to pass parameters when automating scripts, mimicking SAS macro variables, macro modules, and macro scripting. Here are some common methods:
1. Command-Line Arguments
Use the argparse library to pass command-line arguments to your Python script.
Python
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--input_file', help='Input file path')
parser.add_argument('--output_file', help='Output file path')
args = parser.parse_args()
# Use the arguments
input_file = args.input_file
output_file = args.output_file
2. Environment Variables
Set environment variables and read them with the os library in your script.
Python
import os
input_file = os.environ.get('INPUT_FILE')
output_file = os.environ.get('OUTPUT_FILE')
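In an automation setting, the calling process usually sets these variables before launching the script. A minimal sketch of such a wrapper, assuming a hypothetical etl_script.py that reads the variables as shown above:
Python
import os
import subprocess

# Pass parameters to a child script through its environment
env = dict(os.environ, INPUT_FILE='input.csv', OUTPUT_FILE='output.csv')
subprocess.run(['python', 'etl_script.py'], env=env, check=True)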
3. Configuration Files
Use configuration files like JSON, YAML, or INI files to store parameters.
Python
import json
with open('config.json') as f:
    config = json.load(f)

input_file = config['input_file']
output_file = config['output_file']
4. Python Variables
Define Python variables at the top of your script or in a separate module.
Python
# config.py
INPUT_FILE = 'input.csv'
OUTPUT_FILE = 'output.csv'
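The main script can then import these values, for example:
Python
# main.py
from config import INPUT_FILE, OUTPUT_FILE

print(f"Reading from {INPUT_FILE}, writing to {OUTPUT_FILE}")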
5. Lit Function (Apache Spark)
In Apache Spark, use the lit function to create a literal column with a specified value.
Python
from pyspark.sql.functions import lit
df = spark.read.csv('input.csv')
df = df.withColumn('output_file', lit('output.csv'))
6. Spark Config
Use Spark configuration properties to pass parameters.
Python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('My App').config('spark.input.file', 'input.csv').getOrCreate()
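The property set above can then be read back anywhere the SparkSession is available; a short sketch:
Python
# Retrieve the custom property set via .config(...) above
input_file = spark.conf.get('spark.input.file')
df = spark.read.csv(input_file, header=True)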
Mimicking SAS Macro Variables
To mimic SAS macro variables, you can use a combination of the above methods. For example:
- Define a dictionary that maps macro variable names to their values.
- Use the format method to substitute macro variable values into strings.
Python
macro_vars = {
    'INPUT_FILE': 'input.csv',
    'OUTPUT_FILE': 'output.csv'
}

input_file = macro_vars['INPUT_FILE']
output_file = macro_vars['OUTPUT_FILE']
query = "SELECT * FROM {INPUT_FILE}".format(**macro_vars)
Mimicking SAS Macro Modules
To mimic SAS macro modules, you can create separate Python modules with functions that perform specific tasks. Import these modules into your main script and call the functions as needed.
Python
# module1.py
def read_input_file(input_file):
    # code to read the input file
    pass

# module2.py
def process_data(data):
    # code to process the data
    pass

# main.py
from module1 import read_input_file
from module2 import process_data

input_file = 'input.csv'
data = read_input_file(input_file)
processed_data = process_data(data)
Mimicking SAS Macro Scripting
To mimic SAS macro scripting, you can combine the above methods. For example:
- Define a dictionary that maps macro variable names to their values.
- Use the format method to substitute macro variable values into strings.
- Create separate Python modules with functions that perform specific tasks.
- Import these modules into your main script and call the functions as needed.
Python
# config.py
macro_vars = {
    'INPUT_FILE': 'input.csv',
    'OUTPUT_FILE': 'output.csv'
}

# module1.py
def read_input_file(input_file):
    # code to read the input file
    pass

# module2.py
def process_data(data):
    # code to process the data
    pass
# main.py
from config import macro_vars
from module1 import read_input_file
from module2 import process_data
input_file = macro_vars['INPUT_FILE']
data = read_input_file(input_file)
processed_data = process_data(data)
Mimicking SAS Macro Variables in Python and PySpark
SAS macro variables can be created from tables, rows, or columns, and can also be used to dynamically generate dates. Here’s how you can mimic these functionalities in Python and PySpark:
1. Creating Macro Variables from Tables
SAS:
proc sql;
select distinct dept into :dept1-:dept100
from employees;
quit;
Python:
import pandas as pd
# assuming 'employees' is a pandas DataFrame
depts = employees['dept'].unique()
dept_vars = [f'dept{i+1}' for i in range(len(depts))]
for var, dept in zip(dept_vars, depts):
    globals()[var] = dept
PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
employees = spark.read.csv('employees.csv', header=True, inferSchema=True)
depts = employees.select('dept').distinct().collect()
dept_vars = [f'dept{i+1}' for i in range(len(depts))]
for var, dept in zip(dept_vars, depts):
    globals()[var] = dept['dept']  # PySpark Row values are accessed by column name
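Writing into globals() mirrors SAS most closely, but a plain dictionary is usually the cleaner Python idiom; a small sketch based on the collected depts above:
# Keep the generated "macro variables" in a single namespace instead of globals()
dept_macro_vars = {f'dept{i+1}': row['dept'] for i, row in enumerate(depts)}
print(dept_macro_vars.get('dept1'))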
2. Creating Macro Variables from Rows
SAS:
proc sql outobs=2;
select name, age into :name1-:name2, :age1-:age2
from employees;
quit;
Python:
import pandas as pd
# assuming 'employees' is a pandas DataFrame
rows = employees.head(2).values.tolist()
name_vars = [f'name{i+1}' for i in range(len(rows))]
age_vars = [f'age{i+1}' for i in range(len(rows))]
for var, row in zip(name_vars, rows):
    globals()[var] = row[0]
for var, row in zip(age_vars, rows):
    globals()[var] = row[1]
PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
employees = spark.read.csv('employees.csv', header=True, inferSchema=True)
rows = employees.limit(2).collect()
name_vars = [f'name{i+1}' for i in range(len(rows))]
age_vars = [f'age{i+1}' for i in range(len(rows))]
for var, row in zip(name_vars, rows):
    globals()[var] = row['name']
for var, row in zip(age_vars, rows):
    globals()[var] = row['age']
3. Creating Macro Variables from Columns
SAS:
proc sql;
select distinct column1 into :col1-:col100
from table1;
quit;
Python:
import pandas as pd
# assuming 'table1' is a pandas DataFrame
cols = table1['column1'].unique()
col_vars = [f'col{i+1}' for i in range(len(cols))]
for var, col in zip(col_vars, cols):
    globals()[var] = col
PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
table1 = spark.read.csv('table1.csv', header=True, inferSchema=True)
cols = table1.select('column1').distinct().collect()
col_vars = [f'col{i+1}' for i in range(len(cols))]
for var, col in zip(col_vars, cols):
    globals()[var] = col['column1']  # PySpark Row values are accessed by column name
4. Dynamic Date Values
SAS:
%let today = %sysfunc(date());
%let yesterday = %sysfunc(intnx(day, &today, -1));
Python:
from datetime import date, timedelta
today = date.today()
yesterday = today - timedelta(days=1)
PySpark:
from pyspark.sql.functions import current_date, date_sub
today = current_date()
yesterday = date_sub(today, 1)
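Note that current_date() and date_sub() return Column expressions rather than Python values, so they are used inside DataFrame operations; a small sketch (df is any existing DataFrame):
# Attach today's and yesterday's dates as columns
df = df.withColumn('today', current_date()) \
       .withColumn('yesterday', date_sub(current_date(), 1))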
collect() and foreach()
collect() and foreach() are two commonly used methods in Apache Spark, particularly in PySpark. Here’s how they can be used in automation:
collect()
The collect() method retrieves all the elements of a DataFrame or an RDD (Resilient Distributed Dataset) as a list on the driver. It is useful when you need to operate on the entire dataset locally, but use it with care on large datasets because everything is loaded into driver memory.
Example:
Python
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = spark.read.csv('data.csv', header=True, inferSchema=True)
# Collect the data as a list
data_list = data.collect()
# Perform some operation on the list
for row in data_list:
    print(row)
In automation, collect() can be used to:
- Retrieve data from a DataFrame or RDD and perform some operation on it.
- Validate data by checking for specific values or patterns.
- Transform data by applying some function to each element.
foreach()
The foreach() method applies a function to each element of a DataFrame or an RDD. It runs on the executors and returns nothing to the driver, which makes it suitable for per-row side effects such as writing to an external system.
Example:
Python
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = spark.read.csv('data.csv', header=True, inferSchema=True)
# Define a function to apply to each element
def print_row(row):
    print(row)
# Apply the function to each element
data.foreach(print_row)
In automation, foreach() can be used to:
- Perform a side-effect operation on each element of a dataset.
- Validate data by checking each row for specific values or patterns.
- Push rows to external systems (databases, APIs, alerts); for transformations that return data, use map() or withColumn() instead.
Use cases in automation
Here are some use cases for collect() and foreach() in automation (see the sketch after this list):
- Data validation: Use collect() to retrieve data from a DataFrame or RDD and then validate it by checking for specific values or patterns.
- Data transformation: Use collect() to pull data to the driver and transform it locally, or use map()/withColumn() when the result should stay distributed.
- Reporting: Use collect() to retrieve data from a DataFrame or RDD and then generate reports based on the data.
- Alerting: Use foreach() to apply a function to each element of a DataFrame or RDD and send alerts based on specific conditions.
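A minimal sketch of the validation and alerting cases, reusing the data DataFrame from the examples above (the age column and the alert logic are illustrative):
from pyspark.sql import functions as F

# Data validation: pull the offending rows to the driver with collect()
invalid = data.filter(F.col('age') < 0)
for row in invalid.collect():
    print(f"Invalid row: {row}")

# Alerting: run a side effect per row on the executors with foreach()
def send_alert(row):
    # Replace print with an email/webhook call in a real job
    print(f"ALERT: {row}")

invalid.foreach(send_alert)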
In summary, collect() and foreach() are powerful methods in PySpark that can be used in automation to retrieve data, perform operations on each element, and drive downstream actions.
Common operations, optimizations, and scenarios encountered in data engineering and data analysis workflows
1. Python Automation Snippets
Reading Configurations
Use a JSON or YAML file to manage configurations dynamically:
import json

# Load configuration
with open("config.json", "r") as file:
    config = json.load(file)

# Access configurations
input_path = config["input_path"]
output_path = config["output_path"]
Dynamic Function Execution
Automatically execute functions based on a mapping dictionary:
def task_a():
    print("Executing Task A")

def task_b():
    print("Executing Task B")

tasks = {"task_a": task_a, "task_b": task_b}

# Dynamically execute based on input
task_name = "task_a"
tasks[task_name]()
Parallel Processing
Use concurrent.futures for simple parallel processing:
from concurrent.futures import ThreadPoolExecutor

def process_data(partition):
    # Your processing logic
    return partition * 2

data = [1, 2, 3, 4, 5]
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(process_data, data))
print(results)
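ThreadPoolExecutor suits I/O-bound work; for CPU-bound tasks a ProcessPoolExecutor usually scales better because it sidesteps the GIL. A minimal sketch:
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n):
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        print(list(executor.map(cpu_heavy, [10_000, 20_000, 30_000])))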
File Monitoring
Automatically detect new files in a directory and process them:
import os
import time

directory = "/path/to/watch"

def process_file(file):
    print(f"Processing file: {file}")

processed_files = set()
while True:
    files = set(os.listdir(directory))
    new_files = files - processed_files
    for file in new_files:
        process_file(file)
        processed_files.add(file)
    time.sleep(10)
2. PySpark Automation Snippets
Dynamic Column Selection
Generate a dynamic list of columns based on conditions:
from pyspark.sql import functions as F

columns = [col for col in df.columns if col.startswith("metric")]
df.select(columns).show()
Dynamic SQL Query
Build and execute dynamic SQL queries:
query_template = """
SELECT {columns}
FROM {table}
WHERE {conditions}
"""
columns = "id, name"
table = "users"
conditions = "age > 30"
query = query_template.format(columns=columns, table=table, conditions=conditions)
result_df = spark.sql(query)
result_df.show()
Automated Partition Handling
Write data to Hive with dynamic partitioning:
df.write \
    .partitionBy("year", "month") \
    .mode("overwrite") \
    .saveAsTable("database.table")
Broadcast Variables
Broadcast a small dataset to optimize joins:
from pyspark.sql.functions import broadcast

small_df = spark.table("small_table")
large_df = spark.table("large_table")
result = large_df.join(broadcast(small_df), "key")
result.show()
3. General Automation Patterns
Error Handling and Logging
Centralized error logging:
import logging

logging.basicConfig(level=logging.INFO, filename="app.log", filemode="a",
                    format="%(asctime)s - %(levelname)s - %(message)s")

try:
    result = 10 / 0
except ZeroDivisionError as e:
    logging.error(f"Error occurred: {e}")
Metadata-Driven Automation
Control job execution through metadata tables:
metadata = spark.sql("SELECT * FROM job_metadata WHERE status = 'active'")
for row in metadata.collect():
    query = row["query"]
    table_name = row["output_table"]
    df = spark.sql(query)
    df.write.mode("overwrite").saveAsTable(table_name)
Dynamic DataFrame Transformations
Apply transformations dynamically based on conditions:
from pyspark.sql import functions as F

transformations = {
    "uppercase": lambda col: F.upper(F.col(col)),
    "add_prefix": lambda col: F.concat(F.lit("prefix_"), F.col(col)),
}

transformation = "uppercase"
column = "name"
df = df.withColumn(column, transformations[transformation](column))
df.show()
4. Optimizations
Avoid Data Skew
Add salting for skewed joins:
# Salt the skewed side randomly and replicate the other side across all salt values
df1 = df1.withColumn("salt", (F.rand() * 10).cast("int"))
df2 = df2.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(10)])))
joined_df = df1.join(df2, (df1.key == df2.key) & (df1.salt == df2.salt))
Reduce Shuffling
Use reduceByKey instead of groupByKey:
rdd = sc.parallelize([(1, 2), (1, 3), (2, 4)])
result = rdd.reduceByKey(lambda x, y: x + y).collect()
5. Automation for ETL
Dynamic ETL Framework
Use a configuration table to control ETL:
metadata = spark.sql("SELECT * FROM etl_metadata WHERE status = 'active'")
for row in metadata.collect():
    source = row["source_table"]
    target = row["target_table"]
    query = row["query"]
    df = spark.sql(query)
    df.write.mode("overwrite").saveAsTable(target)
Automated Logging
Log job statuses automatically:
from datetime import datetime

def log_status(job_name, status, message=None):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] Job: {job_name} | Status: {status} | Message: {message}")

log_status("ETL_Job", "SUCCESS")
6. Schedule tasks with the schedule library:
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)  # run job every 10 minutes

while True:
    schedule.run_pending()
    time.sleep(1)
7. Use argparse for command-line arguments:
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--input_file", help="Input file path")
args = parser.parse_args()
print(args.input_file)
8. Automate file operations with shutil:
import shutil
shutil.copyfile("source.txt", "destination.txt")
9. Dynamic File Processing with Metadata
Use metadata to control file processing logic dynamically:
file_metadata = [
    {"file_path": "/data/sales_2023.csv", "delimiter": ",", "table_name": "sales_data"},
    {"file_path": "/data/users_2023.txt", "delimiter": "\t", "table_name": "user_data"}
]

for meta in file_metadata:
    file_path = meta["file_path"]
    delimiter = meta["delimiter"]
    table_name = meta["table_name"]
    df = spark.read.option("delimiter", delimiter).csv(file_path, header=True)
    df.write.mode("overwrite").saveAsTable(table_name)
10. Dynamic Transformation Using Configurations
Apply transformations based on metadata-driven configurations:
from pyspark.sql import functions as F

transformation_rules = [
    {"column": "price", "operation": "multiply", "value": 1.1},
    {"column": "quantity", "operation": "add", "value": 5}
]

for rule in transformation_rules:
    column = rule["column"]
    operation = rule["operation"]
    value = rule["value"]
    if operation == "multiply":
        df = df.withColumn(column, F.col(column) * value)
    elif operation == "add":
        df = df.withColumn(column, F.col(column) + value)
11. Conditional Data Validation and Logging
Validate data dynamically and log issues:
validation_rules = [
    {"column": "age", "condition": "age > 0", "message": "Invalid age"},
    {"column": "salary", "condition": "salary > 0", "message": "Invalid salary"}
]

for rule in validation_rules:
    column = rule["column"]
    condition = rule["condition"]
    message = rule["message"]
    invalid_rows = df.filter(f"NOT ({condition})")
    invalid_rows.write.mode("append").csv(f"/logs/{column}_errors.csv")
12. Metadata-Driven Job Scheduling
Control job execution using a metadata table:
jobs_metadata = spark.sql("SELECT * FROM job_schedule WHERE status = 'active'")
for job in jobs_metadata.collect():
    job_name = job["job_name"]
    script_path = job["script_path"]
    print(f"Running Job: {job_name}")
    exec(open(script_path).read())
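exec() runs every script inside the current driver process; if the scripts are standalone Python jobs, launching them as separate processes is a common, more isolated alternative. A sketch under that assumption:
import subprocess

for job in jobs_metadata.collect():
    # Each job script runs in its own interpreter; a failure doesn't kill the driver loop
    result = subprocess.run(['python', job['script_path']], capture_output=True, text=True)
    print(f"{job['job_name']} exited with code {result.returncode}")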
13. Exception Handling with Retry Logic
Implement retry logic for error handling in transformations:
import time

MAX_RETRIES = 3
RETRY_DELAY = 10  # seconds

def run_with_retry(func, *args, **kwargs):
    for attempt in range(MAX_RETRIES):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if attempt < MAX_RETRIES - 1:
                print(f"Retrying after error: {e}")
                time.sleep(RETRY_DELAY)
            else:
                print(f"Failed after {MAX_RETRIES} retries: {e}")
                raise

# Example usage
run_with_retry(spark.sql, "SELECT * FROM table")
14. Efficient Grouped Operations
Optimize custom grouped aggregations by dropping to the RDD API and using groupByKey with a custom aggregation function:
from pyspark.sql import Row

def custom_group_processing(key, rows):
    # Assumes the DataFrame has 'group' and 'value' columns
    total = sum(row.value for row in rows)
    return [Row(group=key, total=total)]

(df.rdd
    .map(lambda x: (x.group, x))
    .groupByKey()
    .flatMap(lambda kv: custom_group_processing(*kv))
    .toDF()
    .show())
15. Dynamic Partitioning and Bucketing
Dynamically partition and bucket tables based on metadata:
partition_metadata = {
    "sales_data": {"partition_by": "region", "buckets": 10},
    "user_data": {"partition_by": "country", "buckets": 5}
}

for table, meta in partition_metadata.items():
    partition_by = meta["partition_by"]
    buckets = meta["buckets"]
    df = spark.table(table)
    df.write.partitionBy(partition_by).bucketBy(buckets, "id").saveAsTable(f"{table}_optimized")
16. Dynamic Broadcast Join Based on Metadata
Use metadata to decide whether to broadcast a table:
from pyspark.sql import functions as F

join_metadata = {
    "broadcast": True,
    "small_table": "reference_data",
    "large_table": "transaction_data",
    "join_key": "id"
}

small_table = spark.table(join_metadata["small_table"])
large_table = spark.table(join_metadata["large_table"])

if join_metadata["broadcast"]:
    result = large_table.join(F.broadcast(small_table), join_metadata["join_key"])
else:
    result = large_table.join(small_table, join_metadata["join_key"])
result.show()
17. Caching Strategy Automation
Dynamically cache tables based on usage frequency:
cache_metadata = {
    "frequent_tables": ["table1", "table2"],
    "occasional_tables": ["table3"]
}

for table in cache_metadata["frequent_tables"]:
    spark.table(table).cache().count()  # Trigger caching
18. Schema Validation and Enforcement
Validate schema dynamically:
expected_schema = ["id", "name", "age"]
actual_schema = df.columns
missing_columns = set(expected_schema) - set(actual_schema)
if missing_columns:
    print(f"Missing columns: {missing_columns}")
else:
    print("Schema validation passed")
19. Dynamic Aggregations
Apply aggregations dynamically based on metadata:
from pyspark.sql import functions as F

aggregation_rules = [
    {"column": "sales", "agg_func": "sum"},
    {"column": "quantity", "agg_func": "avg"}
]

agg_exprs = [getattr(F, rule["agg_func"])(rule["column"]).alias(rule["column"]) for rule in aggregation_rules]
df.groupBy("region").agg(*agg_exprs).show()
20. Send emails with smtplib:
import smtplib
from email.mime.text import MIMEText
msg = MIMEText("Hello, world!")
msg['Subject'] = "Test Email"
msg['From'] = "your_email@gmail.com"
msg['To'] = "recipient_email@gmail.com"
server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls()
server.login("your_email@gmail.com", "your_password")
server.sendmail("your_email@gmail.com", "recipient_email@gmail.com", msg.as_string())
server.quit()