Skip to content

Commit abbfa59

Browse files
authored
fix: Do not set UniForm properties on managed iceberg (#1256)
<!-- Please review our pull request review process in CONTRIBUTING.md before your proceed. --> Resolves # <!--- Include the number of the issue addressed by this PR above if applicable. Example: resolves #1234 Please review our pull request review process in CONTRIBUTING.md before your proceed. --> ### Description There is a bug in `TblPropertiesProcessor` that is assuming that all tables with iceberg format need UniForm properties to be set. When managed iceberg behavior flag is enabled, this causes the materialization to fail The fix is to only apply these properties when working with a UniForm table. Unfortunately, we cannot use the `file_format` to distinguish because we do not actually require the user to set `file_format` to `parquet` for managed iceberg. We infer it in the adapter based on the `table_format` and behavior flag. To work around this fact, I am setting `model.config.extra["_databricks_use_managed_iceberg"]` to pass the information to the config processor ### Checklist - [ ] I have run this code in development and it appears to resolve the stated issue - [ ] This PR includes tests, or tests are not required/relevant for this PR - [ ] I have updated the `CHANGELOG.md` and added information about my change to the "dbt-databricks next" section.
1 parent 088710b commit abbfa59

7 files changed

Lines changed: 127 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## dbt-databricks 1.11.1 (TBD)
22

3+
### Fixes
4+
5+
- Fix bug that was applying UniForm tblproperties on managed Iceberg tables causing materializations to fail
6+
37
### Under the hood
48

59
- Add validation for query tag value length and auto-escape special characters

dbt/adapters/databricks/global_state.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,15 @@ def get_connector_log_level(cls) -> str:
5050
"DBT_DATABRICKS_CONNECTOR_LOG_LEVEL", "WARN"
5151
).upper()
5252
return cls.__connector_log_level
53+
54+
__use_managed_iceberg: ClassVar[Optional[bool]] = None
55+
56+
@classmethod
57+
def set_use_managed_iceberg(cls, value: bool) -> None:
58+
"""Set the use_managed_iceberg flag for the session."""
59+
cls.__use_managed_iceberg = value
60+
61+
@classmethod
62+
def get_use_managed_iceberg(cls) -> Optional[bool]:
63+
"""Get the use_managed_iceberg flag for the session."""
64+
return cls.__use_managed_iceberg

dbt/adapters/databricks/impl.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ def __init__(self, config: Any, mp_context: SpawnContext) -> None:
236236
self.add_catalog_integration(constants.DEFAULT_UNITY_CATALOG)
237237
self.add_catalog_integration(constants.DEFAULT_HIVE_METASTORE_CATALOG)
238238

