Delta Properties and Check Constraints at Scale

Delta arguably is the most popular data format in the current Data Engineering landscape. It is mainly used with Databricks/Spark ecosystem; however, this is optional. It is a universal open-source format and works great out of the box. However, it might be that the default behavior is not sufficient for some use cases, and then custom table-level properties must be defined.

It is not that hard to hit that point. To define the Delta table property or the check constraint, you have the only option — ALTER TABLE statement. This command is easy to embed in an ad-hoc notebook. What to do when there are no manual processes and delta objects are delivered in a fully automated way using config-driven Python pipelines?

This story answers the question above and reveals my path from the exploration to the solution that scales.

Prepare a clean environment

First thing first, let’s prepare a clean Python environment to run showcase examples.

Traditionally, I am going to run the code on Apple Silicon; however, all snippets must be compatible with Linux and WSL of Windows 10/11:

mkdir delta-properties
cd delta-properties
python3.10 -m venv .venv
source .venv/bin/activate
pip install pyspark==3.3.1 delta-spark==2.3.0
touch delta-properties.ipynb
code .

Then initialize the minimalistic configuration of Spark with the latest delta jar, and disable all Spark UI:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("local[*]")
    .appName("MyApp")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )    
    .config("spark.ui.showConsoleProgress", "false")
    .config("spark.ui.enabled", "false")
    .config("spark.ui.dagGraph.retainedRootRDDs", "1")
    .config("spark.ui.retainedJobs", "1")
    .config("spark.ui.retainedStages", "1")
    .config("spark.ui.retainedTasks", "1")
    .config("spark.sql.ui.retainedExecutions", "1")
    .config("spark.worker.ui.retainedExecutors", "1")
    .config("spark.worker.ui.retainedDrivers", "1")
    .config("spark.driver.memory", "4g") 
).getOrCreate()

When the spark is up, create a sample delta table that will be the subject of coming experiments:

df = spark.sql(
    """
    SELECT 
        1 as id, 
        "John" AS name, 
        struct(
            "Longstreet 1" AS line1, 
            "1234AB" AS zipcode
        ) AS address
""")
df.printSchema()

"""
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- address: struct (nullable = false)
 |    |-- line1: string (nullable = false)
 |    |-- zipcode: string (nullable = false)
"""

df.write.format("delta").mode("overwrite").save("/tmp/sample_delta_dataset")

A real-world example

So you build a Spark-based data processing framework. You want to improve Structured Streaming latencies by modifying default table properties. Because the streaming logic uses.ForEachBatch with a MERGE statement, you also plan to add NOT NULL constraints on Primary Key columns in the silver layer.

Table Properties

This part is easy to construct with a bit of dynamic SQL and then embed into the framework, and it will work:

spark.sql(
    """
    ALTER TABLE delta.`/tmp/sample_delta_dataset`
    SET TBLPROPERTIES (
        delta.checkpoint.writeStatsAsStruct = true,
        delta.checkpoint.writeStatsAsJson = false
    );
    """
)

However, the framework triggered the data load five times, and you noticed that Delta History is polluted by the SET TBLPROPERTIES messages:

(
    spark.sql(
        "DESCRIBE HISTORY delta.`/tmp/sample_delta_dataset`;"
    ).select("version", "operation", "operationParameters")
    .show(truncate=False)
)

"""
+-------+-----------------+----------------------------------------------------------------------------------------------------------+
|version|operation        |operationParameters                                                                                       |
+-------+-----------------+----------------------------------------------------------------------------------------------------------+
|5      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}}|
|4      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}}|
|3      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}}|
|2      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}}|
|1      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}}|
|0      |WRITE            |{mode -> Overwrite, partitionBy -> []}                                                                    |
+-------+-----------------+----------------------------------------------------------------------------------------------------------+
"""

This is not a perfect situation, but still not a roadblock. At the end of the day, you can write the script that checks current table properties and changes them only when needed.

Table Check Constraints

Similarly to table properties, you extend the framework to create NOT NULL constraints for each Primary Key column of the input dataset config, so the logic gets one more dynamically generated SQL command:

spark.sql(
    """
    ALTER TABLE delta.`/tmp/sample_delta_dataset`
    ADD CONSTRAINT id_is_not_null CHECK (id is not null)
    """
)

