PART 2: Data Engineering Project Using OOP + PySpark
Problem Statement:
Build a Metadata-driven ETL Framework in Python using OOP principles, powered by PySpark.
Project Modules:
| Module | Purpose | OOP Feature Used |
|---|---|---|
| DataReader | Abstract file reader class | Abstract class |
| CSVReader, JSONReader | Concrete file readers | Inheritance |
| Transformer | Encapsulates transformations | Composition |
| LoggerMixin | Adds logging to any class | Mixin |
| ETLJob | Encapsulates full job | High-level class |
| PipelineRunner | Manages job execution flow | Reusability |
Project Structure:
etl_framework/
├── base/
│   ├── reader.py
│   ├── transformer.py
│   └── logger.py
├── jobs/
│   └── etl_job.py
└── main.py
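reader.py
from abc import ABC, abstractmethod
from pyspark.sql import SparkSession

class DataReader(ABC):
    """Abstract base class for all file readers."""
    def __init__(self, spark=None):
        # Reuse the active SparkSession, or create one if none exists.
        self.spark = spark or SparkSession.builder.getOrCreate()

    @abstractmethod
    def read(self, path):
        """Return a DataFrame loaded from `path`."""

class CSVReader(DataReader):
    def read(self, path):
        # Treat the first line of the file as column names.
        return self.spark.read.option("header", True).csv(path)

class JSONReader(DataReader):
    def read(self, path):
        # Allow a single JSON record to span multiple lines.
        return self.spark.read.option("multiLine", True).json(path)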
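To illustrate what the abstract base class buys you, here is a minimal sketch that avoids Spark entirely. `InMemoryReader`, `BrokenReader`, and `can_instantiate` are hypothetical names introduced only for this illustration; the point is that Python refuses to instantiate any subclass that has not implemented `read`.

```python
from abc import ABC, abstractmethod


class DataReader(ABC):
    """Same shape as the DataReader above, minus the Spark dependency."""

    @abstractmethod
    def read(self, path):
        """Return the data found at `path`."""


class InMemoryReader(DataReader):
    """Hypothetical reader that serves rows from a dict instead of a file."""

    def __init__(self, tables):
        self.tables = tables

    def read(self, path):
        return self.tables[path]


class BrokenReader(DataReader):
    """Hypothetical subclass that forgets to implement read()."""


def can_instantiate(cls, *args):
    """Return True if cls(*args) succeeds, False if it raises TypeError."""
    try:
        cls(*args)
        return True
    except TypeError:
        return False
```

Because the contract is enforced at instantiation time, a reader with a missing `read` method fails as soon as the job is constructed rather than halfway through a run.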
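transformer.py
from pyspark.sql.functions import col, upper

class Transformer:
    def __init__(self, df):
        self.df = df

    def drop_nulls(self):
        # Drop every row that contains at least one null value.
        self.df = self.df.dropna()
        return self  # returning self allows method chaining

    def uppercase(self, column):
        # Cast the column to string and upper-case its values in place.
        self.df = self.df.withColumn(column, upper(col(column).cast("string")))
        return self

    def get(self):
        # Return the transformed DataFrame.
        return self.df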
logger.py
class LoggerMixin:
    def log(self, message):
        # Prefix each message with the name of the class that logged it.
        print(f"[{self.__class__.__name__}] {message}")
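The mixin above prints to standard output, which is fine for a demo. As one possible variation (an assumption, not part of the original framework), the same `log` method could delegate to Python's standard `logging` module so that levels and handlers can be configured centrally. `DemoJob` is a hypothetical class used only to show the mixin in action.

```python
import logging


class LoggerMixin:
    """Variant of the mixin above that delegates to the stdlib logging module."""

    @property
    def logger(self):
        # One logger per concrete class name, e.g. "DemoJob".
        return logging.getLogger(self.__class__.__name__)

    def log(self, message, level=logging.INFO):
        self.logger.log(level, message)


class DemoJob(LoggerMixin):
    """Hypothetical class that gains log() simply by inheriting the mixin."""

    def run(self):
        self.log("Starting job")
        return "done"
```

Calling `logging.basicConfig(level=logging.INFO)` once at program start makes these messages visible; without it, INFO-level messages are silently dropped by default.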
etl_job.py
from base.logger import LoggerMixin
from base.reader import CSVReader
from base.transformer import Transformer

class ETLJob(LoggerMixin):
    def __init__(self, path):
        self.path = path
        self.reader = CSVReader()

    def run(self):
        self.log("Starting job")
        df = self.reader.read(self.path)          # extract
        df = Transformer(df).drop_nulls().get()   # transform
        df.show()                                 # display the result
        self.log("Job finished")
main.py
from jobs.etl_job import ETLJob

if __name__ == "__main__":
    job = ETLJob("/data/sample.csv")
    job.run()
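The problem statement calls the framework metadata-driven, while the job above hard-codes both the reader and the path. One way to close that gap is sketched here: each job is described by a plain metadata record, and a registry maps the record's format to a reader class. Everything in this sketch is an assumption made for illustration, including `READER_REGISTRY`, `build_reader`, `run_from_metadata`, the metadata keys, and the `/data/events.json` path. Stub readers stand in for the Spark ones so the sketch runs without a cluster.

```python
class StubCSVReader:
    """Stand-in for CSVReader; returns a label instead of a DataFrame."""

    def read(self, path):
        return f"csv:{path}"


class StubJSONReader:
    """Stand-in for JSONReader."""

    def read(self, path):
        return f"json:{path}"


# Hypothetical registry: maps a format name found in metadata to a reader class.
READER_REGISTRY = {"csv": StubCSVReader, "json": StubJSONReader}


def build_reader(fmt):
    """Look up and instantiate the reader registered for `fmt`."""
    try:
        return READER_REGISTRY[fmt.lower()]()
    except KeyError:
        raise ValueError(f"No reader registered for format {fmt!r}") from None


def run_from_metadata(job_meta):
    """Read the source described by one metadata record."""
    reader = build_reader(job_meta["format"])
    return reader.read(job_meta["path"])


# Hypothetical metadata records; these could equally come from a table or a config file.
JOBS = [
    {"name": "sales", "format": "csv", "path": "/data/sample.csv"},
    {"name": "events", "format": "JSON", "path": "/data/events.json"},
]

results = [run_from_metadata(meta) for meta in JOBS]
```

With this shape, supporting a new source format means registering one more class rather than editing the job itself.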
Advanced Add-Ons
- Add a MetadataTableReader class (reads from Hive metastore)
- Add RetryMixin, AuditLoggerMixin
- Add ParameterStore as a Singleton class
- Add versioned schema enforcement
- Track success/failure using a LogWriter class
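Two of the add-ons listed above can be sketched in a few lines. The post only names `RetryMixin` and `ParameterStore`, so the method names, attributes, and retry policy below are assumptions, and `FlakyStep` is a hypothetical class that exists only to exercise the retry loop.

```python
import time


class RetryMixin:
    """Hypothetical mixin: re-run a callable a fixed number of times on failure."""

    max_attempts = 3
    retry_delay_seconds = 0.0  # kept at zero here so the sketch runs instantly

    def with_retry(self, func, *args, **kwargs):
        last_error = None
        for attempt in range(1, self.max_attempts + 1):
            try:
                return func(*args, **kwargs)
            except Exception as exc:  # broad on purpose for the sketch
                last_error = exc
                if attempt < self.max_attempts:
                    time.sleep(self.retry_delay_seconds)
        raise last_error


class ParameterStore:
    """Hypothetical Singleton: every call to ParameterStore() returns the same object."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._params = {}
        return cls._instance

    def set(self, key, value):
        self._params[key] = value

    def get(self, key, default=None):
        return self._params.get(key, default)


class FlakyStep(RetryMixin):
    """Fails twice, then succeeds, to exercise the retry loop."""

    def __init__(self):
        self.calls = 0

    def work(self):
        self.calls += 1
        if self.calls < 3:
            raise RuntimeError("transient failure")
        return "ok"
```

A real job would likely retry only on specific exception types and use a non-zero, possibly increasing, delay between attempts.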
Benefits of OOP in Data Engineering
| Benefit | Explanation |
|---|---|
| Reusability | Reuse readers/transformers across jobs |
| Encapsulation | Logic isolated in classes |
| Testability | Easy to unit test each class |
| Extensibility | Add new readers, transformations, loggers |
| Maintainability | Easier to manage larger projects |
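The testability row can be made concrete with a small sketch. It assumes a variant of the job that receives its reader as a constructor argument instead of creating one, so a test can pass in a fake. `FakeReader`, `InjectableJob`, and the test function are hypothetical names, and plain lists of dicts stand in for Spark DataFrames so the test runs without a cluster.

```python
class FakeReader:
    """Hypothetical test double: records the path and returns canned rows."""

    def __init__(self, rows):
        self.rows = rows
        self.paths_seen = []

    def read(self, path):
        self.paths_seen.append(path)
        return self.rows


class InjectableJob:
    """Hypothetical variant of ETLJob that receives its reader instead of creating it."""

    def __init__(self, path, reader):
        self.path = path
        self.reader = reader

    def run(self):
        rows = self.reader.read(self.path)
        # Plain-Python stand-in for Transformer.drop_nulls(): drop rows with any None.
        return [row for row in rows if None not in row.values()]


def test_job_drops_rows_with_nulls():
    reader = FakeReader([{"id": 1, "name": "a"}, {"id": 2, "name": None}])
    job = InjectableJob("/data/sample.csv", reader)
    assert job.run() == [{"id": 1, "name": "a"}]
    assert reader.paths_seen == ["/data/sample.csv"]
```

A test runner such as pytest would discover `test_job_drops_rows_with_nulls` automatically; it can also be called directly.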