ETL Framework for Dynamic PySpark SQL API Code Execution

from pyspark.storagelevel import StorageLevel

def get_table_size(df):
    """Return the DataFrame's storage level if it is cached.

    Note: the DataFrame API does not expose the exact cached size in bytes;
    df.storageLevel (a StorageLevel) only describes where the data is held
    (memory, disk, replication).
    """
    if df.is_cached:
        return df.storageLevel  # StorageLevel object, not a byte count
    else:
        return None  # Not cached, so nothing to report

table_size = get_table_size(df)


def estimate_table_size(df):
    """Estimate table size in bytes without collecting data to the driver."""
    row_count = df.count()  # triggers a Spark job, but no data is pulled to the driver

    # Approximate per-row column sizes (in bytes)
    size_per_column = {
        "int": 4, "bigint": 8, "double": 8, "float": 4,
        "string": 50, "boolean": 1, "timestamp": 8
    }

    row_size = 0
    for col_name, dtype in df.dtypes:
        row_size += size_per_column.get(dtype, 50)  # Default to 50 bytes for unknown types

    return row_count * row_size  # Total estimated size in bytes

table_size = estimate_table_size(df)
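Because the log table later in this post stores table_size as a STRING, the raw byte estimate can be formatted before it is written. A minimal sketch; format_size is a hypothetical helper, not part of the framework itself:

def format_size(size_in_bytes):
    """Convert a byte count into a human-readable string, e.g. '12.4 MB'."""
    if size_in_bytes is None:
        return None
    size = float(size_in_bytes)
    for unit in ["B", "KB", "MB", "GB"]:
        if size < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} TB"

table_size_str = format_size(estimate_table_size(df))  # e.g. '38.1 MB'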


## Final Optimized Code in ETL Logging
Replace the expensive, driver-side size calculation:
table_size = sum(df.toPandas().memory_usage(deep=True)) if df else None
with the lightweight estimator defined above:
table_size = estimate_table_size(df)

# Metadata table: one row per ETL step (SQL, dependencies, cache/broadcast hints, ordering, version)
df_metadata = spark.createDataFrame([
    (1, "filter_data", "SELECT * FROM source_data WHERE status = 'active'", None, True, False, "category", 1, 1),
    (2, "aggregate_data", "SELECT category, SUM(amount) AS total FROM filter_data GROUP BY category", "filter_data", True, False, None, 2, 1),
    (3, "join_with_small", "SELECT a.*, b.extra_info FROM aggregate_data a JOIN small_data b ON a.category = b.category", "aggregate_data", False, True, None, 3, 1)
], ["step_id", "process_name", "sql_query", "depends_on", "cache_flag", "broadcast_flag", "repartition_col", "execution_order", "version"])

df_metadata.write.mode("overwrite").saveAsTable("etl_metadata")
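For context, here is a minimal sketch of how a driver loop might consume this metadata and run each step dynamically. The function name run_etl_steps and the way the broadcast/repartition flags are applied are illustrative assumptions, not the framework's actual executor:

from pyspark.sql.functions import broadcast

def run_etl_steps(spark):
    """Illustrative only: run each metadata-driven step in execution order."""
    steps = spark.table("etl_metadata").orderBy("execution_order").collect()
    for step in steps:
        df = spark.sql(step["sql_query"])                 # run the step's SQL
        if step["repartition_col"]:                       # optional repartitioning
            df = df.repartition(step["repartition_col"])
        if step["broadcast_flag"]:                        # one interpretation: hint this (small) result for broadcast joins
            df = broadcast(df)
        if step["cache_flag"]:                            # cache intermediates reused by later steps
            df.cache()
        df.createOrReplaceTempView(step["process_name"])  # expose the result to later steps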

# Empty run-log table; one row is appended per executed step
df_logs = spark.createDataFrame([], schema="""
    step_id INT, process_name STRING, start_time TIMESTAMP, end_time TIMESTAMP, 
    status STRING, error_message STRING, count_rows BIGINT, table_size STRING
""")
df_logs.write.mode("overwrite").saveAsTable("etl_logs")
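With etl_logs in place, the table-size replacement shown above slots into a logging helper along these lines. log_etl_step is a hypothetical name used for illustration; the full framework's logging code may differ:

from datetime import datetime

def log_etl_step(step_id, process_name, df, status, start_time, error_message=None):
    """Illustrative only: append one run record to etl_logs."""
    count_rows = df.count() if df is not None else None
    # Lightweight estimator instead of the toPandas()-based sizing
    table_size = estimate_table_size(df) if df is not None else None
    end_time = datetime.now()
    log_row = [(step_id, process_name, start_time, end_time, status, error_message,
                count_rows, str(table_size) if table_size is not None else None)]
    spark.createDataFrame(log_row, schema=df_logs.schema) \
        .write.mode("append").saveAsTable("etl_logs")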

from pyspark.sql.functions import lit

def update_metadata(new_data):
    # Get the latest version number (0 if the table is empty)
    latest_version = spark.sql("SELECT MAX(version) FROM etl_metadata").collect()[0][0] or 0

    # Tag the table with a retention property; note this does not remove or
    # deactivate earlier rows, so readers should filter on the latest version
    spark.sql("ALTER TABLE etl_metadata SET TBLPROPERTIES ('retention'='1')")

    # Append the new metadata rows stamped with the next version number
    new_df = spark.createDataFrame(new_data, df_metadata.schema).withColumn("version", lit(latest_version + 1))
    new_df.write.mode("append").insertInto("etl_metadata")

# Sample metadata update (the version column is stamped inside update_metadata)
new_metadata_row = [(4, "new_transformation", "SELECT * FROM filter_data WHERE amount > 100", "filter_data", True, False, None, 4, None)]
update_metadata(new_metadata_row)
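Since update_metadata only appends a new version and never deletes old rows, readers are expected to pick the highest version themselves, for example:

active_metadata = spark.sql("""
    SELECT * FROM etl_metadata
    WHERE version = (SELECT MAX(version) FROM etl_metadata)
    ORDER BY execution_order
""")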
# Extended metadata with a fourth step (final_output) added to the pipeline
df_metadata = spark.createDataFrame([
    (1, "filter_data", "SELECT * FROM source_data WHERE status = 'active'", None, True, False, "category", 1, 1),
    (2, "aggregate_data", "SELECT category, SUM(amount) AS total FROM filter_data GROUP BY category", "filter_data", True, False, None, 2, 1),
    (3, "join_with_small", "SELECT a.*, b.extra_info FROM aggregate_data a JOIN small_data b ON a.category = b.category", "aggregate_data", False, True, None, 3, 1),
    (4, "final_output", "SELECT * FROM join_with_small WHERE total > 500", "join_with_small", False, False, None, 4, 1)
], ["step_id", "process_name", "sql_query", "depends_on", "cache_flag", "broadcast_flag", "repartition_col", "execution_order", "version"])

df_metadata.write.mode("overwrite").saveAsTable("etl_metadata")

New Feature: depends_on can now store multiple dependencies as a comma-separated list (e.g., "step1, step2").
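A sketch of how a multi-dependency check could work; dependencies_ready and completed_steps are hypothetical names used for illustration:

def dependencies_ready(depends_on, completed_steps):
    """Illustrative only: return True when every listed dependency has finished."""
    if not depends_on:  # None or empty string means no dependencies
        return True
    required = [dep.strip() for dep in depends_on.split(",")]
    return all(dep in completed_steps for dep in required)

# Example: a step that depends on both filter_data and aggregate_data
dependencies_ready("filter_data, aggregate_data", {"filter_data", "aggregate_data"})  # True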