The first time it runs correctly, but the consecutive runs break the execution flow:

AnalysisException: Constraint 'id_is_not_null' already exists. Please delete the old constraint first.
Old constraint: id is not null

This time, things get nastier. The error is a blocker and must be fixed in a structured way.

An elephant in the room

This elephant is a generation of SQL DDL statements. It is not just a non-Pythonic way to deal with this problem. The embedded DDL statements are hard for abstraction, not always idempotent, and inconvenient for unit testing.

Ideally, the official Delta-Spark package must contain the logic to deal with such tasks. However, at this point, the existing official class DeltaTable does not have any Python abstraction for properties, constraints, table metadata, etc. So it still must be defined using SQL DDL format.

What about native NOT NULL column constraint

Some may ask why I define a table-level constraint to enforce NOT NUL for a particular column while the documentation says it is possible to do on the column level:

spark.sql(
    """
    ALTER TABLE delta.`/tmp/sample_delta_dataset`
    CHANGE COLUMN id SET NOT NULL;
    """
)

Even though such column-level Constraints are documented and must be supported, I failed to make them work in Databricks 11.3 and Spark 3.3.1. The command fails with the same error in both offerings:

AnalysisException: Cannot change nullable column to non-nullable: id; line 2 pos 4;
AlterColumn resolvedfieldname(StructField(id,IntegerType,true)), false

I hope this issue will be solved soon, but for the time being, check constraints are used instead.

The internal benchmarks show that the overhead of table constraint checks is relatively tiny in data manipulation operations; however, due to the mentioned issue, I cannot directly compare table and column-level constraints.

An alternative using Python

The limitations mentioned earlier and the vision of having a “desired state configuration” kind of experience brought me to the need to write a Python abstraction around the discussed subject. So, to abstract the configuration process of the Delta tables. That all resulted in the Python class DeltaTableConfig.

Table Properties

So, the first example. The framework knows that the table sample_delta_dataset is a streaming dataset, and it prescribes the following two standard options for such a kind table:

STREAMING_PROPERTIES = {
    "delta.checkpoint.writeStatsAsStruct": "true",
    "delta.checkpoint.writeStatsAsJson": "false",
}
dtc = DeltaTableConfig(table_path="/tmp/sample_delta_dataset")
dtc.properties = STREAMING_PROPERTIES

The first run enables the properties:

2023-05-21 16:03:35 INFO   Applying table properties on 'delta.`/tmp/sample_delta_dataset`':
2023-05-21 16:03:35 INFO   Checking if 'delta.checkpoint.writeStatsAsStruct = true' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:03:36 INFO   The property has been set
2023-05-21 16:03:36 INFO   Checking if 'delta.checkpoint.writeStatsAsJson = false' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:03:36 INFO   The property has been set

And the following runs detect that properties already exist so it has nothing to do:

2023-05-21 16:09:06 INFO   Applying table properties on 'delta.`/tmp/sample_delta_dataset`':
2023-05-21 16:09:06 INFO   Checking if 'delta.checkpoint.writeStatsAsStruct = true' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:09:06 INFO   The property already exists on the table
2023-05-21 16:09:06 INFO   Checking if 'delta.checkpoint.writeStatsAsJson = false' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:09:06 INFO   The property already exists on the table

The check of delta history confirms that the property-related messages appear in a single instance:

(
    spark.sql(
        "DESCRIBE HISTORY delta.`/tmp/sample_delta_dataset`;"
    ).select("version", "operation", "operationParameters")
    .show(truncate=False)
)

+-------+-----------------+--------------------------------------------------------------+
|version|operation        |operationParameters                                           |
+-------+-----------------+--------------------------------------------------------------+
|2      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsJson":"false"}} |
|1      |SET TBLPROPERTIES|{properties -> {"delta.checkpoint.writeStatsAsStruct":"true"}}|
|0      |WRITE            |{mode -> Overwrite, partitionBy -> []}                        |
+-------+-----------------+--------------------------------------------------------------+

Imagine another scenario: the table type changed from “streaming” to “append-only.” The framework might have different options for that kind of table:

