spalah
PySpark helpers for everyday DataFrame surgery
spalah — PySpark DataFrame & Delta Lake Helpers
spalah is an open-source Python library that simplifies working with complex PySpark DataFrames, nested schemas, and Delta Lake tables. The name comes from the Ukrainian word for "spark" 🇺🇦
- PyPI: pip install spalah
- Source: github.com/avolok/spalah
- Docs: avolok.github.io/spalah
- License: MIT | Python: 3.11+
Why spalah?
Modern data lakehouses ingest semi-structured JSON events that end up as deeply nested Spark schemas — arrays of structs, structs inside arrays, multiple levels of nesting. Standard PySpark doesn't give you a clean way to:
- prune a specific nested field without rebuilding the whole schema manually,
- compare two schemas and get a human-readable diff,
- flatten a hierarchy into dot-notation paths for audit or documentation,
- reproduce a small test DataFrame from a single executable script.
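spalah fills these gaps with four focused utilities in spalah.dataframe, and adds DeltaTableConfig in spalah.dataset for managing Delta Lake table properties and check constraints.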
Installation
pip install spalah
spalah.dataframe
slice_dataframe — include or exclude nested columns
Selectively keep or drop columns — including deeply nested struct fields and fields inside arrays — without touching the rest of the schema.
from spalah.dataframe import slice_dataframe
# Keep only two root columns
df_sliced = slice_dataframe(
    input_dataframe=df,
    columns_to_include=["column_a", "column_c"],
)
# Drop one deeply nested field while keeping everything else
df_sliced = slice_dataframe(
    input_dataframe=df,
    columns_to_exclude=["column_c.column_c_2.c_2_1"],
)
# Mix include and exclude: keep column_b and column_c but drop a sub-field
df_sliced = slice_dataframe(
    input_dataframe=df,
    columns_to_include=["column_b", "column_c"],
    columns_to_exclude=["column_c.column_c_2.c_2_1"],
)
Key behaviours
- Column matching is case-insensitive (["Column_A"] matches column_a).
- Supports arbitrarily deep nesting: structs, arrays of structs, arrays of arrays of structs.
- nullify_only=True sets excluded fields to null instead of dropping them — useful when downstream code expects a fixed schema.
- Raises clear exceptions for bad inputs: passing a string instead of a list, non-string values in the list, or columns_to_include / columns_to_exclude values that together leave no columns at all.
flatten_schema — convert a nested schema to dot-notation paths
Returns every leaf field as a flat list, using dot notation for nested paths. With include_datatype=True, each path is paired with its Spark data type.
from spalah.dataframe import flatten_schema
# Paths only
flatten_schema(df)
# ["column_a", "column_b", "column_c.column_c_1",
# "column_c.column_c_2.c_2_1", "column_c.column_c_2.c_2_2",
# "column_c.column_c_2.c_2_3"]
# Paths + data types
flatten_schema(df, include_datatype=True)
# [("column_a", "IntegerType()"), ("column_b", "DecimalType(2,1)"),
# ("column_c.column_c_1", "StringType()"), ...]
# Flat DataFrame
flatten_schema(flat_df)
# ["a", "b", "c", "d", "e"]
flatten_schema(flat_df, include_datatype=True)
# [("a", "LongType()"), ("b", "DoubleType()"), ("c", "StringType()"),
# ("d", "DateType()"), ("e", "TimestampType()")]
Useful for schema documentation, data dictionaries, and building column mappings between systems.
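The flattening itself is a depth-first walk over the schema tree. Below is a minimal sketch of that idea, assuming the schema is modelled as nested dicts whose leaves are type names (arrays are not modelled); `flatten_paths` is a hypothetical helper, not spalah's code. Because the walk visits fields in declaration order, the paths come out in the same order as the listing above.

```python
from typing import Any


def flatten_paths(
    schema: dict[str, Any], include_datatype: bool = False, prefix: str = ""
) -> list[Any]:
    """Hypothetical sketch: depth-first walk emitting one entry per leaf field."""
    out: list[Any] = []
    for name, child in schema.items():
        path = f"{prefix}{name}"
        if isinstance(child, dict):  # nested struct: recurse with a longer prefix
            out.extend(flatten_paths(child, include_datatype, path + "."))
        elif include_datatype:
            out.append((path, child))  # (dot path, type name)
        else:
            out.append(path)
    return out


# Mirrors the nested example above; types for the c_2_* leaves are illustrative.
example = {
    "column_a": "IntegerType()",
    "column_b": "DecimalType(2,1)",
    "column_c": {
        "column_c_1": "StringType()",
        "column_c_2": {"c_2_1": "StringType()", "c_2_2": "StringType()", "c_2_3": "StringType()"},
    },
}
print(flatten_paths(example))
# ['column_a', 'column_b', 'column_c.column_c_1', 'column_c.column_c_2.c_2_1',
#  'column_c.column_c_2.c_2_2', 'column_c.column_c_2.c_2_3']
print(flatten_paths(example, include_datatype=True)[:2])
# [('column_a', 'IntegerType()'), ('column_b', 'DecimalType(2,1)')]
```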
script_dataframe — generate a self-contained DataFrame script
Serialises a small DataFrame (up to 20 rows) into a standalone Python script that recreates the exact data and schema. Handy for writing unit tests or sharing reproducible examples.
from pyspark.sql import Row
from spalah.dataframe import script_dataframe
# df contains: [Row(a=1, b=2.0), Row(a=2, b=3.0)]
script = script_dataframe(df.select("a", "b"))
print(script)
# from pyspark.sql import SparkSession
# from pyspark.sql.types import *
# spark = SparkSession.builder.getOrCreate()
# schema = StructType([StructField("a", LongType(), True), ...])
# data = [(1, 2.0), (2, 3.0)]
# outcome_dataframe = spark.createDataFrame(data, schema)
# Execute the generated script to get the DataFrame back
exec(script)
assert outcome_dataframe.collect() == [Row(a=1, b=2.0), Row(a=2, b=3.0)]
Raises ValueError if the DataFrame exceeds 20 rows — the function is intentionally scoped to test fixtures, not bulk exports.
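The round trip rests on one property of Python: `repr()` of common values (ints, floats, strings, `None`, `Decimal`, `datetime.date`) is itself valid Python source. Here is a Spark-free sketch of that idea under stated assumptions: rows are plain tuples, `script_rows` is a hypothetical helper, and the generated script rebuilds only `columns` and `data`, not a SparkSession and schema as spalah's output does.

```python
import datetime
from decimal import Decimal
from typing import Any

MAX_ROWS = 20  # same cap as described above


def script_rows(columns: list[str], rows: list[tuple[Any, ...]]) -> str:
    """Hypothetical sketch: render rows as a Python script that rebuilds them."""
    if len(rows) > MAX_ROWS:
        raise ValueError(f"{len(rows)} rows exceeds the {MAX_ROWS}-row limit")
    lines = [
        "import datetime",
        "from decimal import Decimal",
        f"columns = {columns!r}",
        "data = [",
        # repr() of ints, floats, str, None, Decimal and date is valid Python source
        *(f"    {row!r}," for row in rows),
        "]",
    ]
    return "\n".join(lines) + "\n"


rows = [(1, 2.0, Decimal("2.5")), (2, None, datetime.date(2024, 1, 1))]
script = script_rows(["a", "b", "c"], rows)
print(script)

# Execute in an isolated namespace instead of the caller's globals.
namespace: dict[str, Any] = {}
exec(script, namespace)
assert namespace["data"] == rows
```

Passing an explicit namespace dict to `exec` keeps the generated names out of the calling module, which is a safer habit than the bare `exec(script)` when the script is built from data.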
SchemaComparer — diff two schemas
Compares a source schema against a target schema and categorises every column as matched or not matched, with a human-readable reason for each mismatch.
from spalah.dataframe import SchemaComparer
comparer = SchemaComparer(
    source_schema=df_source.schema,
    target_schema=df_target.schema,
)
comparer.compare()
# Inspect results
print(comparer.matched) # columns present and compatible in both
print(comparer.not_matched) # columns with differences
Mismatch categories detected
| Situation | Example |
|---|---|
| Column only in source | "a" present in source, missing from target |
| Column only in target | "z" present in target, missing from source |
| Case mismatch | source has "c", target has "C" |
| Type conflict | source "b" is DoubleType, target "b" is StringType |
Passing a DataFrame (instead of .schema) or a plain string raises a descriptive exception, guiding you to fix the call.
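The four mismatch categories can be reproduced with a short pure-Python sketch. It assumes flat schemas given as `{column_name: type_name}` dicts and a hypothetical `compare_schemas` helper; SchemaComparer works on real Spark schemas, and its exact output format may differ. In this sketch a case mismatch is reported before any type comparison.

```python
def compare_schemas(
    source: dict[str, str], target: dict[str, str]
) -> tuple[list[str], list[tuple[str, str]]]:
    """Hypothetical sketch: split columns into matched and not matched (with a reason)."""
    if not isinstance(source, dict) or not isinstance(target, dict):
        raise TypeError("pass schemas as {column: type} dicts")
    matched: list[str] = []
    not_matched: list[tuple[str, str]] = []
    target_by_lower = {name.lower(): name for name in target}
    seen: set[str] = set()
    for name, dtype in source.items():
        other = target_by_lower.get(name.lower())
        if other is None:
            not_matched.append((name, "only in source"))
            continue
        seen.add(other)
        if other != name:
            not_matched.append((name, f"case mismatch: target has {other!r}"))
        elif target[other] != dtype:
            not_matched.append((name, f"type conflict: {dtype} vs {target[other]}"))
        else:
            matched.append(name)
    # Anything in the target that no source column claimed exists only there.
    not_matched += [(name, "only in target") for name in target if name not in seen]
    return matched, not_matched


source = {"a": "LongType", "b": "DoubleType", "c": "StringType", "d": "LongType"}
target = {"b": "StringType", "C": "StringType", "d": "LongType", "z": "LongType"}
matched, not_matched = compare_schemas(source, target)
print(matched)  # ['d']
for column, reason in not_matched:
    print(column, "->", reason)
# a -> only in source
# b -> type conflict: DoubleType vs StringType
# c -> case mismatch: target has 'C'
# z -> only in target
```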
spalah.dataset
DeltaTableConfig — manage Delta Lake table properties
Reads and writes the properties and check constraints of a Delta table programmatically. The table is identified by either a file path or a Hive metastore table name.
from spalah.dataset import DeltaTableConfig
# Identify by path
config = DeltaTableConfig(table_path="/mnt/lake/my_table", spark_session=spark)
# Identify by Hive name
config = DeltaTableConfig(table_name="my_database.my_table", spark_session=spark)
# Read current properties
config.properties
# {"delta.deletedFileRetentionDuration": "interval 15 days"}
# Set properties — merge with existing by default
config.set_properties(
    properties={
        "delta.logRetentionDuration": "interval 10 days",
        "delta.deletedFileRetentionDuration": "interval 16 days",
    }
)
# Overwrite all properties (remove existing)
config.set_properties(
    properties={"delta.logRetentionDuration": "interval 7 days"},
    keep_existing_properties=False,
)
# Read check constraints
config.constraints
# {"id_is_not_null": "id is not null"}
# Add a constraint while keeping existing ones
config.set_constraints(
    constraints={"name_is_not_null": "name is not null"},
    keep_existing_check_constraints=True,
)
# Inspect table structure
config.columns # {"id": "bigint", "name": "string", "value": "string"}
config.partition_columns # ["id"]
config.clustering_columns # []
# Full table details in one call
config.table_details
# {
# "columns": {...},
# "properties": {...},
# "constraints": {...},
# "partition_columns": [...],
# "clustering_columns": [...],
# }
Raises ValueError if both table_path and table_name are provided, or if neither is provided.