Skip to content

Commit 33cca1d

Browse files
authored
feat: Enable concurrent microbatch execution (#1326)
Resolves #914 ### Description Declares `MicrobatchConcurrency` adapter capability so dbt-core 1.9+ can execute microbatch incremental batches in parallel threads instead of sequentially. Per reviewer feedback from @sd-db, the capability is **gated behind the `use_concurrent_microbatch` behavior flag** (default: `false`). Users opt in via: ```yaml # dbt_project.yml flags: use_concurrent_microbatch: true ``` When the flag is disabled (default), `adapter.supports(MicrobatchConcurrency)` returns `False` and dbt-core falls back to sequential batch execution — identical to current behavior. ### Implementation 1. **`USE_CONCURRENT_MICROBATCH` behavior flag** — module-level `BehaviorFlag` with `default=False`, registered in `_behavior_flags` 2. **`supports()` instance method override** — intercepts `Capability.MicrobatchConcurrency` and gates on the flag; delegates all other capabilities to `super().supports()` 3. **`MicrobatchConcurrency` removed from `_capabilities` dict** — the `supports()` override is the sole gatekeeper (if the capability stayed in `_capabilities`, `super().supports()` would return `True` regardless of the flag) 4. **Unit tests** — disabled-by-default, enabled-with-flag, guard test (capability not declared as Full in `_capabilities`), delegation regression test ### Integration test findings (Databricks cluster, `batch_size='day'`, 31 batches, `--threads 4`) | Run | Flag | Batches OK | Notes | |-----|------|-----------|-------| | Sequential | `false` | 31/31 | Correct fallback, warning emitted | | Concurrent | `true` | 13-30/31 | `DELTA_CONCURRENT_APPEND` on non-partitioned tables | **Key finding: `REPLACE WHERE` predicates are non-overlapping** (each batch is exactly one `batch_size` wide, regardless of `lookback`), **but non-partitioned Delta tables still conflict** because Delta's `WriteSerializable` isolation cannot verify non-overlap at the file level — it conservatively rejects concurrent conditional overwrites to the same table root. A secondary error class (`DELTA_METADATA_CHANGED`) occurs when dbt-databricks applies `SET TBLPROPERTIES` (e.g., `autoCompact`) per batch, conflicting with concurrent writes. **Safe configurations for concurrent microbatch:** - Partition the target table by `event_time` at the same granularity as `batch_size` (allows Delta to verify non-overlap at the partition level) - Avoid per-batch `tblproperties` changes - `DATABRICKS_SKIP_OPTIMIZE=true` prevents post-write OPTIMIZE conflicts (but does not prevent the core `REPLACE WHERE` conflict) **`lookback` is NOT a factor** — it only controls which batches are generated (shifts the start of the batch list backwards), not the width of any individual batch's `REPLACE WHERE` predicate. **The `default=false` is the correct safe choice.** The default can be flipped to `true` around 1.12 once partitioning guidance and/or retry logic is established. **Prior art:** dbt-snowflake added the same capability in [dbt-snowflake#1259](dbt-labs/dbt-snowflake#1259). ### Checklist - [x] I have run this code in development and it appears to resolve the stated issue - [x] This PR includes tests, or tests are not required/relevant for this PR - [x] I have updated the `CHANGELOG.md` and added information about my change to the "dbt-databricks next" section. --------- Signed-off-by: Wyatt Jones <wyatt.jones6@cfacorp.com>
1 parent 24325a3 commit 33cca1d

3 files changed

Lines changed: 37 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
## dbt-databricks 1.11.6 (TBD)
2+
3+
### Features
4+
5+
- Enable concurrent microbatch execution via `MicrobatchConcurrency` capability,
6+
gated behind the `use_concurrent_microbatch` behavior flag (default: `false`).
7+
Opt in via `flags: {use_concurrent_microbatch: true}` in `dbt_project.yml`
8+
([#914](https://github.com/databricks/dbt-databricks/issues/914))
9+
110
## dbt-databricks 1.11.5 (Feb 19, 2026)
211

312
### Fixes

dbt/adapters/databricks/impl.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,17 @@
141141
),
142142
) # type: ignore[typeddict-item]
143143

144+
USE_CONCURRENT_MICROBATCH = BehaviorFlag(
145+
name="use_concurrent_microbatch",
146+
default=False,
147+
description=(
148+
"Enable concurrent execution of microbatch incremental batches. "
149+
"When enabled, dbt will run microbatch batches in parallel threads "
150+
"instead of sequentially. The microbatch strategy itself requires "
151+
"dbt-core 1.9+; this flag controls whether batches run concurrently."
152+
),
153+
) # type: ignore[typeddict-item]
154+
144155

145156
class DatabricksRelationInfo(NamedTuple):
146157
table_name: str
@@ -246,8 +257,15 @@ def _behavior_flags(self) -> list[BehaviorFlag]:
246257
USE_MATERIALIZATION_V2,
247258
USE_REPLACE_ON_FOR_INSERT_OVERWRITE,
248259
USE_MANAGED_ICEBERG,
260+
USE_CONCURRENT_MICROBATCH,
249261
]
250262

263+
def supports(self, capability: Capability) -> bool:
264+
# Gate MicrobatchConcurrency on the use_concurrent_microbatch behavior flag.
265+
if capability == Capability.MicrobatchConcurrency:
266+
return bool(self.behavior.use_concurrent_microbatch)
267+
return super().supports(capability)
268+
251269
def quote(self, identifier): # type: ignore[override,no-untyped-def]
252270
"""Override base adapter's quote method to prevent double quoting."""
253271
return quote(identifier)

tests/unit/test_adapter_capabilities.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from unittest.mock import Mock, patch
77

88
import pytest
9+
from dbt.adapters.capability import Capability
910
from dbt_common.exceptions import DbtConfigError
1011

1112
from dbt.adapters.databricks.dbr_capabilities import DBRCapabilities, DBRCapability
@@ -169,3 +170,12 @@ def test_insert_by_name_required_version_string(self):
169170
"""Test that the required version string is correct for INSERT_BY_NAME"""
170171
version_string = DBRCapabilities.get_required_version(DBRCapability.INSERT_BY_NAME)
171172
assert version_string == "DBR 12.2+"
173+
174+
def test_microbatch_concurrency_disabled_by_default(self, adapter):
175+
"""Test that MicrobatchConcurrency is disabled by default (behavior flag off)."""
176+
assert not adapter.supports(Capability.MicrobatchConcurrency)
177+
178+
def test_microbatch_concurrency_enabled_with_flag(self, adapter):
179+
"""Test that MicrobatchConcurrency is enabled when behavior flag is on."""
180+
adapter.behavior.use_concurrent_microbatch = True
181+
assert adapter.supports(Capability.MicrobatchConcurrency)

0 commit comments

Comments
 (0)