LongRunningAgentServer: durable resume via heartbeat + CAS claim#416
Mid-stream pod crashes currently mark in-progress runs as failed once the
task_timeout elapses — any in-flight LLM/tool work is lost. This change adds
best-effort crash recovery: another pod can atomically claim a stale run and
re-invoke the registered handler, and the agent SDK's own checkpointer
(LangGraph AsyncCheckpointSaver, databricks-openai Session) picks up prior
progress so completed tool calls are not re-executed.
Schema additions (idempotent ADD COLUMN IF NOT EXISTS):
- responses.owner_pod_id, heartbeat_at, attempt_number, original_request
- messages.attempt_number
- idx_responses_stale partial index
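The schema additions above can be sketched as a list of idempotent statements. This is an illustrative reconstruction: column types and the index predicate are assumptions, not the PR's actual DDL.

```python
# Hypothetical sketch of the durability migrations. Column/index names follow
# the PR description; the exact types and the index predicate are assumed.
DURABILITY_MIGRATIONS = [
    "ALTER TABLE responses ADD COLUMN IF NOT EXISTS owner_pod_id TEXT",
    "ALTER TABLE responses ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMPTZ",
    "ALTER TABLE responses ADD COLUMN IF NOT EXISTS attempt_number INTEGER DEFAULT 1",
    "ALTER TABLE responses ADD COLUMN IF NOT EXISTS original_request JSONB",
    "ALTER TABLE messages ADD COLUMN IF NOT EXISTS attempt_number INTEGER DEFAULT 1",
    # Partial index so the stale-run lookup only scans in-progress rows.
    "CREATE INDEX IF NOT EXISTS idx_responses_stale ON responses (heartbeat_at) "
    "WHERE status = 'in_progress'",
]
```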
Server changes:
- _handle_background_request anchors context.conversation_id to response_id
when the client didn't pin a thread_id/session_id. Both agent templates'
helpers read context.conversation_id as priority-2 fallback, so a replay
from a different pod resolves to the same agent-SDK thread/session without
any template code change.
- original_request is persisted on create_response so another pod can
re-invoke with the same arguments.
- _heartbeat async context writes heartbeat_at every heartbeat_interval_seconds
(default 3s) while the agent loop runs, stops cleanly on exit.
- _handle_retrieve_request invokes _try_claim_and_resume before returning:
if the run is in_progress and heartbeat is older than
heartbeat_stale_threshold_seconds (default 15s), CAS the row, increment
attempt_number, emit a response.resumed sentinel event at the next seq,
and spawn _run_background_stream(attempt_number=N+1) with input=[].
- Sequence numbers stay monotonic across attempts so client cursors
(GET /responses/{id}?stream=true&starting_after=N) resume correctly.
- _handle_retrieve_request's non-stream path filters to output_item.done
events (the authoritative conversation record).
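The heartbeat context and the compare-and-swap claim described above can be sketched roughly as follows. This is a minimal illustration, not the server's code: `write_heartbeat` is an injected callback standing in for the DB write, and the claim SQL is an assumed shape of the conditional `UPDATE ... RETURNING`.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def heartbeat(write_heartbeat, interval: float = 3.0):
    """Write a heartbeat every `interval` seconds while the body runs."""
    async def _beat():
        while True:
            await write_heartbeat()
            await asyncio.sleep(interval)

    task = asyncio.create_task(_beat())
    try:
        yield
    finally:
        task.cancel()  # stop cleanly on exit
        try:
            await task
        except asyncio.CancelledError:
            pass

# CAS claim: the WHERE clause makes the UPDATE match for at most one pod,
# so exactly one claimer wins; RETURNING tells the winner it owns the row.
CLAIM_SQL = """
UPDATE responses
   SET owner_pod_id = %(pod)s,
       attempt_number = attempt_number + 1,
       heartbeat_at = now()
 WHERE response_id = %(id)s
   AND status = 'in_progress'
   AND heartbeat_at < now() - interval '15 seconds'
RETURNING attempt_number
"""
```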
New settings:
- heartbeat_interval_seconds: 3.0
- heartbeat_stale_threshold_seconds: 15.0
- validated: stale > interval
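The cross-field check can be illustrated with a plain dataclass (the real server presumably validates this in its settings model; the class name here is hypothetical):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class DurabilitySettings:
    heartbeat_interval_seconds: float = 3.0
    heartbeat_stale_threshold_seconds: float = 15.0

    def __post_init__(self):
        # A threshold at or below the interval would mark every live run
        # stale between two beats, so reject it at construction time.
        if self.heartbeat_stale_threshold_seconds <= self.heartbeat_interval_seconds:
            raise ValueError("stale threshold must exceed heartbeat interval")
```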
Out of scope (follow-up PRs): cancellation implementation (still 501),
eager multi-pod polling for stale work (v1 uses lazy reclaim on retrieve).
Tests:
- 11 new unit tests covering _inject_conversation_id priority rules,
_try_claim_and_resume scenarios (grace period, missing original_request,
failed claim, successful claim + sentinel emission, empty-input replay),
_heartbeat context lifecycle, and settings validation.
- Existing tests migrated to the new ResponseInfo (8-field) and
get_messages 4-tuple shapes via _resp_info()/_msg() helpers.
init_db now runs idempotent ADD COLUMN IF NOT EXISTS for the durability columns after the initial create_all. The existing assert_awaited_once is obsolete — assert the full SQL corpus instead.
_handle_invocations_request returns a ResponsesAgentRequest pydantic model from the validator, not a dict. _inject_conversation_id was calling .get() on it, a runtime AttributeError on the first POST with background=true. Fix: dump to dict, inject, and round-trip back through the validator so the downstream handler still sees the declared pydantic type. Same round-trip applied to the resume path in _try_claim_and_resume.
ADD COLUMN IF NOT EXISTS is a no-op when the column already exists, but PG still enforces table-ownership before running the ALTER. Multiple pods running different SPs against the same agent_server schema — or a developer connecting as themselves to an already-migrated dogfood instance — hit 'must be owner of table responses' during init_db and fail to start. Split migrations into one-statement-per-transaction and swallow InsufficientPrivilege with an info log. If the column is missing and we really can't grant it, the next query that uses the column will fail loudly — better than refusing to boot the app on a shared DB.
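The privilege-tolerant, one-statement-per-transaction loop can be sketched like this. `execute` and `InsufficientPrivilege` are stand-ins for the real psycopg connection API and error class; the helper name is hypothetical.

```python
import logging

log = logging.getLogger("agent_server.db")

class InsufficientPrivilege(Exception):
    """Stand-in for psycopg.errors.InsufficientPrivilege."""

def run_migrations(execute, statements):
    """Run each migration in its own transaction; skip, don't crash, on
    ownership errors so the app still boots against a shared DB."""
    skipped = []
    for sql in statements:
        try:
            execute(sql)  # one statement per transaction
        except InsufficientPrivilege:
            # Another principal owns the table; assume it already ran this
            # migration. If the column is truly missing, the first query
            # that uses it will fail loudly later.
            skipped.append(sql)
            log.info("skipped migration (insufficient privilege): %s", sql)
    return skipped
```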
Clients and tests need a way to see that a response was resumed across pods (attempt_number > 1) without grepping server logs. Adds the field to every shape of the non-stream retrieve return body.
Integration tests for the durable-resume path want to simulate a pod crash
without actually restarting the pod (costly, disruptive, affects other
traffic). Add POST /_debug/kill_task/{response_id} that cancels the
in-flight asyncio task owning that response. CancelledError propagates past
_task_scope's Exception handlers (it's BaseException in py3.8+), so the DB
row stays in_progress with a going-stale heartbeat — exactly the shape a
real crash leaves. A client GET then triggers lazy reclaim and resume.
Gated behind env var LONG_RUNNING_ENABLE_DEBUG_KILL=1 so the endpoint is
never exposed in production. Tracks tasks in a per-server dict cleared via
done-callback; purely an observation affordance, durability still hinges
on DB state.
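The task registry and kill path can be sketched without the HTTP layer. Names here are illustrative (the real endpoint is a POST route on the server); only the env-var gate is from the source.

```python
import asyncio
import os

# Per-server task registry, cleared via done-callback.
TASKS: dict[str, asyncio.Task] = {}

def track(response_id: str, task: asyncio.Task) -> None:
    TASKS[response_id] = task
    task.add_done_callback(lambda _: TASKS.pop(response_id, None))

def debug_kill(response_id: str) -> bool:
    """Cancel the in-flight task owning this response, if any."""
    if os.environ.get("LONG_RUNNING_ENABLE_DEBUG_KILL") != "1":
        raise PermissionError("debug kill endpoint is disabled")
    task = TASKS.get(response_id)
    if task is None:
        return False
    # CancelledError is a BaseException, so it sails past `except Exception`
    # handlers: the DB row stays in_progress with a going-stale heartbeat,
    # exactly the shape a real pod crash leaves.
    return task.cancel()
```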
- ruff format: line-length normalization in models.py / db.py / server.py / test_long_running_server.py (no behavior change).
- ruff F401: drop unused `import asyncio as _a` inside TestTryClaimAndResume.
- ty call-non-callable: replace hasattr + attribute access with getattr + callable check. request_data is typed as dict but is a pydantic model at runtime (returned by validate_and_convert_request); narrowing via getattr(...) keeps ty happy and preserves the fallback for tests that pass dicts directly.
response_id lived only inside the nested 'response' object of the response.created event, so proxies and clients had to know that specific shape to extract it. Add it as a top-level field on every frame alongside sequence_number — discoverable without schema knowledge.
A stream opened BEFORE the owning pod died would poll the DB forever waiting for events that never come: _try_claim_and_resume only fires on fresh GET retrieve, not inside the existing poll loop. When the kill hit during a live stream, the UI just hung. Now every poll iteration also tries to reclaim — a no-op for fresh runs, a rescue path for dead ones.
Recovers crashed conversations faster — ~3 heartbeat intervals tolerated before reclaim instead of 5. Still plenty of margin over the 3s heartbeat interval to avoid false positives.
Every transition now shows up in apps logs prefixed with [durable]:
- heartbeat start / beat#N sampled / stop with total
- stale heartbeat detected with age + threshold
- attempting claim / claim succeeded / claim lost
- claim skipped with reason (grace_period, heartbeat_fresh, no_original_request)
- background stream start / completed with totals
- kill endpoint: cancelling task

Fresh-heartbeat skips stay at DEBUG to avoid spamming every poll tick. Periodic heartbeat sampled every 5 beats (~15s at default interval).
- AsyncDatabricksSession.repair(): dedupes function_call/function_call_output items by call_id and injects synthetic function_call_outputs for orphans left behind by a mid-tool kill. Returns the count injected. No-op when clean.
- build_tool_resume_repair(messages) in databricks_langchain: pure helper that returns synthetic ToolMessages for AIMessage.tool_calls in the trailing assistant turn whose paired ToolMessage never landed. Append via the add_messages reducer to satisfy Anthropic's tool_use ⇄ tool_result contract on resume.

Both were previously in-template workarounds in agent-openai-advanced and agent-langgraph-advanced; moving them into the library means any user of LongRunningAgentServer + AsyncDatabricksSession or AsyncCheckpointSaver gets correct mid-tool crash-resume behavior without template-side fixes.

Co-authored-by: Isaac
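A simplified sketch of the `build_tool_resume_repair` idea, using plain dicts instead of LangChain message classes (the real helper operates on `AIMessage`/`ToolMessage`; the synthetic text here is illustrative):

```python
SYNTHETIC = "[INTERRUPTED] This tool call did not complete; no result is available."

def build_tool_resume_repair(messages):
    """Return synthetic tool results for tool_calls in the trailing
    assistant turn whose paired tool result never landed."""
    for msg in reversed(messages):
        if msg.get("role") == "assistant" and msg.get("tool_calls"):
            answered = {
                m.get("tool_call_id") for m in messages if m.get("role") == "tool"
            }
            return [
                {"role": "tool", "tool_call_id": tc["id"], "content": SYNTHETIC}
                for tc in msg["tool_calls"]
                if tc["id"] not in answered
            ]
        if msg.get("role") == "user":
            break  # trailing turn only; older turns are left alone
    return []
```

Appending the returned messages closes every dangling tool_use, and re-running the helper on the repaired history returns nothing, so it is safe to apply on every load.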
- databricks_langchain.build_tool_resume_repair_pre_model_hook(): returns a LangGraph pre_model_hook that wraps build_tool_resume_repair. Lets templates wire durable-resume recovery via one create_agent arg instead of manual aget_state/aupdate_state surgery in the handler (which also required as_node="tools" to avoid a KeyError: 'model' in the branch re-evaluation). Fires on every model turn; no-op when state is clean.
- db.py: promote "skipped migration due to insufficient privilege" from INFO to a single WARN summary at startup. If the DB was previously migrated by a different service principal this is expected, but if our SP genuinely lacks ALTER on a fresh table the claim/heartbeat path will fail later with "column does not exist" — surface the risk clearly.

Co-authored-by: Isaac
langchain.agents.create_agent in 1.x uses AgentMiddleware (middleware=[...]) instead of the older pre_model_hook arg. Rename the helper and return an AgentMiddleware instance whose before_model / abefore_model methods run build_tool_resume_repair. Zero behavior change; matches the public API of the current langchain release. Co-authored-by: Isaac
Move both durability concerns out of user-space and into the server. The
LangGraph middleware and OpenAI session.repair() become optional — templates
no longer need to install them.
Two coordinated changes in server.py:
1. Rotate context.conversation_id on every resume attempt. _try_claim_and_resume
now deep-copies original_request, replays input verbatim (instead of
blanking to []), drops custom_inputs.thread_id/session_id, and sets
context.conversation_id = f"{base}::attempt-{N}" so the handler's SDK
helpers resolve to a fresh thread/session. Base anchors on whatever the
user actually pinned (thread_id/session_id/conversation_id/response_id);
rotation always re-anchors on original_request — no stacking across
attempts. Trade-off: attempt N>1 re-runs the LLM on the pre-crash input
instead of picking up mid-turn state; strictly safer than inheriting
mid-turn checkpoint state that the SDK can't fully repair (notably the
LangGraph stream-event attempt-boundary orphan artifact seen in the
stress test).
2. Full-history _sanitize_request_input runs before the handler sees the
request (both initial POST and resume replay). Drops duplicate call_ids,
drops orphan function_call_output items, injects synthetic outputs for
function_calls with no output. Walks the whole list — neither the
LangGraph middleware (trailing-only) nor session.repair() (session-only)
cover mid-history orphans that come in via UI echo. Gated by
auto_sanitize_input (default True).
14 new unit tests cover the sanitizer (paired/orphan/duplicate/mid-history/
chat-completions shape), rotation (all four fallback priorities), and the
resume-replays-input-not-empty flow.
Co-authored-by: Isaac
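The full-history sanitizer described above can be sketched as a single walker over Responses-style dict items. This is a hedged reconstruction: item shapes are assumed and the synthetic text is illustrative, but the three rules match the description (drop duplicate call_ids, drop orphan outputs, pair unanswered calls with a synthetic output).

```python
SYNTHETIC_OUTPUT = "[INTERRUPTED] This tool call did not complete."

def sanitize_tool_items(items, synthetic_output=SYNTHETIC_OUTPUT):
    """Walk the whole input list and return a protocol-valid copy."""
    calls = {i["call_id"] for i in items if i.get("type") == "function_call"}
    seen_calls, seen_outputs = set(), set()
    kept = []
    for item in items:
        t, cid = item.get("type"), item.get("call_id")
        if t == "function_call":
            if cid in seen_calls:
                continue  # duplicate call_id
            seen_calls.add(cid)
        elif t == "function_call_output":
            if cid in seen_outputs or cid not in calls:
                continue  # duplicate or orphan output
            seen_outputs.add(cid)
        kept.append(item)
    # Second pass: inject a synthetic output right after each unanswered call.
    result = []
    for item in kept:
        result.append(item)
        if item.get("type") == "function_call" and item["call_id"] not in seen_outputs:
            result.append({
                "type": "function_call_output",
                "call_id": item["call_id"],
                "output": synthetic_output,
            })
    return result
```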
AsyncCheckpointSaver.aget_tuple / CheckpointSaver.get_tuple now run build_tool_resume_repair on returned state before handing it to the graph. With the LongRunningAgentServer rotate-on-resume + input-sanitizer landing in the same PR, this closes the last gap: a stable-thread_id client that relies on the checkpointer as cross-turn truth still survives a mid-tool crash, without the caller installing build_tool_resume_repair_middleware.

Scenario the unit tests capture: turn 2 crashes mid-tool → checkpoint retains an orphan AIMessage.tool_calls on the stable thread → turn 3 loads that state and would fail the Anthropic tool_use ⇄ tool_result pairing check. Read-time repair injects a synthetic ToolMessage on every load, self-healing via the next checkpoint write at node boundary.

- build_tool_resume_repair is idempotent and O(trailing-turn) — no perf regression on the happy path.
- Never mutates caller-supplied lists; returns a new CheckpointTuple with a copied channel_values["messages"].
- Gracefully no-ops when langchain_core is not importable (non-agent checkpointer use cases).

Middleware and session.repair() remain publicly exported for callers who want explicit control or are building on custom graphs outside create_agent's shape.

Co-authored-by: Isaac
AsyncDatabricksSession.get_items() now applies the orphan/duplicate walker in memory before returning. Parallels the LangGraph checkpointer read-time repair landed earlier in this branch. Closes the last gap for middleware-free templates: after PR 195 strips session.repair() from the template, a base-session (stable session_id) crash leaves orphan function_calls in session state that OpenAI's Runner would happily echo to the LLM on the next turn — 400 on tool_calls pairing.

- Factor the walk out of repair() into _sanitize_items(items, synthetic). Returns the caller's list unchanged on the happy path so callers can cheaply skip re-persistence.
- Add auto_repair / auto_repair_synthetic_output constructor kwargs (default True + the same interrupted-by-resume text).
- Override get_items() to apply the filter when auto_repair is on. Runner.get_items() now always sees protocol-valid items.
- repair() keeps working (bypasses the override to read raw items) and continues to rewrite the DB for callers that want persistent cleanup.

7 new tests cover the sanitizer walker (trailing orphan, mid-history orphan, multi-parallel-orphan, duplicate dedup, orphan-output drop) and the get_items auto-repair path (on/off).

Co-authored-by: Isaac
Rotation gives each resume attempt a fresh conversation anchor, which means the LLM on attempt 2 sees no signal that it's a retry — it'll re-plan from scratch and can re-run read-only tools it already executed before the crash. Callers asked for a way to detect retries without reading the whole state machine. Zero-opinion breadcrumb: _rotate_conversation_id now stamps custom_inputs["attempt_number"] = N on every resume. Absent on normal first-attempt requests. Templates that want retry-aware behavior (e.g., appending "you are resuming a retry" to the system prompt, or gating side-effectful tool execution) can opt in by reading this field; others ignore it and get the current rotation-default behavior. Co-authored-by: Isaac
Three sites mint synthetic outputs for orphaned tool calls (server input
sanitizer, OpenAI session auto-repair, LangGraph checkpointer read-time
repair). The old text ("Tool call was interrupted... retry if still
needed") was too generic — dogfood UI testing showed models reading
it and rationally concluding "let me re-run the whole sequence from the
top to get a clean re-execution", re-invoking peers that had already
completed.
New text is scoped to the single interrupted call:
- notes THIS tool call failed and no result is available
- asserts other tool calls' results in history are still valid
- suggests re-invoking only this specific tool if still needed
Informative rather than directive — leaves room for the model's judgment
while removing the ambiguity that drove full-sequence re-runs.
All three constants carry the same text so LangGraph and OpenAI stay in
sync regardless of which layer (server, session, checkpointer) minted
the synthetic output.
Co-authored-by: Isaac
Rotation gave each resume attempt a fresh thread but ALSO blanked
attempt N+1's visibility into what attempt N already accomplished — the
LLM re-planned from just the user's latest message and re-emitted tool
calls that had already completed. Dogfood UI testing showed this clearly:
a "call get_time then deep_research" turn that crashed during deep_research
would, on resume, re-run get_time too even though its result had already
streamed to the client.
_try_claim_and_resume now collects the prior attempt's emitted
function_call / function_call_output items from agent_server.messages and
appends them to the replayed input[]. The sanitizer's full-history walker
then closes the interrupted call with a synthetic "[INTERRUPTED]" output.
Net effect on attempt N+1's LLM:
user: call get_time then deep_research
AI: tool_call c1 (get_time)
tool: <real result from attempt N>
AI: tool_call c2 (deep_research)
tool: [INTERRUPTED] This tool call did not complete...
The LLM sees attempt N's get_time result as valid history and the
informative synthetic output on the interrupted deep_research call,
guiding it to re-invoke only the interrupted tool instead of the whole
chain.
Mechanics:
- Read events from the same get_messages() call already used to compute
the response.resumed sentinel's sequence_number — no extra DB round-trip.
- Filter to attempt_number == new_attempt - 1 + item.type in
{function_call, function_call_output}.
- Append to deep-copied input[] before the existing sanitizer + rotation.
Composes with the attempt_number breadcrumb (handlers that want to can
still branch on custom_inputs.attempt_number > 1 for retry-aware
behavior).
Co-authored-by: Isaac
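The inheritance filter described in the mechanics above can be sketched as a small helper over message-log rows. The `(seq, attempt_number, item)` row shape and the helper name are assumptions for illustration.

```python
TOOL_PAIR_TYPES = {"function_call", "function_call_output"}

def collect_prior_attempt_tool_items(rows, new_attempt):
    """Keep the tool events emitted by attempt N-1, in event-log order."""
    return [
        item
        for _seq, attempt, item in rows
        if attempt == new_attempt - 1 and item.get("type") in TOOL_PAIR_TYPES
    ]
```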
Extend _collect_prior_attempt_tool_events to carry forward response.output_item.done items of type "message" (assistant narrative text) in addition to function_call / function_call_output. Lets the next attempt's LLM see its own prior narration as history so it continues where it left off instead of re-narrating from scratch. Only fully-completed items are recoverable — mid-stream partial text (output_text.delta frames that never reached output_item.done) can't be reassembled from the event log and is lost on crash. Limits the allow-list to the three known conversational types (function_call, function_call_output, message) so future item kinds don't auto-flow through without review. Co-authored-by: Isaac
A text-only crash — a model generating a long response that gets killed mid-token-stream — never emits response.output_item.done for the in-flight assistant message. My prior "only completed items" filter found nothing to inherit and attempt N+1 regenerated from the top. Observed directly in UI testing: 324 output_text.delta events on attempt 1, zero output_item.done, attempt 2 restarted from the beginning. Fix: walk the same event log twice in one pass. Besides the existing output_item.done collection, track in-flight message items by id via output_item.added, accumulate their output_text.delta frames, and emit any never-completed items as synthetic assistant message items at the end. The assembled text gives attempt N+1's LLM the partial narration as prior assistant context, letting it continue where the crash cut off instead of restarting. If a .done eventually lands for a tracked id, the in-progress tracker is cleared — no duplicate emission alongside the completed item. Co-authored-by: Isaac
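The two-track walk can be sketched as follows. Event and item shapes are illustrative approximations of the Responses streaming events named above, not the server's code.

```python
def reassemble_messages(events):
    """Collect completed message items; synthesize any in-flight message
    (added + deltas, no .done) from its accumulated text deltas."""
    completed, partial = [], {}
    for ev in events:
        item = ev.get("item", {})
        if ev["type"] == "response.output_item.added" and item.get("type") == "message":
            partial[item["id"]] = []  # start tracking in-flight text
        elif ev["type"] == "response.output_text.delta" and ev.get("item_id") in partial:
            partial[ev["item_id"]].append(ev["delta"])
        elif ev["type"] == "response.output_item.done" and item.get("type") == "message":
            completed.append(item)
            partial.pop(item["id"], None)  # .done landed: no duplicate emission
    # Any id still tracked never saw .done: emit the partial narration.
    for item_id, chunks in partial.items():
        completed.append({
            "type": "message",
            "id": item_id,
            "role": "assistant",
            "content": [{"type": "output_text", "text": "".join(chunks)}],
        })
    return completed
```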
Extends the partial-reassembly path beyond message text. Generalises
into a dispatch table so adding a new item kind is two entries:
a reassembler callable + its expected delta event type(s).
- reasoning items: added to the inheritable allow-list. Partial
reasoning (added + delta frames, no .done) is reassembled from
response.reasoning.delta / response.reasoning_summary_text.delta
frames into a synthetic reasoning item with a summary_text block.
Relevant for thinking-mode models (Claude extended thinking,
o1-style) where the reasoning token stream can crash mid-flight.
- function_call argument streams: added + function_call_arguments.delta
frames reassemble into a function_call item with the accumulated
arguments text. If the partial JSON doesn't parse, we fall back to
'{}' — the item is still protocol-valid input and the input
sanitizer will pair it with a synthetic [INTERRUPTED] output, so the
LLM sees "this tool call started but didn't finish" rather than
losing the attempt entirely.
File_search / code_interpreter / computer_call items are intentionally
NOT auto-inherited — they're app-specific and their shapes vary; add
to the allow-list if an app needs them.
Co-authored-by: Isaac
Two docstrings still described the older "input=[]" resume contract (before the rotation + replay + prior-attempt-inheritance work landed):
- LongRunningAgentServer class docstring: said the handler is re-invoked with input=[] and SDKs load prior progress. Now it's re-invoked with rotated conversation_id + original input + inherited prior-attempt items + sanitizer-paired synthetic [INTERRUPTED] outputs.
- _try_claim_and_resume method docstring: same correction.

No behavior change — docs-only.

Co-authored-by: Isaac
Remove the exploratory surface that the final design doesn't actually need. The remaining code is the smallest set that makes rotate+replay + inheritance + read-time repair work end-to-end.

Dropped:
- **build_tool_resume_repair_middleware** (checkpoint.py, __init__.py export). The AsyncCheckpointSaver.aget_tuple read-time repair covers the same case without the middleware API surface. The underlying pure walker (build_tool_resume_repair) stays since both the read-time repair and custom-graph users can import it directly.
- **AsyncDatabricksSession.repair()** (session.py). Destructive DB rewriter. Redundant now that get_items()'s auto_repair filter returns protocol-valid items on every read. The _sanitize_items helper stays, and _item_dict was removed as unused.
- **Reasoning-item inheritance + partial reasoning reassembly** (server.py). Dropped from _INHERITABLE_ITEM_TYPES and the reassemblers dispatch. Templates don't exercise reasoning mode; re-add to the allow-list when someone ships a thinking-mode app.
- **function_call argument-stream reassembly** (server.py). Edge case: a crash after output_item.added for a function_call but before args finish streaming. Partial JSON is risky to feed back anyway; we drop it and let the next attempt's LLM re-decide.
- **custom_inputs.attempt_number breadcrumb** (server.py). Unused by any template; retry awareness comes from the synthetic [INTERRUPTED] output text on the inherited item, not from this breadcrumb.

Kept essentials:
- rotate + replay of original_request.input on resume
- full-history input sanitizer
- AsyncCheckpointSaver.aget_tuple read-time repair
- AsyncDatabricksSession.get_items auto-repair
- prior-attempt completed-item inheritance (function_call, function_call_output, message)
- partial assistant-message text reassembly from output_text.delta frames
- [durable] lifecycle logs
- debug /_debug/kill_task endpoint (env-gated)

Tests drop 4 (reasoning, function_call args parse+fallback, attempt_number breadcrumb). Net: 344 fewer lines; 152 passing tests.

Co-authored-by: Isaac
The public export existed for users who wanted to install the middleware in custom graphs. Since the middleware wrapper is gone (read-time repair in AsyncCheckpointSaver.aget_tuple covers the case transparently for users on our saver), only one internal caller remains: _repair_loaded_checkpoint_tuple. Rename to underscore-prefixed private, drop the databricks_langchain __init__ export and __all__ entry. No behavior change.

Co-authored-by: Isaac
OpenAI Agents Runner's stream_events() awaits a queue that drains fast; without an explicit yield point per event, task.cancel() can sit for tens of seconds during a text-heavy stream before propagating. The deep_research tool path doesn't hit this because asyncio.sleep(15) is cancellable by design. Reproduced: 2000-word essay streaming, /_debug/kill_task issued at 12s, task continued running for another 48s after cancel before exiting naturally. After this fix, cancel should propagate within a single event's worth of stream delay. Co-authored-by: Isaac
Match the non-durable stream format (data-only frames, type carried inside the payload). The previous event: prefix was benign for GPT-5 on openai-advanced because GPT-5 runs a single response.created/completed pair per turn — the AI SDK's Databricks provider parses the data, sees the type, and completes cleanly. For Claude via the OpenAI-compatible endpoint, each tool iteration in a multi-tool turn emits its own response.created/response.completed pair; combined with the event: prefix, the provider's state machine fails to emit a clean finish UIMessageChunk and the UI retries forever. The non-durable path never had this because its format is data-only; matching it here is the minimal fix. Co-authored-by: Isaac
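The data-only frame format looks roughly like this (a sketch; the helper name is hypothetical). The event type travels inside the JSON payload and no `event:` field is emitted, matching the non-durable path.

```python
import json

def format_sse_frame(event: dict) -> str:
    """Serialize one stream event as a data-only SSE frame."""
    # e.g. event = {"type": "response.completed", "sequence_number": 7}
    return f"data: {json.dumps(event)}\n\n"
```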
Root cause of stuck durable resume on Claude multi-tool turns: the openai-agents SDK's Anthropic adapter rejects the replayed input with HTTP 400 — "tool_use ids were found without tool_result blocks immediately after". Claude's attempt 1 event stream interleaves function_call → narrative message → function_call_output, and my inheritance preserved that order. When the adapter converts each item to an Anthropic message, the narrative message lands between the assistant's tool_use and the user's tool_result, violating Anthropic's "tool_use immediately followed by tool_result" contract. Attempt 2 therefore fails on its first LLM call and the agent never continues. Fix: remove completed `message` items from the inheritable allow-list so only function_call + function_call_output pairs flow through the replay — and those are naturally paired in the event log, so the Anthropic adapter produces a valid message sequence. Preserved: - Partial mid-stream text reassembly (different code path) — a text-only crash still synthesizes the partial assistant message from output_text.delta frames and inherits it, so Claude's prefill continuation still works. - .done cleanup of the partial tracker fires regardless of whether the completed item is on the inherit list, so a fully-completed message no longer gets falsely "reassembled" as a partial at the end of the walk. Co-authored-by: Isaac
Prior fix dropped completed `message` items to avoid Anthropic's "tool_use must be immediately followed by tool_result" 400 on replay. Better alternative: keep the narrative but move it past the tool pairs so each function_call stays adjacent to its function_call_output.

Collector now accumulates into two buckets while walking events:
- tool_items: function_call / function_call_output in event-log order
- narrative_items: completed message items (and the partial reassembled one if text crashed mid-stream) in event-log order

Returns tool_items + narrative_items. The resulting Anthropic sequence is user → [assistant(tool_use_A) → user(tool_result_A) → ...] → assistant(narrative text). Valid alternation, and the LLM still gets the prior narrative as trailing context for continuation decisions.

Co-authored-by: Isaac
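The two-bucket hoist reduces to a short partition over the inherited items (a sketch with assumed dict shapes and a hypothetical helper name):

```python
def hoist_narrative(items):
    """Keep tool pairs in event-log order; hoist message items after them
    so tool_use stays adjacent to its tool_result on conversion."""
    tool_items, narrative_items = [], []
    for item in items:
        if item.get("type") in ("function_call", "function_call_output"):
            tool_items.append(item)
        elif item.get("type") == "message":
            narrative_items.append(item)
    return tool_items + narrative_items
```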
Marker commit: paired with app-templates PR #195 final state, this branch is now a correctly working stable baseline for durable execution across both LangGraph and OpenAI advanced templates.

UI-validated scenarios that pass cleanly:
- LangGraph + Claude, single-tool interrupt + resume (checkpointer read-time repair closes orphan tool_uses on stable thread).
- LangGraph + Claude, multi-tool interrupt + resume, agent continues.
- LangGraph + Claude, text-only mid-stream crash + resume with partial-text continuation (prefill).
- OpenAI + GPT-5, single-tool + multi-tool interrupt + resume.
- OpenAI + Claude, multi-tool interrupt + resume (previously hit the Anthropic "tool_use must be followed by tool_result" 400; fixed by hoisting inherited narrative messages past the tool pairs).
- OpenAI + Claude, text-only mid-stream crash + resume with prefill continuation.
- Cross-turn recall after crash-and-resume on both templates.

Key pieces in this PR:
- LongRunningAgentServer: rotate conversation_id + replay original_request.input + prior-attempt tool-pair inheritance + narrative hoist + partial-text reassembly + full-history input sanitizer. Data-only SSE frames (no event: prefix).
- AsyncCheckpointSaver.aget_tuple: read-time orphan repair via build_tool_resume_repair for stable-thread baselines.
- AsyncDatabricksSession.get_items: auto_repair sanitizer returns protocol-valid items without touching the DB.
- asyncio.sleep(0) yield in stream loop so task.cancel() propagates promptly during tight text streams.

Co-authored-by: Isaac
Split the orphan tool-call sanitizer out of server.py / session.py into long_running/repair.py so the server input sanitizer and the OpenAI AsyncDatabricksSession.get_items auto-repair use the same walker (server was ~100 LOC, session was ~78 LOC, now one shared helper).

Also:
- Rename _INHERITABLE_ITEM_TYPES -> _TOOL_PAIR_TYPES to reflect that message items are hoisted separately, not inherited.
- Split _collect_prior_attempt_tool_events into three focused helpers: _iter_attempt_events, _extract_completed_items, _reassemble_partial_message.
- Restore the Args docstring on LongRunningAgentServer.__init__.

No behavior change; all 179 unit tests pass (long_running + langchain checkpoint + openai session).

Co-authored-by: Isaac
Cleanup pass on the durable-resume surface following PR review:

1. Single source of truth for the synthetic tool-output string. Previously three copies (server.py, session.py, checkpoint.py), each under a different constant name, all identical. Follow the existing pattern from lakebase.py (defined once, imported into integrations) and lift it to long_running/repair.py as ``DEFAULT_SYNTHETIC_INTERRUPTED_OUTPUT``. It becomes the default for ``sanitize_tool_items(synthetic_output=...)`` so callers that don't customise don't pass anything.
2. Drop the ``auto_repair`` / ``auto_repair_synthetic_output`` parameters from ``AsyncDatabricksSession.__init__``. No realistic caller wants auto-repair off (it yields protocol-invalid histories that break the next call), and no one needs to customise the synthetic output text. ``get_items()`` is now unconditionally repaired and ``_sanitize_items`` is a one-liner that only sets the session-scoped log prefix.
3. Remove the pass-through ``synthetic_output`` parameter from ``_sanitize_request_input`` (server.py) and ``_build_tool_resume_repair`` (checkpoint.py). Both were internal helpers nothing else overrode — the caller trail simply forwarded the default to ``sanitize_tool_items``.
4. Restore the class-level Args docstring on ``LongRunningAgentServer`` matching the origin/main shape. Existing param entries keep their original wording verbatim; the only additions are ``heartbeat_interval_seconds`` and ``heartbeat_stale_threshold_seconds``. The redundant ``__init__`` docstring is removed so there's one source of param docs.

Net: -19 LOC in session.py, -27 LOC in server.py, -16 LOC in checkpoint.py; the shared constant exists once; all 151 tests (long_running + langchain checkpoint + openai session) pass.

Co-authored-by: Isaac
# Conflicts:
#   integrations/openai/src/databricks_openai/agents/session.py
#   integrations/openai/tests/unit_tests/test_session.py
Fix the two CI lint failures exposed after the main merge: - repair.py: drop unused ``Optional`` import (F401) - repair.py + server.py: ruff format reformatting Co-authored-by: Isaac
``databricks_langchain.checkpoint`` is Python 3.10+ and re-exported eagerly from ``databricks_langchain/__init__.py``. Placing ``DEFAULT_SYNTHETIC_INTERRUPTED_OUTPUT`` inside ``long_running/repair.py`` meant any ``import databricks_langchain`` triggered ``long_running/__init__.py``, which:
1. ``from .server import ...`` — ``server.py`` raises RuntimeError on Python < 3.11, breaking langchain tests on 3.10.
2. ``from .db import ...`` — pulls ``psycopg``, breaking any langchain test that doesn't have ``[memory]`` extras installed.

Both failed in CI. Move the file to top-level ``databricks_ai_bridge.tool_repair`` so it can be imported without crossing the ``long_running/`` package boundary. Three import sites updated (server, session, checkpoint) plus the test. No API break — ``sanitize_tool_items`` and ``DEFAULT_SYNTHETIC_INTERRUPTED_OUTPUT`` were still pre-release.

Co-authored-by: Isaac
Summary
Adds durable execution to LongRunningAgentServer: pods that die mid-stream are seamlessly recovered by another pod, with SDK-agnostic history repair so agent code stays untouched.

How it works

Heartbeat + CAS claim. The owning pod writes `heartbeat_at` every `heartbeat_interval_seconds` (3s). When `GET /responses/{id}` lands on any pod and the row's heartbeat is older than `heartbeat_stale_threshold_seconds` (10s), the serving pod atomically claims the row via a conditional `UPDATE … RETURNING` and bumps `attempt_number`.

Rotate + replay + sanitize. On claim, the new pod re-invokes the registered handler with:
- `conversation_id` (suffixed `::attempt-N`) so the SDK opens a fresh thread/session for the replay
- `input` enriched with the prior attempt's completed tool pairs + narrative messages + a synthetic `[INTERRUPTED]` output paired with any tool call that didn't finish
- `sanitize_tool_items` (shared walker in `long_running/repair.py`) so Anthropic's `tool_use` → `tool_result` pairing holds

SDK read-time repair. Each integration wraps its own history store to filter orphans on load, preventing a past crash from poisoning subsequent turns:
- databricks-openai: `AsyncDatabricksSession.get_items()` always runs `sanitize_tool_items` on reads.
- databricks-langchain: the checkpointer's `(a)get_tuple` is wrapped by `_repair_loaded_checkpoint_tuple`, which injects synthetic `ToolMessage`s for orphan `AIMessage.tool_calls` in the trailing turn.

Stream resume. `GET /responses/{id}?stream=true&starting_after=N` replays the message log past the cursor; a `response.resumed` sentinel marks attempt boundaries.

Additions

- `long_running/repair.py` — shared orphan-pair sanitizer + canonical `DEFAULT_SYNTHETIC_INTERRUPTED_OUTPUT`
- `long_running/server.py` — heartbeat writer, CAS claim, `_try_claim_and_resume`, rotated conversation_id, prior-attempt event inheritance, `response.resumed` sentinel
- `/_debug/kill_task/{id}` endpoint gated by `LONG_RUNNING_ENABLE_DEBUG_KILL=1` for integration tests
- `[durable]` INFO lifecycle logs (heartbeat, claim, stream start/stop)
- `ADD COLUMN IF NOT EXISTS` migrations with a WARN summary when skipped due to insufficient privilege

Settings (new)

- `heartbeat_interval_seconds = 3.0`
- `heartbeat_stale_threshold_seconds = 10.0` (validator: must exceed interval)

Test plan

- Unit: `long_running` + 40 `databricks-openai` session + 18 `databricks-langchain` checkpoint
- Integration: `/_debug/kill_task/{id}` on `agent-openai-advanced` + `agent-langgraph-advanced` (see app-templates PR #195)
- `agent-openai-advanced` app: kill mid-tool, verify durable resume preserves completed tool pairs and injects synthetic `[INTERRUPTED]` for the interrupted call

Known follow-ups (non-blocking)

- `DELETE /responses/{id}` implementation (currently 501)
- `heartbeat_miss_count`, `claim_latency_seconds`, `attempts_per_response`