def get_table_size(df):
    """Report cache information for a DataFrame."""
    if df.is_cached:
        # storageLevel.useDisk is only a flag indicating whether the cache spills
        # to disk; Spark does not expose the cached size in bytes here.
        return df.storageLevel.useDisk
    else:
        return None  # Not cached, so nothing to report

table_size = get_table_size(df)
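If an actual byte count is needed, Spark's catalog statistics are a more reliable source once a step's output has been saved as a table. The sketch below assumes a saved table name and uses standard ANALYZE TABLE / DESCRIBE TABLE EXTENDED SQL; the exact layout of the Statistics row can vary by Spark version and catalog, so treat this as an illustration rather than part of the framework above.

def table_size_from_catalog(table_name):
    """Best-effort size lookup from catalog statistics for a saved table."""
    spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
    for row in spark.sql(f"DESCRIBE TABLE EXTENDED {table_name}").collect():
        if row.col_name == "Statistics":
            return row.data_type  # e.g. '123456 bytes, 1000 rows'
    return None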
def estimate_table_size(df):
    """Estimate table size in bytes without collecting data to the driver."""
    row_count = df.count()
    # Rough per-row column sizes (in bytes)
    size_per_column = {
        "int": 4, "bigint": 8, "double": 8, "float": 4,
        "string": 50, "boolean": 1, "timestamp": 8,
    }
    bytes_per_row = 0
    for col_name, dtype in df.dtypes:
        bytes_per_row += size_per_column.get(dtype, 50)  # Default to 50 bytes for unknown types
    return row_count * bytes_per_row  # Total estimated size in bytes

table_size = estimate_table_size(df)
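As a quick sanity check of the heuristic: for a hypothetical DataFrame with an int, a string, and a double column and 1,000 rows, the estimate is 1,000 * (4 + 50 + 8) = 62,000 bytes.

sample_df = spark.createDataFrame(
    [(i, f"name_{i}", float(i)) for i in range(1000)],
    "id INT, name STRING, amount DOUBLE",
)
print(estimate_table_size(sample_df))  # 1000 * (4 + 50 + 8) = 62000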
## Final Optimized Code in ETL Logging
Replace:
table_size = sum(df.toPandas().memory_usage(deep=True)) if df else None
With:
table_size = estimate_table_size(df)
df_metadata = spark.createDataFrame([
    (1, "filter_data", "SELECT * FROM source_data WHERE status = 'active'", None, True, False, "category", 1, 1),
    (2, "aggregate_data", "SELECT category, SUM(amount) AS total FROM filter_data GROUP BY category", "filter_data", True, False, None, 2, 1),
    (3, "join_with_small", "SELECT a.*, b.extra_info FROM aggregate_data a JOIN small_data b ON a.category = b.category", "aggregate_data", False, True, None, 3, 1)
], ["step_id", "process_name", "sql_query", "depends_on", "cache_flag", "broadcast_flag", "repartition_col", "execution_order", "version"])
df_metadata.write.mode("overwrite").saveAsTable("etl_metadata")
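With etl_metadata saved, a driver loop can execute each step in execution_order and honor the cache, broadcast, and repartition flags. The runner below is a minimal sketch (run_etl_steps and the temp-view naming are illustrative, and it assumes a single metadata version):

from pyspark.sql.functions import broadcast

def run_etl_steps():
    """Illustrative runner: executes each metadata step in order and exposes the
    result as a temp view named after process_name."""
    steps = spark.table("etl_metadata").orderBy("execution_order").collect()
    for step in steps:
        result = spark.sql(step.sql_query)
        if step.repartition_col:
            result = result.repartition(step.repartition_col)
        if step.broadcast_flag:
            result = broadcast(result)  # join hint for downstream steps
        if step.cache_flag:
            result = result.cache()
        result.createOrReplaceTempView(step.process_name)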
df_logs = spark.createDataFrame([], schema="""
step_id INT, process_name STRING, start_time TIMESTAMP, end_time TIMESTAMP,
status STRING, error_message STRING, count_rows BIGINT, table_size STRING
""")
df_logs.write.mode("overwrite").saveAsTable("etl_logs")
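A hypothetical helper, log_step (not defined anywhere above, shown only as a sketch), can then append one record per step to etl_logs, using estimate_table_size for the table_size column; start_time would be captured with datetime.now() before the step runs.

from datetime import datetime

def log_step(step_id, process_name, start_time, status, error_message=None, df=None):
    """Illustrative logger: appends a single run record to etl_logs."""
    record = [(
        step_id,
        process_name,
        start_time,
        datetime.now(),
        status,
        error_message,
        df.count() if df is not None else None,
        str(estimate_table_size(df)) if df is not None else None,  # table_size column is STRING
    )]
    spark.createDataFrame(record, df_logs.schema).write.mode("append").saveAsTable("etl_logs")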
from pyspark.sql.functions import lit

def update_metadata(new_data):
    # Get the latest version currently stored in the metadata table
    latest_version = spark.sql("SELECT MAX(version) FROM etl_metadata").collect()[0][0] or 0
    # Previous versions remain in the table; readers should filter on the latest version
    spark.sql("ALTER TABLE etl_metadata SET TBLPROPERTIES ('retention'='1')")
    # Insert the new rows with an incremented version number
    new_df = spark.createDataFrame(new_data, df_metadata.schema) \
        .withColumn("version", lit(latest_version + 1))
    new_df.write.mode("append").insertInto("etl_metadata")
# Sample metadata update (the trailing None is the version, which update_metadata overwrites)
new_metadata_row = [(4, "new_transformation", "SELECT * FROM filter_data WHERE amount > 100", "filter_data", True, False, None, 4, None)]
update_metadata(new_metadata_row)
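Because update_metadata only appends, consumers should read just the newest version. A minimal sketch of that lookup (get_active_metadata is an illustrative name):

def get_active_metadata():
    """Return only the rows belonging to the most recent metadata version."""
    return spark.sql("""
        SELECT * FROM etl_metadata
        WHERE version = (SELECT MAX(version) FROM etl_metadata)
        ORDER BY execution_order
    """)

active_steps = get_active_metadata()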
df_metadata = spark.createDataFrame([
    (1, "filter_data", "SELECT * FROM source_data WHERE status = 'active'", None, True, False, "category", 1, 1),
    (2, "aggregate_data", "SELECT category, SUM(amount) AS total FROM filter_data GROUP BY category", "filter_data", True, False, None, 2, 1),
    (3, "join_with_small", "SELECT a.*, b.extra_info FROM aggregate_data a JOIN small_data b ON a.category = b.category", "aggregate_data", False, True, None, 3, 1),
    (4, "final_output", "SELECT * FROM join_with_small WHERE total > 500", "join_with_small", False, False, None, 4, 1)
], ["step_id", "process_name", "sql_query", "depends_on", "cache_flag", "broadcast_flag", "repartition_col", "execution_order", "version"])
df_metadata.write.mode("overwrite").saveAsTable("etl_metadata")
New Feature: depends_on can now store multiple dependencies as a comma-separated list (e.g., "step1, step2").
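Since depends_on may now hold several parent steps, downstream code should split and trim the value before resolving dependencies. A small sketch (parse_dependencies is an illustrative helper, not part of the tables above):

def parse_dependencies(depends_on):
    """Split a comma-separated depends_on value (e.g. "step1, step2") into a clean list."""
    if not depends_on:
        return []
    return [dep.strip() for dep in depends_on.split(",")]

# Example: check that every parent step has already produced its temp view
parents = parse_dependencies("filter_data, aggregate_data")  # ['filter_data', 'aggregate_data']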