|
4 | 4 | from dbt.adapters.base.relation import BaseRelation |
5 | 5 | from dbt.tests import util |
6 | 6 | from dbt.tests.adapter.materialized_view.files import MY_SEED, MY_TABLE, MY_VIEW |
| 7 | +from dbt_common.contracts.config.materialization import OnConfigurationChangeOption |
7 | 8 |
|
8 | 9 | from dbt.adapters.databricks.relation import DatabricksRelationType |
| 10 | +from dbt.adapters.databricks.relation_configs.streaming_table import StreamingTableConfig |
9 | 11 | from tests.functional.adapter.streaming_tables import fixtures |
10 | 12 |
|
11 | 13 |
|
@@ -300,6 +302,55 @@ def test_create_with_liquid_clustering_config(self, project): |
300 | 302 | assert relation_type == "streaming_table" |
301 | 303 |
|
302 | 304 |
|
| 305 | +@pytest.mark.dlt |
| 306 | +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") |
| 307 | +class TestStreamingTableLiquidClusteringChanges: |
| 308 | + """Test liquid clustering changes on existing streaming tables (#1329).""" |
| 309 | + |
| 310 | + @pytest.fixture(scope="class") |
| 311 | + def seeds(self): |
| 312 | + return {"my_seed.csv": MY_SEED} |
| 313 | + |
| 314 | + @pytest.fixture(scope="class") |
| 315 | + def models(self): |
| 316 | + yield { |
| 317 | + "liquid_clustered_st.sql": fixtures.liquid_clustered_st, |
| 318 | + "schema.yml": fixtures.liquid_clustered_st_schema_v1, |
| 319 | + } |
| 320 | + |
| 321 | + @pytest.fixture(scope="class") |
| 322 | + def project_config_update(self): |
| 323 | + return {"models": {"on_configuration_change": OnConfigurationChangeOption.Apply.value}} |
| 324 | + |
| 325 | + @pytest.fixture(scope="class") |
| 326 | + def liquid_clustered_st(self, project) -> BaseRelation: |
| 327 | + return project.adapter.Relation.create( |
| 328 | + identifier="liquid_clustered_st", |
| 329 | + schema=project.test_schema, |
| 330 | + database=project.database, |
| 331 | + type=DatabricksRelationType.StreamingTable, |
| 332 | + ) |
| 333 | + |
| 334 | + @pytest.fixture(scope="class", autouse=True) |
| 335 | + def setup(self, project, liquid_clustered_st): |
| 336 | + util.run_dbt(["seed"]) |
| 337 | + util.run_dbt(["run", "--models", liquid_clustered_st.identifier, "--full-refresh"]) |
| 338 | + |
| 339 | + yield |
| 340 | + |
| 341 | + project.run_sql(f"drop schema if exists {project.test_schema} cascade") |
| 342 | + |
| 343 | + def test_liquid_clustering_change_is_applied(self, project, liquid_clustered_st): |
| 344 | + """Changing liquid_clustered_by on an existing ST should apply via ALTER.""" |
| 345 | + util.write_file(fixtures.liquid_clustered_st_schema_v2, "models", "schema.yml") |
| 346 | + util.run_dbt(["run", "--models", liquid_clustered_st.identifier]) |
| 347 | + |
| 348 | + with util.get_connection(project.adapter): |
| 349 | + config = project.adapter.get_relation_config(liquid_clustered_st) |
| 350 | + assert isinstance(config, StreamingTableConfig) |
| 351 | + assert config.config["liquid_clustering"].cluster_by == ["id", "value"] |
| 352 | + |
| 353 | + |
303 | 354 | @pytest.mark.dlt |
304 | 355 | @pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") |
305 | 356 | class TestStreamingTablesFromFiles(TestStreamingTablesMixin): |
|
0 commit comments