APPEND_ONLY_PROPERTIES = {
    "delta.appendOnly": "true",
    "delta.enableChangeDataFeed": "true",
}
dtc.properties = APPEND_ONLY_PROPERTIES

The DeltaTableConfig unsets previous properties and sets two new items:

2023-05-21 16:17:06 INFO   Applying table properties on 'delta.`/tmp/sample_delta_dataset`':
2023-05-21 16:17:06 INFO   Checking if 'delta.appendOnly = true' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:17:07 INFO   The property has been set
2023-05-21 16:17:07 INFO   Checking if 'delta.enableChangeDataFeed = true' is set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:17:07 INFO   The property has been set
2023-05-21 16:17:08 INFO   The property 'delta.checkpoint.writeStatsAsStruct = true' has been unset because it is not defined in the original dict
2023-05-21 16:17:08 INFO   The property 'delta.checkpoint.writeStatsAsJson = false' has been unset because it is not defined in the original dict

If the new properties must be added with keeping all existing ones, use the flag .keep_existing_properties = True. So, let’s re-add previously removed delta properties:

dtc.keep_existing_properties = True
dtc.properties = STREAMING_PROPERTIES

The attribute .properties acts as a getter and setter, so the existing values can be retrieved by getting the value of the attribute:

from pprint import pprint
pprint(dtc.properties)

{'delta.appendOnly': 'true',
 'delta.checkpoint.writeStatsAsJson': 'false',
 'delta.checkpoint.writeStatsAsStruct': 'true',
 'delta.enableChangeDataFeed': 'true'}

Under the hood, the helper class runs DeltaTable.detail()each time to get the current state.

Check Constraints

Similarly to the Table Properties, the data processing framework can pre-generate the input dictionary PK_CHECK_CONSTRAINTS with the constraints definitions to add. DeltaTableConfig then adds prescribed check constraints if they still need to be created.

dtc = DeltaTableConfig(table_path="/tmp/sample_delta_dataset")

PK_CHECK_CONSTRAINTS = {
    "id_is_not_null": "id is not null",
}
dtc.check_constraints = PK_CHECK_CONSTRAINTS

results to the output:

2023-05-21 16:30:00 INFO   Applying check constraints on 'delta.`/tmp/sample_delta_dataset`':
2023-05-21 16:30:00 INFO   Checking if constraint 'id_is_not_null' was already set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:30:01 INFO   The constraint id_is_not_null has been successfully added to 'delta.`/tmp/sample_delta_dataset`'

In case of new constraints are to be added instead of existing ones, provide the new dictionary to apply:

NEW_CHECK_CONSTRAINTS = {
    "address_zipcode_is_not_null": "address.zipcode is not null",
}
dtc.check_constraints = NEW_CHECK_CONSTRAINTS

As you might expect, the old constraint is removed, and a new one is added:

2023-05-21 16:32:00,180 INFO    Applying check constraints on 'delta.`/tmp/sample_delta_dataset`':
2023-05-21 16:32:01,085 INFO    The constraint 'id_is_not_null' has been dropped from the table 'delta.`/tmp/sample_delta_dataset`'
2023-05-21 16:32:01,085 INFO    Checking if constraint 'address_zipcode_is_not_null' was already set on delta.`/tmp/sample_delta_dataset`
2023-05-21 16:32:01,616 INFO    The constraint address_zipcode_is_not_null has been successfully added to 'delta.`/tmp/sample_delta_dataset`'

If existing check constraints must be preserved, use the optional argument .keep_existing_check_constraints=True

dtc.keep_existing_check_constraints = True
dtc.check_constraints = PK_CHECK_CONSTRAINTS

All existing constraints can be programmatically retrieved via getter .check_constraints:

print(dtc.check_constraints)

{
    'address_zipcode_is_not_null': 'address.zipcode is not null',
    'id_is_not_null': 'id is not null'
}

Delta Table Check Constraints in Action

Let’s then run a final smoke test to ensure that all those constraint efforts pay back:

spark.sql(
    """
    INSERT INTO delta.`/tmp/sample_delta_dataset`
    VALUES (1, 'John Doe', STRUCT('123 Main St', '10001'))
    """
)

So, the insert with non-null values works as expected.

The following command intentionally inserts a NULL in the nested attribute address.zipcode, which is protected by the recently added check constraint:

spark.sql(
    """
    INSERT INTO delta.`/tmp/sample_delta_dataset`
    VALUES (1, 'John Doe', STRUCT('123 Main St', NULL))
    """
)

"""
org.apache.spark.sql.delta.schema.DeltaInvariantViolationException:
CHECK constraint address_zipcode_is_not_null (address.zipcode IS NOT NULL) violated by row with values:
 - address.zipcode : null
"""

The code fails with a clear message that must eventually trigger the data team to fix the issue.

The source code

from delta import DeltaTable
from pyspark.sql import SparkSession
from typing import Union
import logging


def get_logger(name: str):
    """Returns an instance of the logger"""

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter("%(asctime)s %(levelname)-8s  %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger


logger = get_logger(__name__)


class DeltaTableConfig:
    """
    Manages Delta Table properties, constraints, etc.
    Attributes:
        keep_existing_properties (bool): Preserves existing table properties if they are not
                                                in the input value. Defaults to False
        keep_existing_check_constraints (bool): Preserves existing table constraints if they are not
                                                in the input value. Defaults to False
    """

    keep_existing_properties = False
    keep_existing_check_constraints = False

    def __init__(
        self,
        table_path: str = "",
        table_name: str = "",
        spark_session: SparkSession = None,
    ) -> None:
        """
        Args:
            table_path (str, optional): Path to delta table. For instance: /mnt/db1/table1
            table_name (str, optional): Delta table name. For instance: db1.table1
            spark_session: (SparkSession, optional)  The current spark context.
        Raises:
            ValueError: if values for both 'table_path' and 'table_name' provided
                        provide values to one of them
            ValueError: if values for neither 'table_path' nor 'table_name' provided
                        provide values to one of them
        Examples:
            >>> from spalah.dataset import DeltaTableConfig
            >>> dp = DeltaTableConfig(table_path="/path/dataset")
            >>> print(dp.properties)
            {'delta.deletedFileRetentionDuration': 'interval 15 days'}
        """

        self.spark_session = (
            SparkSession.getActiveSession() if not spark_session else spark_session
        )
        self.table_name = self.__get_table_identifier(
            table_path=table_path, table_name=table_name
        )
        self.original_table_name = table_name

    def __get_table_identifier(
        self,
        table_path: str = "",
        table_name: str = "",
    ) -> Union[str, None]:
        """Constructs table identifier from provided values."""

        if table_path and table_name:
            raise ValueError(
                "Both 'table_path' and 'table_name' provided. Use one of them."
            )

        if not table_path and not table_name:
            raise ValueError(
                "Neither 'table_path' nor 'table_name' provided. Use one of them."
            )

        if table_path:
            table_name = f"delta.`{table_path}`"
            _identifier = table_path
        else:
            _identifier = table_name

        if not DeltaTable.isDeltaTable(
            sparkSession=self.spark_session, identifier=_identifier
        ):
            logger.warning(f"{table_name} is not a Delta Table")
            return None

        return table_name

    @property
    def properties(self) -> Union[dict, None]:
        """Gets/sets dataset's delta table properties.
        Args:
            value (dict):  An input dictionary in the format: `{"property_name": "value"}`
        Examples:
            >>> from spalah.dataset import DeltaTableConfig
            >>> dp = DeltaTableConfig(table_path="/path/dataset")
            >>>
            >>> # get existing properties
            >>> print(dp.properties)
            {'delta.deletedFileRetentionDuration': 'interval 15 days'}
            >>>
            >>> # Adjust the property value from 15 to 30 days
            >>> dp.properties = {'delta.deletedFileRetentionDuration': 'interval 30 days'}
        """

        if self.table_name:
            existing_properties = (
                self.spark_session.sql(f"DESCRIBE DETAIL {self.table_name}")
                .select("properties")
                .collect()[0]
                .asDict()["properties"]
            )
        else:
            existing_properties = None

        return existing_properties

    @properties.setter
    def properties(
        self,
        value: dict,
    ) -> None:
        """Sets delta properties
        Args:
            value (dict):  An input dictionary in the format: `{"property_name": "value"}`
        """

        _existing_properties = self.properties
        _new_properties = value

        if self.table_name:
            logger.info(f"Applying table properties on '{self.table_name}':")

            for k, v in _new_properties.items():
                logger.info(f"Checking if '{k} = {v}' is set on {self.table_name}")

                if k in _existing_properties and _existing_properties[k] == str(v):
                    logger.info("The property already exists on the table")
                else:
                    _sql = (
                        f"ALTER TABLE {self.table_name} SET TBLPROPERTIES ({k} = '{v}')"
                    )
                    self.spark_session.sql(_sql)
                    logger.info("The property has been set")

            if not self.keep_existing_properties:
                for k, v in _existing_properties.items():
                    if k not in _new_properties:
                        _sql = (
                            f"ALTER TABLE {self.table_name} UNSET TBLPROPERTIES ({k})"
                        )
                        self.spark_session.sql(_sql)
                        logger.info(
                            f"The property '{k} = {v}' has been unset because it is not defined in "
                            "the original dict"
                        )

    @property
    def check_constraints(self) -> Union[dict, None]:
        """Gets/sets dataset's delta table check constraints.
        Args:
            value (dict):  An input dictionary in the format: `{"property_name": "value"}`
        Examples:
            >>> from spalah.dataset import DeltaTableConfig
            >>> dp = DeltaTableConfig(table_path="/path/dataset")
            >>>
            >>> # get existing constraints
            >>> print(dp.check_constraints)
            {}
            >>>
            >>> # Add a new check constraint
            >>> dp.check_constraints = {'id_is_not_null': 'id is not null'}
        """

        _constraints = {}

        if self.table_name:
            for k, v in self.properties.items():
                if k.startswith("delta.constraints."):
                    _new_key = k.replace("delta.constraints.", "")
                    _constraints[_new_key] = v
        else:
            _constraints = None

        return _constraints

    @check_constraints.setter
    def check_constraints(
        self,
        value: dict,
    ) -> None:
        """Dataset's delta table check constraints setter method.
        Args:
            value (dict): inptut dictionary in the format:
                            `{"constraint_name": "constraint definition"}`
        """

        if self.table_name:
            _existing_constraints = self.check_constraints
            _new_constraints = value

            logger.info(f"Applying check constraints on '{self.table_name}':")

            if not self.keep_existing_check_constraints:
                for k, v in _existing_constraints.items():
                    if k not in _new_constraints:
                        _sql = f"ALTER TABLE {self.table_name} DROP CONSTRAINT {k}"
                        self.spark_session.sql(_sql)
                        logger.info(
                            f"The constraint '{k}' has been dropped from the "
                            f"table '{self.table_name}'"
                        )

            for k, v in _new_constraints.items():
                _constraint_name = k.lower()
                logger.info(
                    f"Checking if constraint '{_constraint_name}' was "
                    f"already set on {self.table_name}"
                )

                if _constraint_name in _existing_constraints:
                    logger.info(
                        f"The constraint '{_constraint_name}' already exists on the table"
                    )
                else:
                    if v in _existing_constraints.values():
                        logger.warning(
                            f" The constraint definition '{v}' already present on the table"
                        )
                    else:
                        _sql = (
                            f"ALTER TABLE {self.table_name} ADD CONSTRAINT "
                            f"{_constraint_name} CHECK ({v})"
                        )

                        self.spark_session.sql(_sql)
                        logger.info(
                            f"The constraint {_constraint_name} has been successfully "
                            f"added to '{self.table_name}'"
                        )

Alternatively, the source code is documented, maintained, tested, and constantly updated as a GitHub-hosted open-source project. So the usage is a few lines of the code away:

# %pip install spalah

from spalah.dataset import DeltaProperty

dt = DeltaProperty(table_path='/tmp/sample_delta_dataset')
print(dt.properties)

Final Words

Although Delta is a self-sufficient and mature dataset format, sometimes it must be additionally configured, and specific table options, constraints, and metadata to be added.

While all that is possible via SQL DDL commands, it is just not handy to use at scale. Therefore, Python abstractions come into the picture with the hope that, eventually, something similar will be a part of the official Delta-Spark package.

Currently, DeltaTableConfig fills the missing functionality and helps manage delta properties and constraints in a Pythonic way and at scale.

Thank you for reading.