239+
# Store the use_managed_iceberg flag in GlobalState for the session
240+
GlobalState.set_use_managed_iceberg(bool(self.behavior.use_managed_iceberg))
241+
239242
@property
240243
def _behavior_flags(self) -> list[BehaviorFlag]:
241244
return [
@@ -920,6 +923,7 @@ def get_relation_config(self, relation: DatabricksRelation) -> DatabricksRelatio
920923
@available.parse(lambda *a, **k: {})
921924
def get_config_from_model(self, model: RelationConfig) -> DatabricksRelationConfigBase:
922925
assert model.config, "Config was missing from relation"
926+
923927
if model.config.materialized == "materialized_view":
924928
return MaterializedViewAPI.get_from_relation_config(model)
925929
elif model.config.materialized == "streaming_table":

dbt/adapters/databricks/relation_configs/tblproperties.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from dbt.adapters.relation_configs.config_base import RelationResults
55
from dbt_common.exceptions import DbtRuntimeError
66

7+
from dbt.adapters.databricks import constants
8+
from dbt.adapters.databricks.global_state import GlobalState
79
from dbt.adapters.databricks.relation_configs import base
810
from dbt.adapters.databricks.relation_configs.base import (
911
DatabricksComponentConfig,
@@ -81,18 +83,22 @@ def from_relation_results(cls, results: RelationResults) -> TblPropertiesConfig:
8183
@classmethod
8284
def from_relation_config(cls, relation_config: RelationConfig) -> TblPropertiesConfig:
8385
tblproperties = base.get_config_value(relation_config, "tblproperties") or {}
84-
is_iceberg = base.get_config_value(relation_config, "table_format") == "iceberg"
8586

8687
if not isinstance(tblproperties, dict):
8788
raise DbtRuntimeError("tblproperties must be a dictionary")
8889

89-
# If the table format is Iceberg, we need to set the iceberg-specific tblproperties
90-
if is_iceberg:
90+
table_format = base.get_config_value(relation_config, "table_format")
91+
use_managed_iceberg = GlobalState.get_use_managed_iceberg()
92+
93+
is_uniform = table_format == constants.ICEBERG_TABLE_FORMAT and use_managed_iceberg is False
94+
95+
if is_uniform:
9196
tblproperties.update(
9297
{
9398
"delta.enableIcebergCompatV2": "true",
94-
"delta.universalFormat.enabledFormats": "iceberg",
99+
"delta.universalFormat.enabledFormats": constants.ICEBERG_TABLE_FORMAT,
95100
}
96101
)
102+
97103
tblproperties = {str(k): str(v) for k, v in tblproperties.items()}
98104
return TblPropertiesConfig(tblproperties=tblproperties)

tests/functional/adapter/iceberg/fixtures.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,29 @@
6969
}}
7070
select 1 as id
7171
"""
72+
73+
incremental_iceberg_base = """
74+
{{
75+
config(
76+
materialized = "incremental",
77+
table_format = "iceberg",
78+
incremental_strategy = "merge",
79+
unique_key = "id",
80+
)
81+
}}
82+
select 1 as id, 'initial' as status
83+
"""
84+
85+
incremental_iceberg_update = """
86+
{{
87+
config(
88+
materialized = "incremental",
89+
table_format = "iceberg",
90+
incremental_strategy = "merge",
91+
unique_key = "id",
92+
)
93+
}}
94+
select 1 as id, 'updated' as status
95+
union all
96+
select 2 as id, 'new' as status
97+
"""

tests/functional/adapter/iceberg/test_iceberg_support.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,33 @@ class TestManagedIcebergTables(TestIcebergTables, ManagedIcebergMixin):
6565

6666
class TestManagedIcebergSwap(TestIcebergSwap, ManagedIcebergMixin):
6767
pass
68+
69+
70+
@pytest.mark.skip_profile("databricks_cluster")
71+
class TestIcebergIncrementalMerge(ManagedIcebergMixin):
72+
@pytest.fixture(scope="class")
73+
def models(self):
74+
return {"iceberg_incremental.sql": fixtures.incremental_iceberg_base}
75+
76+
def test_iceberg_incremental_merge(self, project):
77+
results = util.run_dbt()
78+
assert len(results) == 1
79+
80+
result = project.run_sql("select * from iceberg_incremental order by id", fetch="all")
81+
assert len(result) == 1
82+
assert result[0][0] == 1 # id
83+
assert result[0][1] == "initial" # status
84+
85+
util.write_file(fixtures.incremental_iceberg_update, "models", "iceberg_incremental.sql")
86+
87+
# Second run should merge successfully without UniForm property errors
88+
results = util.run_dbt()
89+
assert len(results) == 1
90+
91+
# Verify merged data: id=1 should be updated, id=2 should be new
92+
result = project.run_sql("select * from iceberg_incremental order by id", fetch="all")
93+
assert len(result) == 2
94+
assert result[0][0] == 1
95+
assert result[0][1] == "updated" # Updated via merge
96+
assert result[1][0] == 2
97+
assert result[1][1] == "new" # New row

tests/unit/relation_configs/test_tblproperties.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import pytest
44
from dbt.exceptions import DbtRuntimeError
55

6+
from dbt.adapters.databricks import constants
7+
from dbt.adapters.databricks.global_state import GlobalState
68
from dbt.adapters.databricks.relation_configs.tblproperties import (
79
TblPropertiesConfig,
810
TblPropertiesProcessor,
@@ -56,3 +58,42 @@ def test_from_model_node__with_incorrect_tblproperties(self):
5658
match="tblproperties must be a dictionary",
5759
):
5860
_ = TblPropertiesProcessor.from_relation_config(model)
61+
62+
def test_from_model_node__with_uniform_iceberg_adds_properties(self):
63+
GlobalState.set_use_managed_iceberg(False)
64+
model = Mock()
65+
model.config.extra = {
66+
"table_format": constants.ICEBERG_TABLE_FORMAT,
67+
"tblproperties": {"custom_prop": "value"},
68+
}
69+
spec = TblPropertiesProcessor.from_relation_config(model)
70+
# Should have both custom property AND UniForm properties
71+
assert spec == TblPropertiesConfig(
72+
tblproperties={
73+
"custom_prop": "value",
74+
"delta.enableIcebergCompatV2": "true",
75+
"delta.universalFormat.enabledFormats": constants.ICEBERG_TABLE_FORMAT,
76+
}
77+
)
78+
79+
def test_from_model_node__with_managed_iceberg_no_uniform_properties(self):
80+
GlobalState.set_use_managed_iceberg(True)
81+
model = Mock()
82+
model.config.extra = {
83+
"table_format": constants.ICEBERG_TABLE_FORMAT,
84+
"tblproperties": {"custom_prop": "value"},
85+
}
86+
spec = TblPropertiesProcessor.from_relation_config(model)
87+
# Should only have the custom property, NOT the UniForm properties
88+
assert spec == TblPropertiesConfig(tblproperties={"custom_prop": "value"})
89+
90+
def test_from_model_node__with_iceberg_no_flag_no_properties(self):
91+
GlobalState.set_use_managed_iceberg(None)
92+
model = Mock()
93+
model.config.extra = {
94+
"table_format": constants.ICEBERG_TABLE_FORMAT,
95+
"tblproperties": {},
96+
}
97+
spec = TblPropertiesProcessor.from_relation_config(model)
98+
# Should not have UniForm properties without explicit use_managed_iceberg=False
99+
assert spec == TblPropertiesConfig(tblproperties={})

0 commit comments

Comments
 (0)