Delta Properties and Check Constraints at Scale
Delta is arguably the most popular data format in the current data engineering landscape. It is mainly used within the Databricks/Spark ecosystem, but that is optional: it is a universal open-source format and works great out of the box. However, the default behavior is not always sufficient, and then custom table-level properties must be defined.
It is not that hard to hit that point. To define a Delta table property or a check constraint, there is only one option: the ALTER TABLE statement. This command is easy to embed in an ad-hoc notebook. But what do you do when there are no manual processes and Delta objects are delivered in a fully automated way using config-driven Python pipelines?
This story answers that question and traces my path from exploration to a solution that scales.
Prepare a clean environment
First things first, let's prepare a clean Python environment to run the showcase examples.
As usual, I am going to run the code on Apple Silicon; however, all snippets should also work on Linux and on WSL for Windows 10/11:
mkdir delta-properties
cd delta-properties

python3.10 -m venv .venv
source .venv/bin/activate

pip install pyspark==3.3.1 delta-spark==2.3.0

touch delta-properties.ipynb
code .
Then initialize a minimalistic Spark configuration with the Delta jar and disable the Spark UI:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("local[*]")
    .appName("MyApp")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.ui.showConsoleProgress", "false")
    .config("spark.ui.enabled", "false")
    .config("spark.ui.dagGraph.retainedRootRDDs", "1")
    .config("spark.ui.retainedJobs", "1")
    .config("spark.ui.retainedStages", "1")
    .config("spark.ui.retainedTasks", "1")
    .config("spark.sql.ui.retainedExecutions", "1")
    .config("spark.worker.ui.retainedExecutors", "1")
    .config("spark.worker.ui.retainedDrivers", "1")
    .config("spark.driver.memory", "4g")
).getOrCreate()
When Spark is up, create a sample Delta table that will be the subject of the coming experiments:
df = spark.sql(
    """
    SELECT
        1 AS id,
        "John" AS name,
        struct(
            "Longstreet 1" AS line1,
            "1234AB" AS zipcode
        ) AS address
    """
)

df.printSchema()

"""
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- address: struct (nullable = false)
 |    |-- line1: string (nullable = false)
 |    |-- zipcode: string (nullable = false)
"""

df.write.format("delta").mode("overwrite").save("/tmp/sample_delta_dataset")
A real-world example
Say you are building a Spark-based data processing framework. You want to improve Structured Streaming latencies by modifying default table properties. Because the streaming logic uses foreachBatch with a MERGE statement, you also plan to add NOT NULL constraints on Primary Key columns in the silver layer.
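For context, here is a minimal sketch of that streaming pattern, assuming the sample table path from above as the silver target; the source stream, the handler name, and the join key are illustrative, not the framework's actual code:

from delta import DeltaTable

def upsert_to_silver(micro_batch_df, batch_id):
    # MERGE each micro-batch into the silver table on the Primary Key column
    silver = DeltaTable.forPath(spark, "/tmp/sample_delta_dataset")
    (
        silver.alias("t")
        .merge(micro_batch_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# source_stream.writeStream.foreachBatch(upsert_to_silver).start()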
Table Properties
This part is easy to construct with a bit of dynamic SQL and embed into the framework, and it works:
spark.sql( """ ALTER TABLE delta.`/tmp/sample_delta_dataset` SET TBLPROPERTIES ( delta.checkpoint.writeStatsAsStruct = true, delta.checkpoint.writeStatsAsJson = false ); """ )
However, after the framework triggered the data load five times, you notice that the Delta history is polluted with SET TBLPROPERTIES messages:
(
    spark.sql(
        "DESCRIBE HISTORY delta.`/tmp/sample_delta_dataset`;"
    )
    .select("version", "operation", "operationParameters")
    .show(truncate=False)
)

"""
+-------+-----------------+----------------------------------------------------------------------------------------------------------+
|version|operation        |operationParameters                                                                                       |
+-------+-----------------+----------------------------------------------------------------------------------------------------------+
|5      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}}|
|4      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}}|
|3      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}}|
|2      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}}|
|1      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}}|
|0      |WRITE            |{mode -> Overwrite, partitionBy -> []}                                                                    |
+-------+-----------------+----------------------------------------------------------------------------------------------------------+
"""
This is not a perfect situation, but still not a roadblock. At the end of the day, you can write a script that checks the current table properties and changes them only when needed.
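A hedged sketch of such a guard: read the current properties via DESCRIBE DETAIL and issue ALTER TABLE only for values that differ (the desired_properties dictionary is illustrative):

desired_properties = {
    "delta.checkpoint.writeStatsAsStruct": "true",
    "delta.checkpoint.writeStatsAsJson": "false",
}

current_properties = (
    spark.sql("DESCRIBE DETAIL delta.`/tmp/sample_delta_dataset`")
    .select("properties")
    .collect()[0]["properties"]
)

for key, value in desired_properties.items():
    if current_properties.get(key) != value:
        # only touch the table (and the Delta history) when the value differs
        spark.sql(
            f"ALTER TABLE delta.`/tmp/sample_delta_dataset` "
            f"SET TBLPROPERTIES ({key} = '{value}')"
        )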
Table Check Constraints
As with table properties, you extend the framework to create NOT NULL constraints for each Primary Key column from the input dataset config, so the logic gets one more dynamically generated SQL command:
spark.sql( """ ALTER TABLE delta.`/tmp/sample_delta_dataset` ADD CONSTRAINT id_is_not_null CHECK (id is not null) """ )
The first run succeeds, but consecutive runs break the execution flow:
AnalysisException: Constraint 'id_is_not_null' already exists. Please delete the old constraint first. Old constraint: id is not null
This time, things get nastier. The error is a blocker and must be fixed in a structured way.
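For completeness, an ad-hoc guard is possible because Delta exposes existing check constraints as table properties prefixed with delta.constraints., so a script can test for the constraint before adding it. A minimal sketch, assuming the sample table from above:

existing_properties = (
    spark.sql("DESCRIBE DETAIL delta.`/tmp/sample_delta_dataset`")
    .select("properties")
    .collect()[0]["properties"]
)

# add the constraint only if it is not registered on the table yet
if "delta.constraints.id_is_not_null" not in existing_properties:
    spark.sql(
        """
        ALTER TABLE delta.`/tmp/sample_delta_dataset`
        ADD CONSTRAINT id_is_not_null CHECK (id is not null)
        """
    )

Still, this is yet more embedded SQL to maintain, which brings us to the real problem.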
An elephant in the room
The elephant is the generation of SQL DDL statements. It is not just a non-Pythonic way to deal with the problem: embedded DDL statements are hard to abstract, not always idempotent, and inconvenient to unit test.
Ideally, the official delta-spark package would contain the logic to deal with such tasks. However, at this point the official DeltaTable class does not offer any Python abstraction for properties, constraints, table metadata, and so on, so they still must be defined using SQL DDL.
What about the native NOT NULL column constraint?
Some may ask why I define a table-level constraint to enforce NOT NULL for a particular column when the documentation says it is possible to do it at the column level:
spark.sql( """ ALTER TABLE delta.`/tmp/sample_delta_dataset` CHANGE COLUMN id SET NOT NULL; """ )
Even though such column-level constraints are documented and should be supported, I failed to make them work in Databricks 11.3 and in Spark 3.3.1. The command fails with the same error in both:
AnalysisException: Cannot change nullable column to non-nullable: id; line 2 pos 4; AlterColumn resolvedfieldname(StructField(id,IntegerType,true)), false
I hope this issue will be solved soon, but for the time being, check constraints are used instead.
Internal benchmarks show that the overhead of table-level constraint checks on data manipulation operations is relatively tiny; however, due to the issue mentioned above, I cannot directly compare table-level and column-level constraints.
An alternative using Python
The limitations mentioned earlier, together with the vision of a "desired state configuration" kind of experience, led me to write a Python abstraction around the configuration of Delta tables. The result is the Python class DeltaTableConfig.
Table Properties
So, the first example. The framework knows that the table sample_delta_dataset is a streaming dataset, and it prescribes the following two standard options for that kind of table:
STREAMING_PROPERTIES = {
    "delta.checkpoint.writeStatsAsStruct": "true",
    "delta.checkpoint.writeStatsAsJson": "false",
}

dtc = DeltaTableConfig(table_path="/tmp/sample_delta_dataset")
dtc.properties = STREAMING_PROPERTIES
The first run enables the properties:
2023-05-21 16:03:35 INFO Applying table properties on 'delta.`/tmp/sample_delta_dataset`':
2023-05-21 16:03:35 INFO Checking if 'delta.checkpoint.writeStatsAsStruct = true' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:03:36 INFO The property has been set
2023-05-21 16:03:36 INFO Checking if 'delta.checkpoint.writeStatsAsJson = false' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:03:36 INFO The property has been set
And subsequent runs detect that the properties already exist, so there is nothing to do:
2023-05-21 16:09:06 INFO Applying table properties on 'delta.`/tmp/sample_delta_dataset`':
2023-05-21 16:09:06 INFO Checking if 'delta.checkpoint.writeStatsAsStruct = true' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:09:06 INFO The property already exists on the table
2023-05-21 16:09:06 INFO Checking if 'delta.checkpoint.writeStatsAsJson = false' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:09:06 INFO The property already exists on the table
A check of the Delta history confirms that each property-related message appears only once:
(
    spark.sql(
        "DESCRIBE HISTORY delta.`/tmp/sample_delta_dataset`;"
    )
    .select("version", "operation", "operationParameters")
    .show(truncate=False)
)

+-------+-----------------+--------------------------------------------------------------+
|version|operation        |operationParameters                                           |
+-------+-----------------+--------------------------------------------------------------+
|2      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsJson":"false"}} |
|1      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true"}}|
|0      |WRITE            |{mode -> Overwrite, partitionBy -> []}                        |
+-------+-----------------+--------------------------------------------------------------+
Imagine another scenario: the table type changed from “streaming” to “append-only.” The framework might have different options for that kind of table:
APPEND_ONLY_PROPERTIES = {
    "delta.appendOnly": "true",
    "delta.enableChangeDataFeed": "true",
}

dtc.properties = APPEND_ONLY_PROPERTIES
The DeltaTableConfig unsets previous properties and sets two new items:
2023-05-21 16:17:06 INFO Applying table properties on 'delta.`/tmp/sample_delta_dataset`':
2023-05-21 16:17:06 INFO Checking if 'delta.appendOnly = true' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:17:07 INFO The property has been set
2023-05-21 16:17:07 INFO Checking if 'delta.enableChangeDataFeed = true' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:17:07 INFO The property has been set
2023-05-21 16:17:08 INFO The property 'delta.checkpoint.writeStatsAsStruct = true' has been unset because it is not defined in the original dict
2023-05-21 16:17:08 INFO The property 'delta.checkpoint.writeStatsAsJson = false' has been unset because it is not defined in the original dict
If the new properties must be added while keeping all existing ones, set the attribute .keep_existing_properties = True. So, let's re-add the previously removed Delta properties:
dtc.keep_existing_properties = True
dtc.properties = STREAMING_PROPERTIES
The attribute .properties acts as both getter and setter, so the existing values can be retrieved simply by reading the attribute:
from pprint import pprint

pprint(dtc.properties)

{'delta.appendOnly': 'true',
 'delta.checkpoint.writeStatsAsJson': 'false',
 'delta.checkpoint.writeStatsAsStruct': 'true',
 'delta.enableChangeDataFeed': 'true'}
Under the hood, the helper class runs DESCRIBE DETAIL (the SQL counterpart of DeltaTable.detail()) each time to get the current state.
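For illustration, the same detail can be fetched directly with the DeltaTable API; this is roughly what the getter relies on (a sketch, not the class code itself):

from delta import DeltaTable

current_state = (
    DeltaTable.forPath(spark, "/tmp/sample_delta_dataset")
    .detail()
    .select("properties")
    .collect()[0]["properties"]
)
print(current_state)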
Check Constraints
As with table properties, the data processing framework can pre-generate an input dictionary PK_CHECK_CONSTRAINTS with the constraint definitions to add. DeltaTableConfig then adds the prescribed check constraints if they do not exist yet.
dtc = DeltaTableConfig(table_path="/tmp/sample_delta_dataset")

PK_CHECK_CONSTRAINTS = {
    "id_is_not_null": "id is not null",
}

dtc.check_constraints = PK_CHECK_CONSTRAINTS
Running this results in the following output:
2023-05-21 16:30:00 INFO Applying check constraints on 'delta.`/tmp/sample_delta_dataset`':
2023-05-21 16:30:00 INFO Checking if constraint 'id_is_not_null' was already set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:30:01 INFO The constraint id_is_not_null has been successfully added to 'delta.`/tmp/sample_delta_dataset`'
If new constraints are to replace the existing ones, provide a new dictionary to apply:
NEW_CHECK_CONSTRAINTS = {
    "address_zipcode_is_not_null": "address.zipcode is not null",
}

dtc.check_constraints = NEW_CHECK_CONSTRAINTS
As you might expect, the old constraint is removed, and a new one is added:
2023-05-21 16:32:00,180 INFO Applying check constraints on 'delta.`/tmp/sample_delta_dataset`':
2023-05-21 16:32:01,085 INFO The constraint 'id_is_not_null' has been dropped from the table 'delta.`/tmp/sample_delta_dataset`'
2023-05-21 16:32:01,085 INFO Checking if constraint 'address_zipcode_is_not_null' was already set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:32:01,616 INFO The constraint address_zipcode_is_not_null has been successfully added to 'delta.`/tmp/sample_delta_dataset`'
If existing check constraints must be preserved, set the optional attribute .keep_existing_check_constraints = True:
dtc.keep_existing_check_constraints = True
dtc.check_constraints = PK_CHECK_CONSTRAINTS
All existing constraints can be retrieved programmatically via the .check_constraints getter:
print(dtc.check_constraints)

{
    'address_zipcode_is_not_null': 'address.zipcode is not null',
    'id_is_not_null': 'id is not null'
}
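Under the hood, Delta stores each check constraint as a table property prefixed with delta.constraints., which is exactly what the getter parses. A quick way to see the raw form (output shown for illustration):

raw_constraints = {
    k: v
    for k, v in dtc.properties.items()
    if k.startswith("delta.constraints.")
}
pprint(raw_constraints)

{'delta.constraints.address_zipcode_is_not_null': 'address.zipcode is not null',
 'delta.constraints.id_is_not_null': 'id is not null'}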
Delta Table Check Constraints in Action
Let's run a final smoke test to ensure that all those constraint efforts pay off:
spark.sql( """ INSERT INTO delta.`/tmp/sample_delta_dataset` VALUES (1, 'John Doe', STRUCT('123 Main St', '10001')) """ )
So, the insert with non-null values works as expected.
The following command intentionally inserts a NULL in the nested attribute address.zipcode, which is protected by the recently added check constraint:
spark.sql( """ INSERT INTO delta.`/tmp/sample_delta_dataset` VALUES (1, 'John Doe', STRUCT('123 Main St', NULL)) """ ) """ org.apache.spark.sql.delta.schema.DeltaInvariantViolationException: CHECK constraint address_zipcode_is_not_null (address.zipcode IS NOT NULL) violated by row with values: - address.zipcode : null """
The code fails with a clear message that should eventually prompt the data team to fix the issue.
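In an automated pipeline, such a violation usually needs to be caught and routed rather than left to kill the job. A hedged sketch; the exact exception type that PySpark surfaces can vary, so the message is inspected:

try:
    spark.sql(
        """
        INSERT INTO delta.`/tmp/sample_delta_dataset`
        VALUES (2, 'Jane Doe', STRUCT('456 Main St', NULL))
        """
    )
except Exception as exc:
    if "DeltaInvariantViolationException" in str(exc) or "CHECK constraint" in str(exc):
        # e.g. alert the data team or route the offending batch to quarantine
        print(f"Constraint violation detected: {exc}")
    else:
        raise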
The source code
from delta import DeltaTable
from pyspark.sql import SparkSession
from typing import Union
import logging


def get_logger(name: str):
    """Returns an instance of the logger"""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


logger = get_logger(__name__)


class DeltaTableConfig:
    """Manages Delta Table properties, constraints, etc.

    Attributes:
        keep_existing_properties (bool): Preserves existing table properties if they are
            not in the input value. Defaults to False
        keep_existing_check_constraints (bool): Preserves existing table constraints if
            they are not in the input value. Defaults to False
    """

    keep_existing_properties = False
    keep_existing_check_constraints = False

    def __init__(
        self,
        table_path: str = "",
        table_name: str = "",
        spark_session: SparkSession = None,
    ) -> None:
        """
        Args:
            table_path (str, optional): Path to delta table. For instance: /mnt/db1/table1
            table_name (str, optional): Delta table name. For instance: db1.table1
            spark_session: (SparkSession, optional) The current spark context.

        Raises:
            ValueError: if values for both 'table_path' and 'table_name' are provided;
                provide a value for only one of them
            ValueError: if values for neither 'table_path' nor 'table_name' are provided;
                provide a value for one of them

        Examples:
            >>> from spalah.dataset import DeltaTableConfig
            >>> dp = DeltaTableConfig(table_path="/path/dataset")
            >>> print(dp.properties)
            {'delta.deletedFileRetentionDuration': 'interval 15 days'}
        """
        self.spark_session = (
            SparkSession.getActiveSession() if not spark_session else spark_session
        )
        self.table_name = self.__get_table_identifier(
            table_path=table_path, table_name=table_name
        )
        self.original_table_name = table_name

    def __get_table_identifier(
        self,
        table_path: str = "",
        table_name: str = "",
    ) -> Union[str, None]:
        """Constructs the table identifier from the provided values."""
        if table_path and table_name:
            raise ValueError(
                "Both 'table_path' and 'table_name' provided. Use one of them."
            )

        if not table_path and not table_name:
            raise ValueError(
                "Neither 'table_path' nor 'table_name' provided. Use one of them."
            )

        if table_path:
            table_name = f"delta.`{table_path}`"
            _identifier = table_path
        else:
            _identifier = table_name

        if not DeltaTable.isDeltaTable(
            sparkSession=self.spark_session, identifier=_identifier
        ):
            logger.warning(f"{table_name} is not a Delta Table")
            return None

        return table_name

    @property
    def properties(self) -> Union[dict, None]:
        """Gets/sets dataset's delta table properties.

        Args:
            value (dict): An input dictionary in the format: `{"property_name": "value"}`

        Examples:
            >>> from spalah.dataset import DeltaTableConfig
            >>> dp = DeltaTableConfig(table_path="/path/dataset")
            >>>
            >>> # get existing properties
            >>> print(dp.properties)
            {'delta.deletedFileRetentionDuration': 'interval 15 days'}
            >>>
            >>> # Adjust the property value from 15 to 30 days
            >>> dp.properties = {'delta.deletedFileRetentionDuration': 'interval 30 days'}
        """
        if self.table_name:
            existing_properties = (
                self.spark_session.sql(f"DESCRIBE DETAIL {self.table_name}")
                .select("properties")
                .collect()[0]
                .asDict()["properties"]
            )
        else:
            existing_properties = None

        return existing_properties

    @properties.setter
    def properties(
        self,
        value: dict,
    ) -> None:
        """Sets delta properties.

        Args:
            value (dict): An input dictionary in the format: `{"property_name": "value"}`
        """
        _existing_properties = self.properties
        _new_properties = value

        if self.table_name:
            logger.info(f"Applying table properties on '{self.table_name}':")

            for k, v in _new_properties.items():
                logger.info(f"Checking if '{k} = {v}' is set on {self.table_name}")

                if k in _existing_properties and _existing_properties[k] == str(v):
                    logger.info("The property already exists on the table")
                else:
                    _sql = (
                        f"ALTER TABLE {self.table_name} SET TBLPROPERTIES ({k} = '{v}')"
                    )
                    self.spark_session.sql(_sql)
                    logger.info("The property has been set")

            if not self.keep_existing_properties:
                for k, v in _existing_properties.items():
                    if k not in _new_properties:
                        _sql = (
                            f"ALTER TABLE {self.table_name} UNSET TBLPROPERTIES ({k})"
                        )
                        self.spark_session.sql(_sql)
                        logger.info(
                            f"The property '{k} = {v}' has been unset because it is not defined in "
                            "the original dict"
                        )

    @property
    def check_constraints(self) -> Union[dict, None]:
        """Gets/sets dataset's delta table check constraints.

        Args:
            value (dict): An input dictionary in the format:
                `{"constraint_name": "constraint definition"}`

        Examples:
            >>> from spalah.dataset import DeltaTableConfig
            >>> dp = DeltaTableConfig(table_path="/path/dataset")
            >>>
            >>> # get existing constraints
            >>> print(dp.check_constraints)
            {}
            >>>
            >>> # Add a new check constraint
            >>> dp.check_constraints = {'id_is_not_null': 'id is not null'}
        """
        _constraints = {}

        if self.table_name:
            for k, v in self.properties.items():
                if k.startswith("delta.constraints."):
                    _new_key = k.replace("delta.constraints.", "")
                    _constraints[_new_key] = v
        else:
            _constraints = None

        return _constraints

    @check_constraints.setter
    def check_constraints(
        self,
        value: dict,
    ) -> None:
        """Dataset's delta table check constraints setter method.

        Args:
            value (dict): An input dictionary in the format:
                `{"constraint_name": "constraint definition"}`
        """
        if self.table_name:
            _existing_constraints = self.check_constraints
            _new_constraints = value

            logger.info(f"Applying check constraints on '{self.table_name}':")

            if not self.keep_existing_check_constraints:
                for k, v in _existing_constraints.items():
                    if k not in _new_constraints:
                        _sql = f"ALTER TABLE {self.table_name} DROP CONSTRAINT {k}"
                        self.spark_session.sql(_sql)
                        logger.info(
                            f"The constraint '{k}' has been dropped from the "
                            f"table '{self.table_name}'"
                        )

            for k, v in _new_constraints.items():
                _constraint_name = k.lower()
                logger.info(
                    f"Checking if constraint '{_constraint_name}' was "
                    f"already set on {self.table_name}"
                )

                if _constraint_name in _existing_constraints:
                    logger.info(
                        f"The constraint '{_constraint_name}' already exists on the table"
                    )
                else:
                    if v in _existing_constraints.values():
                        logger.warning(
                            f"The constraint definition '{v}' is already present on the table"
                        )
                    else:
                        _sql = (
                            f"ALTER TABLE {self.table_name} ADD CONSTRAINT "
                            f"{_constraint_name} CHECK ({v})"
                        )
                        self.spark_session.sql(_sql)
                        logger.info(
                            f"The constraint {_constraint_name} has been successfully "
                            f"added to '{self.table_name}'"
                        )
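Since testability was one of the motivations, here is a hedged sketch of how the class could be unit tested with pytest. It assumes the class is importable as delta_table_config.DeltaTableConfig (a hypothetical module name) and that a spark fixture provides a local, Delta-enabled session configured as shown at the beginning of the article:

from delta_table_config import DeltaTableConfig  # hypothetical module name


def test_properties_are_applied_idempotently(spark, tmp_path):
    # 'spark' is an assumed pytest fixture; 'tmp_path' is the built-in pytest fixture
    table_path = str(tmp_path / "dataset")
    spark.sql("SELECT 1 AS id").write.format("delta").save(table_path)

    dtc = DeltaTableConfig(table_path=table_path)
    dtc.properties = {"delta.checkpoint.writeStatsAsStruct": "true"}
    # the second, identical run must not produce a new SET TBLPROPERTIES commit
    dtc.properties = {"delta.checkpoint.writeStatsAsStruct": "true"}

    history = spark.sql(f"DESCRIBE HISTORY delta.`{table_path}`")
    assert history.filter("operation = 'SET TBLPROPERTIES'").count() == 1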
Alternatively, the source code is documented, maintained, tested, and constantly updated as part of a GitHub-hosted open-source project, so the usage is just a few lines of code away:
# %pip install spalah

from spalah.dataset import DeltaProperty

dt = DeltaProperty(table_path='/tmp/sample_delta_dataset')

print(dt.properties)
Final Words
Although Delta is a self-sufficient and mature dataset format, sometimes it must be additionally configured, with specific table options, constraints, and metadata added.
While all that is possible via SQL DDL commands, it is just not handy to use at scale. Therefore, Python abstractions come into the picture with the hope that, eventually, something similar will be a part of the official Delta-Spark package.
Currently, DeltaTableConfig fills in the missing functionality and helps manage Delta properties and constraints in a Pythonic way and at scale.
Thank you for reading.