✅ Schema Enforcement in PySpark DataFrames
In PySpark, schema enforcement ensures that the structure (schema) of your DataFrame strictly matches the expected data types and field names during read and write operations.
🧠 What is Schema Enforcement?
Schema enforcement (also called schema-on-write) is the ability of Spark to validate incoming data against a predefined schema, and reject rows or throw errors when mismatches occur.
🔧 1. Enforcing Schema During DataFrame Read
🧪 Example:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.read.schema(schema).json("people.json")
df.show()
✅ What happens here:
- Spark does not infer schema
- Spark expects the JSON to match the defined schema
- If the data has "age": "twenty" (a string), Spark sets the field to null or throws an error, depending on the file format and read options (see the sketch below for a way to keep the raw bad record)
🔍 What If You Don’t Define Schema?
df = spark.read.json("people.json")
- Spark infers the schema
- No enforcement → can lead to inconsistent column types or nested ambiguity (a quick way to inspect what was inferred is shown below)
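A quick way to inspect what inference produced and catch drift early (this is only a check, not enforcement):

inferred_df = spark.read.json("people.json")
inferred_df.printSchema()  # with mixed data, age may come back as string instead of long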
🛠️ 2. Enforcing Schema While Creating a DataFrame from Python Objects
data = [("Alice", 30), ("Bob", 28)]
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.createDataFrame(data, schema=schema)
If data has a wrong type (e.g., ("Alice", "thirty")), Spark will:
- Verify each row against the schema (the default is verifySchema=True)
- Raise a TypeError for values the declared type cannot accept (see the sketch below)
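A minimal sketch of that behavior; bad_data and the try/except wrapper are just for illustration:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

bad_data = [("Alice", "thirty")]  # age is a string, not an int

try:
    spark.createDataFrame(bad_data, schema=schema).show()
except TypeError as err:
    # With the default verifySchema=True, Spark rejects the mismatched field up front
    print(f"Schema verification failed: {err}")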
💾 3. Schema Enforcement on Write (with Delta Lake or Parquet)
Delta Example:
df.write.format("delta").mode("overwrite").save("/delta/users")
Then:
# Writing different schema (e.g., extra column)
df2.write.format("delta").mode("append").save("/delta/users")
📛 If df2 has a different schema (e.g., extra column gender), this fails unless you allow schema evolution.
Enabling schema evolution:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)
🔐 4. Related Spark Configurations
| Config | Description |
|---|---|
| spark.sql.caseSensitive | Field name case enforcement |
| spark.sql.parquet.mergeSchema | Enable schema merging in Parquet |
| spark.databricks.delta.schema.autoMerge.enabled | Allow schema evolution on writes (Delta) |
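For reference, here is roughly how these settings are used (the /data/events path is a placeholder):

# Enforce exact-case column matching (default is case-insensitive resolution)
spark.conf.set("spark.sql.caseSensitive", "true")

# Merge differing Parquet file schemas at read time (can also be set per read, as here)
df = spark.read.option("mergeSchema", "true").parquet("/data/events")

# Allow Delta writes to evolve the table schema automatically
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")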
✅ Summary Table
| Operation | Enforcement Applies? | Behavior |
|---|---|---|
| read.json(..., schema) | ✅ Yes | Nulls fields or rejects rows on mismatch |
| createDataFrame(data, schema) | ✅ Yes | Verifies types and structure |
| write.format("delta") | ✅ Yes | Requires the same schema unless auto-merge is on |
| write.csv() or write.parquet() | ⚠️ Partial | No check against previously written files; mismatched appends typically surface later, at read time |
📘 Best Practices
- Always define schema explicitly for production pipelines.
- Validate the schema before writing with .printSchema() or schema.json() (see the sketch after this list).
- Use Delta Lake for a strong schema enforcement + evolution combo.
- Avoid relying on schema inference with complex or nested data.
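One way to do that validation before writing; the expected_schema comparison below is a sketch (strict equality also compares nullability, which you may want to relax):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

expected_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Fail the pipeline early instead of writing data with a drifted schema
if df.schema != expected_schema:
    raise ValueError(
        "Schema drift detected:\n"
        f"expected: {expected_schema.simpleString()}\n"
        f"actual:   {df.schema.simpleString()}"
    )

df.write.format("delta").mode("append").save("/delta/users")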
✅ Read Modes in PySpark DataFrames (including Databricks)
When reading data in PySpark (whether on vanilla Spark or Databricks), read modes define how Spark handles malformed records, corrupt rows, or schema mismatches during file reading.
📘 mode Parameter in DataFrameReader
Applies to formats like:
.read.format("json") / .read.format("csv"); Parquet and Delta handle corruption differently (see the table below)
🔧 Syntax:
df = spark.read.format("json").option("mode", "<mode>").load("path")
🧠 Available Read Modes in PySpark
| Mode | Description |
|---|---|
| "PERMISSIVE" (default) | Tries to parse all rows, sets corrupt or missing fields to null, and puts bad data in a special column (e.g., _corrupt_record for JSON) |
| "DROPMALFORMED" | Silently drops rows that don’t match the schema |
| "FAILFAST" | Throws an exception as soon as it encounters a malformed row |
| "IGNORE" | Not an accepted value of the mode option; corrupt-file skipping for Parquet/ORC is enabled via spark.sql.files.ignoreCorruptFiles instead |
🧪 Example: Reading Malformed JSON
File (bad.json)
{"name": "Alice", "age": 30}
{"name": "Bob", "age": "thirty"} <-- Malformed
{"name": "Carol"}
Read in PERMISSIVE mode (reusing the name/age schema defined earlier, so the type mismatch is detected):
df = spark.read.schema(schema).option("mode", "PERMISSIVE").json("bad.json")
df.show()
✅ Output:
- Valid rows are parsed
- age becomes null where invalid
- A _corrupt_record column may be added (if the schema includes it as a StringType field)
Read in FAILFAST mode:
df = spark.read.schema(schema).option("mode", "FAILFAST").json("bad.json")
🚨 Fails on the second row as soon as an action (e.g., df.show()) triggers the read.
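A small sketch of catching that failure so the pipeline can report a clear message (the error arrives wrapped by Py4J):

try:
    # The malformed row is only hit when an action (count/show/write) triggers the read
    spark.read.schema(schema).option("mode", "FAILFAST").json("bad.json").count()
except Exception as err:
    print(f"FAILFAST aborted the read: {err}")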
Read in DROPMALFORMED mode:
df = spark.read.schema(schema).option("mode", "DROPMALFORMED").json("bad.json")
✅ Drops rows that cannot be parsed against the schema.
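Because dropped rows disappear silently, it is worth comparing counts so data loss doesn’t go unnoticed; a quick sketch:

raw_count = spark.read.text("bad.json").count()  # every line in the raw file
clean_df = spark.read.schema(schema).option("mode", "DROPMALFORMED").json("bad.json")
print(f"Dropped {raw_count - clean_df.count()} of {raw_count} records")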
🧱 Parquet, ORC, and Delta Specifics
| Format | Mode Supported | Default Behavior |
|---|---|---|
| CSV | All modes | PERMISSIVE |
| JSON | All modes | PERMISSIVE |
| Parquet | No row-level mode option; corrupt files can be skipped via spark.sql.files.ignoreCorruptFiles = true | Fails on corrupt files |
| Delta | Schema must match or be evolved (Delta has stronger enforcement) | Fails on mismatch unless autoMerge is on |
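On recent Spark versions, corrupt-file skipping can also be requested per read via the generic file-source option ignoreCorruptFiles; a sketch with a placeholder path:

# Skip files that cannot be read at all (e.g., truncated Parquet footers)
df = (
    spark.read
    .option("ignoreCorruptFiles", "true")
    .parquet("/data/events")
)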
🛠️ In Databricks
Databricks also supports these modes, plus:
- Auto-detecting the schema with .option("inferSchema", "true")
- Notebook-level configs to handle corrupt files:
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
✅ Summary Table
| Mode | Good For | Risk/Note |
|---|---|---|
| PERMISSIVE | Default, safest; keeps all rows and captures bad data in _corrupt_record | May include invalid rows |
| DROPMALFORMED | Quick cleanup pipelines | May drop valid but slightly malformed data |
| FAILFAST | Strict schema enforcement | Fails fast, best for production |
| IGNORE (via ignoreCorruptFiles) | Skipping whole corrupt files (Parquet, ORC) | Not a mode option value; doesn’t handle malformed rows in JSON/CSV |
🔄 Bonus Tip: Audit Malformed Rows
df = spark.read.option("mode", "PERMISSIVE").json("bad.json")
df.filter("_corrupt_record IS NOT NULL").show()
✅ Helps you isolate bad records for manual review or automated cleanup.
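A possible next step is to quarantine those records instead of just displaying them; a sketch, with /quarantine/bad_people as a made-up path:

df = df.cache()  # caching avoids a known restriction on queries that reference only the corrupt-record column

# Persist bad records for later inspection
bad_rows = df.filter("_corrupt_record IS NOT NULL")
bad_rows.write.mode("append").json("/quarantine/bad_people")

# Continue the pipeline with only the clean rows
good_rows = df.filter("_corrupt_record IS NULL").drop("_corrupt_record")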