Skip to content

Commit 5927a32

Browse files
aarushisingh04, tejassp-db, sd-db
authored
fix: convert workflow job spec to SDK dataclasses before jobs.create() (#1338) (#1364)
resolves #1360 ### description `WorkflowJobApi.create()` passes plain dicts to `workspace_client.jobs.create(**converted_job_spec)`. the databricks sdk's `jobs.create()` iterates tasks calling `v.as_dict()` on each (confirmed via `JobsAPI.create` source inspection). plain dicts have no `.as_dict()` which caused the `'dict' object has no attribute 'as_dict'` error. 1. the fix uses `JobSettings.from_dict()` + `.as_shallow_dict()` to convert the plain dict job spec into proper sdk dataclasses before calling `jobs.create()`. 2. `as_shallow_dict()` is the correct choice here : it unpacks top-level fields as kwargs (matching the `jobs.create()` signature) while keeping nested values as typed `Task` / `NotebookTask` / etc. dataclasses that satisfy `.as_dict()`. 3. this is the same pattern already used by `update_job_settings()` in the same class. **changes:** - `dbt/adapters/databricks/api_client.py` - `WorkflowJobApi.create()` now deserializes the dict via `JobSettings.from_dict()` and passes `.as_shallow_dict()` to the sdk. inline comment documents the sdk internals that make this the correct choice. - `tests/unit/api_client/test_workflow_job_api.py` - tests strengthened to assert `isinstance(task, Task)`, call `.as_dict()` on each task and verify output (directly proving the original `AttributeError` cannot recur), and add a new `test_create__invalid_job_spec_raises` covering the `JobSettings.from_dict()` error path. ### checklist - [x] i have run this code in development and it appears to resolve the stated issue - [x] this pr includes tests, or tests are not required/relevant for this pr - [x] i have updated the `CHANGELOG.md` and added information about my change to the "dbt-databricks next" section. --------- Signed-off-by: aarushisingh04 <aarushi07.singh@gmail.com> Co-authored-by: tejassp-db <241722411+tejassp-db@users.noreply.github.com> Co-authored-by: Shubham Dhal <shubham.dhal@databricks.com>
1 parent e2a85e6 commit 5927a32

File tree

3 files changed

+45
-17
lines changed

3 files changed

+45
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
-
77
### Fixes
88

9+
- Fix `workflow_job` Python model submission method failing with dictionary attribute error ([#1360](https://github.com/databricks/dbt-databricks/issues/1360))
910
- Fix column order mismatch in microbatch and replace_where incremental strategies by using INSERT BY NAME syntax ([#1338](https://github.com/databricks/dbt-databricks/issues/1338))
1011

1112
## dbt-databricks 1.11.6 (Mar 10, 2026)

dbt/adapters/databricks/api_client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,10 @@ def create(self, job_spec: dict[str, Any]) -> str:
801801
try:
802802
# Convert job_spec to be compatible with jobs.create
803803
converted_job_spec = self._convert_job_spec_for_create(job_spec)
804-
create_response = self.workspace_client.jobs.create(**converted_job_spec)
804+
# Convert plain dicts to SDK dataclasses via JobSettings to avoid
805+
# AttributeError when the SDK iterates and calls .as_dict() on tasks.
806+
job_settings = JobSettings.from_dict(converted_job_spec)
807+
create_response = self.workspace_client.jobs.create(**job_settings.as_shallow_dict())
805808
job_id = str(create_response.job_id)
806809
logger.info(f"New workflow created with job id {job_id}")
807810
return job_id

tests/unit/api_client/test_workflow_job_api.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
from unittest.mock import Mock
1+
from unittest.mock import Mock, patch
22

33
import pytest
4-
from databricks.sdk.service.jobs import BaseJob, CreateResponse, JobSettings, QueueSettings
4+
from databricks.sdk.service.jobs import BaseJob, CreateResponse, JobSettings, QueueSettings, Task
55
from dbt_common.exceptions import DbtRuntimeError
66

77
from dbt.adapters.databricks.api_client import WorkflowJobApi
@@ -59,7 +59,10 @@ def test_create__success(self, api, workspace_client):
5959
result = api.create(job_spec)
6060

6161
assert result == "456"
62-
workspace_client.jobs.create.assert_called_once_with(**job_spec)
62+
workspace_client.jobs.create.assert_called_once()
63+
call_kwargs = workspace_client.jobs.create.call_args[1]
64+
assert call_kwargs["name"] == "test_job"
65+
assert call_kwargs.get("tasks") in (None, [])
6366

6467
def test_create__job_spec_conversion(self, api, workspace_client):
6568
mock_create_response = Mock(spec=CreateResponse)
@@ -89,21 +92,42 @@ def test_create__job_spec_conversion(self, api, workspace_client):
8992
assert result == "789"
9093
workspace_client.jobs.create.assert_called_once()
9194

92-
# Verify the call was made with converted parameters
95+
# Tasks are converted to SDK Task dataclasses via JobSettings.from_dict().as_shallow_dict().
96+
# The SDK's jobs.create() calls v.as_dict() on each task internally, so
97+
# we verify: (a) they are Task instances, (b) attributes are correct,
98+
# (c) .as_dict() works (proving the original AttributeError is fixed).
9399
call_kwargs = workspace_client.jobs.create.call_args[1]
94100
assert call_kwargs["name"] == "test_job"
95-
assert len(call_kwargs["tasks"]) == 2
96-
97-
# Check first task conversion
98-
task1 = call_kwargs["tasks"][0]
99-
assert task1["task_key"] == "task1"
100-
assert "cluster_id" not in task1 # Should be removed
101-
assert task1["existing_cluster_id"] == "test-cluster-id" # Should be converted
102-
103-
# Check second task remains unchanged
104-
task2 = call_kwargs["tasks"][1]
105-
assert task2["task_key"] == "task2"
106-
assert task2["existing_cluster_id"] == "already-correct"
101+
tasks = call_kwargs["tasks"]
102+
assert len(tasks) == 2
103+
104+
task1 = tasks[0]
105+
assert isinstance(task1, Task)
106+
assert task1.task_key == "task1"
107+
assert task1.existing_cluster_id == "test-cluster-id"
108+
# Verify .as_dict() works — this was the root cause of the bug
109+
task1_dict = task1.as_dict()
110+
assert task1_dict["task_key"] == "task1"
111+
assert task1_dict["existing_cluster_id"] == "test-cluster-id"
112+
113+
task2 = tasks[1]
114+
assert isinstance(task2, Task)
115+
assert task2.task_key == "task2"
116+
assert task2.existing_cluster_id == "already-correct"
117+
assert task2.as_dict()["existing_cluster_id"] == "already-correct"
118+
119+
def test_create__invalid_job_spec_raises(self, api, workspace_client):
120+
"""If JobSettings.from_dict() raises (e.g. on malformed input), create()
121+
wraps it in a DbtRuntimeError via the existing except block."""
122+
with patch(
123+
"dbt.adapters.databricks.api_client.JobSettings.from_dict",
124+
side_effect=Exception("malformed spec"),
125+
):
126+
with pytest.raises(DbtRuntimeError) as exc_info:
127+
api.create({"name": "bad_job", "tasks": ["not-a-dict"]})
128+
129+
assert "Error creating Workflow" in str(exc_info.value)
130+
workspace_client.jobs.create.assert_not_called()
107131

108132
def test_update_job_settings__exception(self, api, workspace_client):
109133
workspace_client.jobs.reset.side_effect = Exception("API Error")

0 commit comments

Comments
 (0)