LongRunningAgentServer: durable resume via heartbeat + CAS claim #416

Open
dhruv0811 wants to merge 38 commits into main from dhruv0811/durable-execution-resume

Conversation

Contributor

@dhruv0811 dhruv0811 commented Apr 16, 2026

Summary

Adds durable execution to LongRunningAgentServer: runs whose pod dies mid-stream are seamlessly picked up by another pod, with SDK-agnostic history repair so agent code stays untouched.

How it works

Heartbeat + CAS claim. The owning pod writes heartbeat_at every heartbeat_interval_seconds (3s). When GET /responses/{id} lands on any pod and the row's heartbeat is older than heartbeat_stale_threshold_seconds (10s), the serving pod atomically claims the row via a conditional UPDATE … RETURNING and bumps attempt_number.
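
The claim step can be sketched in miniature. This is an illustrative in-memory stand-in, not the PR's implementation: the real path issues a conditional `UPDATE … RETURNING` against Postgres, and only the column names (owner_pod_id, heartbeat_at, attempt_number) are taken from the schema described in this PR.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ResponseRow:
    """In-memory stand-in for a row of the responses table."""
    owner_pod_id: str
    heartbeat_at: float
    attempt_number: int
    status: str = "in_progress"

def try_claim(row: ResponseRow, pod_id: str, now: float,
              stale_threshold: float = 10.0) -> Optional[int]:
    """Claim the row iff its heartbeat is stale; return the new attempt_number.

    Mirrors the shape of:
      UPDATE responses SET owner_pod_id=%s, heartbeat_at=now(),
             attempt_number = attempt_number + 1
       WHERE id=%s AND status='in_progress'
         AND heartbeat_at < now() - interval '10 seconds'
      RETURNING attempt_number;
    """
    if row.status != "in_progress":
        return None
    if now - row.heartbeat_at < stale_threshold:
        return None  # heartbeat still fresh -- another pod owns the run
    row.owner_pod_id = pod_id
    row.heartbeat_at = now
    row.attempt_number += 1
    return row.attempt_number
```

Because the real WHERE clause re-checks staleness inside the UPDATE, at most one pod wins the claim even if several GETs race.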

Rotate + replay + sanitize. On claim, the new pod re-invokes the registered handler with:

  • a rotated conversation_id (suffixed ::attempt-N) so the SDK opens a fresh thread/session for the replay
  • the original request's input enriched with the prior attempt's completed tool pairs + narrative messages + a synthetic [INTERRUPTED] output paired with any tool call that didn't finish
  • all items passed through sanitize_tool_items (shared walker in long_running/repair.py) so Anthropic's tool_use ⇄ tool_result pairing holds
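
A minimal sketch of what such an orphan-pair walker does (illustrative only; item shapes follow the OpenAI Responses function_call / function_call_output format keyed by call_id, and the real walker lives in long_running/repair.py):

```python
# Hypothetical stand-in for sanitize_tool_items: dedupe call_ids, drop
# orphan outputs, and inject a synthetic output for any call left open.
INTERRUPTED = "[INTERRUPTED] This tool call did not complete."

def sanitize_tool_items(items, synthetic_output=INTERRUPTED):
    seen_calls, seen_outputs = set(), set()
    kept = []
    for item in items:
        kind, cid = item.get("type"), item.get("call_id")
        if kind == "function_call":
            if cid in seen_calls:
                continue  # drop duplicate call_ids
            seen_calls.add(cid)
        elif kind == "function_call_output":
            if cid not in seen_calls or cid in seen_outputs:
                continue  # drop orphan / duplicate outputs
            seen_outputs.add(cid)
        kept.append(item)
    # inject synthetic outputs so every tool call has a paired result
    result = []
    for item in kept:
        result.append(item)
        if item.get("type") == "function_call" and item["call_id"] not in seen_outputs:
            result.append({"type": "function_call_output",
                           "call_id": item["call_id"],
                           "output": synthetic_output})
    return result
```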

SDK read-time repair. Each integration wraps its own history store to filter orphans on load, preventing a past crash from poisoning subsequent turns:

  • databricks-openai: AsyncDatabricksSession.get_items() always runs sanitize_tool_items on reads.
  • databricks-langchain: the checkpointer's (a)get_tuple is wrapped by _repair_loaded_checkpoint_tuple, which injects synthetic ToolMessages for orphan AIMessage.tool_calls in the trailing turn.

Stream resume. GET /responses/{id}?stream=true&starting_after=N replays the message log past the cursor; a response.resumed sentinel marks attempt boundaries.
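
The cursor semantics can be shown with a toy replay helper (illustrative; frame shapes are simplified stand-ins for the stored message log, where each frame carries a top-level sequence_number):

```python
def replay_past_cursor(frames, starting_after=-1):
    """Yield stored stream frames with sequence_number > starting_after."""
    for frame in sorted(frames, key=lambda f: f["sequence_number"]):
        if frame["sequence_number"] > starting_after:
            yield frame
```

Because sequence numbers stay monotonic across attempts, a client that reconnects with its last-seen cursor picks up exactly where it left off, including the response.resumed sentinel at the attempt boundary.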

Additions

  • long_running/repair.py — shared orphan-pair sanitizer + canonical DEFAULT_SYNTHETIC_INTERRUPTED_OUTPUT
  • long_running/server.py — heartbeat writer, CAS claim, _try_claim_and_resume, rotate conversation_id, prior-attempt event inheritance, response.resumed sentinel
  • /_debug/kill_task/{id} endpoint gated by LONG_RUNNING_ENABLE_DEBUG_KILL=1 for integration tests
  • [durable] INFO lifecycle logs (heartbeat, claim, stream start/stop)
  • Idempotent ADD COLUMN IF NOT EXISTS migrations with a WARN summary when skipped due to insufficient privilege

Settings (new)

  • heartbeat_interval_seconds = 3.0
  • heartbeat_stale_threshold_seconds = 10.0 (validator: must exceed interval)
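
The validator's invariant is simple enough to state as a behavioral stand-in (a plain dataclass here; the real settings presumably live in the server's settings machinery):

```python
from dataclasses import dataclass

@dataclass
class DurableSettings:
    """Sketch of the new settings; field names match the PR, the class is illustrative."""
    heartbeat_interval_seconds: float = 3.0
    heartbeat_stale_threshold_seconds: float = 10.0

    def __post_init__(self):
        # A threshold at or below the interval would flag healthy runs as stale.
        if self.heartbeat_stale_threshold_seconds <= self.heartbeat_interval_seconds:
            raise ValueError(
                "heartbeat_stale_threshold_seconds must exceed heartbeat_interval_seconds"
            )
```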

Test plan

  • 179 unit tests pass (120 long_running + 40 databricks-openai session + 18 databricks-langchain checkpoint)
  • Deployed crash-recovery matrix via /_debug/kill_task/{id} on agent-openai-advanced + agent-langgraph-advanced — see app-templates PR #195
  • Multi-tool UI validation on Claude through the deployed agent-openai-advanced app: kill-mid-tool, verify durable resume preserves completed tool pairs and injects synthetic [INTERRUPTED] for the interrupted call
  • Passing CI run: https://github.com/databricks-eng/ai-oss-integration-tests-runner/actions/runs/24864211860

Known follow-ups (non-blocking)

  • Concurrent-claim / heartbeat-cascade / sequence-collision integration tests
  • DELETE /responses/{id} implementation (currently 501)
  • Exponential backoff in client resume loop
  • Metrics: heartbeat_miss_count, claim_latency_seconds, attempts_per_response

Mid-stream pod crashes currently mark in-progress runs as failed once the
task_timeout elapses — any in-flight LLM/tool work is lost. This change adds
best-effort crash recovery: another pod can atomically claim a stale run and
re-invoke the registered handler, and the agent SDK's own checkpointer
(LangGraph AsyncCheckpointSaver, databricks-openai Session) picks up prior
progress so completed tool calls are not re-executed.

Schema additions (idempotent ADD COLUMN IF NOT EXISTS):
- responses.owner_pod_id, heartbeat_at, attempt_number, original_request
- messages.attempt_number
- idx_responses_stale partial index

Server changes:
- _handle_background_request anchors context.conversation_id to response_id
  when the client didn't pin a thread_id/session_id. Both agent templates'
  helpers read context.conversation_id as priority-2 fallback, so a replay
  from a different pod resolves to the same agent-SDK thread/session without
  any template code change.
- original_request is persisted on create_response so another pod can
  re-invoke with the same arguments.
- _heartbeat async context writes heartbeat_at every heartbeat_interval_seconds
  (default 3s) while the agent loop runs, stops cleanly on exit.
- _handle_retrieve_request invokes _try_claim_and_resume before returning:
  if the run is in_progress and heartbeat is older than
  heartbeat_stale_threshold_seconds (default 15s), CAS the row, increment
  attempt_number, emit a response.resumed sentinel event at the next seq,
  and spawn _run_background_stream(attempt_number=N+1) with input=[].
- Sequence numbers stay monotonic across attempts so client cursors
  (GET /responses/{id}?stream=true&starting_after=N) resume correctly.
- _handle_retrieve_request's non-stream path filters to output_item.done
  events (the authoritative conversation record).

New settings:
- heartbeat_interval_seconds: 3.0
- heartbeat_stale_threshold_seconds: 15.0
- validated: stale > interval

Out of scope (follow-up PRs): cancellation implementation (still 501),
eager multi-pod polling for stale work (v1 uses lazy reclaim on retrieve).

Tests:
- 11 new unit tests covering _inject_conversation_id priority rules,
  _try_claim_and_resume scenarios (grace period, missing original_request,
  failed claim, successful claim + sentinel emission, empty-input replay),
  _heartbeat context lifecycle, and settings validation.
- Existing tests migrated to the new ResponseInfo (8-field) and
  get_messages 4-tuple shapes via _resp_info()/_msg() helpers.

init_db now runs idempotent ADD COLUMN IF NOT EXISTS for the durability
columns after the initial create_all. The existing assert_awaited_once is
obsolete — assert the full SQL corpus instead.

…uest

_handle_invocations_request returns a ResponsesAgentRequest pydantic model
from the validator, not a dict. _inject_conversation_id was calling .get()
on it — runtime AttributeError on the first POST with background=true.

Fix: dump to dict, inject, and round-trip back through the validator so the
downstream handler still sees the declared pydantic type. Same round-trip
applied to the resume path in _try_claim_and_resume.

ADD COLUMN IF NOT EXISTS is a no-op when the column already exists, but PG
still enforces table-ownership before running the ALTER. Multiple pods
running different SPs against the same agent_server schema — or a developer
connecting as themselves to an already-migrated dogfood instance — hit
'must be owner of table responses' during init_db and fail to start.

Split migrations into one-statement-per-transaction and swallow
InsufficientPrivilege with an info log. If the column is missing and we
really can't grant it, the next query that uses the column will fail loudly
— better than refusing to boot the app on a shared DB.
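
The migration loop described above can be sketched as follows. This is illustrative: the real code catches psycopg.errors.InsufficientPrivilege; PermissionError stands in here so the sketch has no driver dependency, and the statement list follows the PR's schema additions.

```python
MIGRATIONS = [
    "ALTER TABLE responses ADD COLUMN IF NOT EXISTS owner_pod_id TEXT",
    "ALTER TABLE responses ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMPTZ",
    "ALTER TABLE responses ADD COLUMN IF NOT EXISTS attempt_number INT DEFAULT 1",
]

def run_migrations(execute, privilege_error=PermissionError, log=print):
    """Run each migration in its own transaction; swallow privilege errors.

    `execute` is a stand-in for a DB cursor's execute. Returns the list of
    skipped statements so the caller can emit the WARN summary.
    """
    skipped = []
    for stmt in MIGRATIONS:
        try:
            execute(stmt)  # one statement per transaction in the real code
        except privilege_error:
            skipped.append(stmt)  # likely migrated already by another SP
    if skipped:
        log(f"WARN: skipped {len(skipped)} migration(s) due to insufficient privilege")
    return skipped
```
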
Clients and tests need a way to see that a response was resumed across
pods (attempt_number > 1) without grepping server logs. Adds the field to
every shape of the non-stream retrieve return body.

Integration tests for the durable-resume path want to simulate a pod crash
without actually restarting the pod (costly, disruptive, affects other
traffic). Add POST /_debug/kill_task/{response_id} that cancels the
in-flight asyncio task owning that response. CancelledError propagates past
_task_scope's Exception handlers (it's BaseException in py3.8+), so the DB
row stays in_progress with a going-stale heartbeat — exactly the shape a
real crash leaves. A client GET then triggers lazy reclaim and resume.

Gated behind env var LONG_RUNNING_ENABLE_DEBUG_KILL=1 so the endpoint is
never exposed in production. Tracks tasks in a per-server dict cleared via
done-callback; purely an observation affordance, durability still hinges
on DB state.
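
The env gate and done-callback cleanup can be sketched like this (endpoint wiring omitted; the registry and gate are illustrative, with names invented for the sketch):

```python
import asyncio
import os

class TaskRegistry:
    """Track in-flight response tasks so a debug endpoint can cancel one."""

    def __init__(self):
        self._tasks = {}

    def track(self, response_id, task):
        self._tasks[response_id] = task
        # Done-callback clears the entry whether the task finishes or is killed.
        task.add_done_callback(lambda _t: self._tasks.pop(response_id, None))

    def kill(self, response_id):
        if os.environ.get("LONG_RUNNING_ENABLE_DEBUG_KILL") != "1":
            raise PermissionError("debug kill endpoint is disabled")
        task = self._tasks.get(response_id)
        if task is None:
            return False
        task.cancel()  # CancelledError skips Exception handlers, so the DB
        return True    # row stays in_progress with a going-stale heartbeat
```
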
- ruff format: line-length normalization in models.py / db.py / server.py / test_long_running_server.py (no behavior change).
- ruff F401: drop unused 'import asyncio as _a' inside TestTryClaimAndResume.
- ty call-non-callable: replace hasattr + attribute access with getattr +
  callable check. request_data is typed as dict but is a pydantic model at
  runtime (returned by validate_and_convert_request); narrowing via
  getattr(...) keeps ty happy and preserves the fallback for tests that pass
  dicts directly.

response_id lived only inside the nested 'response' object of the
response.created event, so proxies and clients had to know that specific
shape to extract it. Add it as a top-level field on every frame alongside
sequence_number — discoverable without schema knowledge.

A stream opened BEFORE the owning pod died would poll the DB forever
waiting for events that never come: _try_claim_and_resume only fires on
fresh GET retrieve, not inside the existing poll loop. When the kill
hit during a live stream, the UI just hung. Now every poll iteration
also tries to reclaim — a no-op for fresh runs, a rescue path for dead
ones.

Recovers crashed conversations faster — ~3 heartbeat intervals tolerated
before reclaim instead of 5. Still plenty of margin over the 3s heartbeat
interval to avoid false positives.

Every transition now shows up in apps logs prefixed with [durable]:
- heartbeat start / beat#N sampled / stop with total
- stale heartbeat detected with age + threshold
- attempting claim / claim succeeded / claim lost
- claim skipped with reason (grace_period, heartbeat_fresh, no_original_request)
- background stream start / completed with totals
- kill endpoint: cancelling task

Fresh-heartbeat skips stay at DEBUG to avoid spamming every poll tick.
Periodic heartbeat sampled every 5 beats (~15s at default interval).

AsyncDatabricksSession.repair():
  Dedupes function_call/function_call_output items by call_id and injects
  synthetic function_call_outputs for orphans left behind by a mid-tool
  kill. Returns the count injected. No-op when clean.

build_tool_resume_repair(messages) in databricks_langchain:
  Pure helper that returns synthetic ToolMessages for AIMessage.tool_calls
  in the trailing assistant turn whose paired ToolMessage never landed.
  Append via the add_messages reducer to satisfy Anthropic's tool_use
  ⇄ tool_result contract on resume.
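
A dict-based sketch of the pure helper (the real build_tool_resume_repair operates on langchain_core AIMessage/ToolMessage objects; plain role dicts stand in here so the sketch is dependency-free):

```python
INTERRUPTED = "[INTERRUPTED] This tool call did not complete."

def build_tool_resume_repair(messages, synthetic_text=INTERRUPTED):
    """Return synthetic tool results for unanswered tool_calls in the
    trailing assistant turn; empty list when the history is clean."""
    answered = {m["tool_call_id"] for m in messages if m.get("role") == "tool"}
    for msg in reversed(messages):
        if msg.get("role") == "assistant" and msg.get("tool_calls"):
            return [
                {"role": "tool", "tool_call_id": tc["id"], "content": synthetic_text}
                for tc in msg["tool_calls"] if tc["id"] not in answered
            ]
        if msg.get("role") == "user":
            break  # no trailing assistant tool turn
    return []
```

Appending the returned messages (via the add_messages reducer in the real graph) closes every open tool_use, and re-running the helper on the repaired history is a no-op.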

Both were previously in-template workarounds in agent-openai-advanced
and agent-langgraph-advanced; moving them into the library so any user
of LongRunningAgentServer + AsyncDatabricksSession or AsyncCheckpointSaver
gets correct mid-tool crash-resume behavior without template-side fixes.

Co-authored-by: Isaac

- databricks_langchain.build_tool_resume_repair_pre_model_hook(): returns a
  LangGraph pre_model_hook that wraps build_tool_resume_repair. Lets
  templates wire durable-resume recovery via one create_agent arg instead
  of manual aget_state/aupdate_state surgery in the handler (which also
  required as_node="tools" to avoid a KeyError: 'model' in the branch
  re-evaluation). Fires on every model turn; no-op when state is clean.

- db.py: promote "skipped migration due to insufficient privilege" from
  INFO to a single WARN summary at startup. If the DB was previously
  migrated by a different service principal this is expected, but if our
  SP genuinely lacks ALTER on a fresh table the claim/heartbeat path will
  fail later with "column does not exist" — surface the risk clearly.

Co-authored-by: Isaac

langchain.agents.create_agent in 1.x uses AgentMiddleware (middleware=[...])
instead of the older pre_model_hook arg. Rename the helper and return an
AgentMiddleware instance whose before_model / abefore_model methods run
build_tool_resume_repair. Zero behavior change; matches the public API of
the current langchain release.

Co-authored-by: Isaac

@dhruv0811 dhruv0811 marked this pull request as ready for review April 21, 2026 23:14
@dhruv0811 dhruv0811 requested review from bbqiu and jennsun and removed request for jennsun April 21, 2026 23:20

Move both durability concerns out of user-space and into the server. The
LangGraph middleware and OpenAI session.repair() become optional — templates
no longer need to install them.

Two coordinated changes in server.py:

1. Rotate context.conversation_id on every resume attempt. _try_claim_and_resume
   now deep-copies original_request, replays input verbatim (instead of
   blanking to []), drops custom_inputs.thread_id/session_id, and sets
   context.conversation_id = f"{base}::attempt-{N}" so the handler's SDK
   helpers resolve to a fresh thread/session. Base anchors on whatever the
   user actually pinned (thread_id/session_id/conversation_id/response_id);
   rotation always re-anchors on original_request — no stacking across
   attempts. Trade-off: attempt N>1 re-runs the LLM on the pre-crash input
   instead of picking up mid-turn state; strictly safer than inheriting
   mid-turn checkpoint state that the SDK can't fully repair (notably the
   LangGraph stream-event attempt-boundary orphan artifact seen in the
   stress test).

2. Full-history _sanitize_request_input runs before the handler sees the
   request (both initial POST and resume replay). Drops duplicate call_ids,
   drops orphan function_call_output items, injects synthetic outputs for
   function_calls with no output. Walks the whole list — neither the
   LangGraph middleware (trailing-only) nor session.repair() (session-only)
   cover mid-history orphans that come in via UI echo. Gated by
   auto_sanitize_input (default True).

14 new unit tests cover the sanitizer (paired/orphan/duplicate/mid-history/
chat-completions shape), rotation (all four fallback priorities), and the
resume-replays-input-not-empty flow.

Co-authored-by: Isaac

AsyncCheckpointSaver.aget_tuple / CheckpointSaver.get_tuple now run
build_tool_resume_repair on returned state before handing it to the graph.
With the LongRunningAgentServer rotate-on-resume + input-sanitizer landing
in the same PR, this closes the last gap: a stable-thread_id client that
relies on the checkpointer as cross-turn truth still survives a mid-tool
crash, without the caller installing build_tool_resume_repair_middleware.

Scenario the unit tests capture: turn 2 crashes mid-tool → checkpoint
retains an orphan AIMessage.tool_calls on the stable thread → turn 3
loads that state and would fail the Anthropic tool_use ⇄ tool_result
pairing check. Read-time repair injects a synthetic ToolMessage on every
load, self-healing via the next checkpoint write at node boundary.

- build_tool_resume_repair is idempotent and O(trailing-turn) — no perf
  regression on the happy path.
- Never mutates caller-supplied lists; returns a new CheckpointTuple with
  a copied channel_values["messages"].
- Gracefully no-ops when langchain_core is not importable (non-agent
  checkpointer use cases).
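
The read-time wrap pattern can be sketched generically (illustrative: LangGraph's CheckpointTuple is simplified to a dict here, and `repair_fn` stands in for the pure walker described above):

```python
def wrap_get_tuple(get_tuple, repair_fn):
    """Wrap a checkpointer load so every read returns repaired state.

    Returns the original object untouched on the happy path; otherwise a
    shallow-copied tuple with repairs appended to channel_values["messages"],
    never mutating the caller's lists.
    """
    def repaired_get_tuple(config):
        tup = get_tuple(config)
        if tup is None:
            return None
        messages = tup["channel_values"].get("messages", [])
        extra = repair_fn(messages)
        if not extra:
            return tup  # clean history: no copy, no perf cost
        fixed = dict(tup)
        fixed["channel_values"] = dict(tup["channel_values"])
        fixed["channel_values"]["messages"] = list(messages) + extra
        return fixed
    return repaired_get_tuple
```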

Middleware and session.repair() remain publicly exported for callers who
want explicit control or are building on custom graphs outside
create_agent's shape.

Co-authored-by: Isaac

AsyncDatabricksSession.get_items() now applies the orphan/duplicate walker
in memory before returning. Parallels the LangGraph checkpointer read-time
repair landed earlier in this branch. Closes the last gap for
middleware-free templates: after PR 195 strips session.repair() from
the template, a base-session (stable session_id) crash leaves orphan
function_calls in session state that OpenAI's Runner would happily echo
to the LLM on the next turn — 400 on tool_calls pairing.

- Factor the walk out of repair() into _sanitize_items(items, synthetic).
  Returns the caller's list unchanged on the happy path so callers can
  cheaply skip re-persistence.
- Add auto_repair / auto_repair_synthetic_output constructor kwargs
  (default True + the same interrupted-by-resume text).
- Override get_items() to apply the filter when auto_repair is on.
  Runner.get_items() now always sees protocol-valid items.
- repair() keeps working (bypasses the override to read raw items) and
  continues to rewrite the DB for callers that want persistent cleanup.

7 new tests cover the sanitizer walker (trailing orphan, mid-history
orphan, multi-parallel-orphan, duplicate dedup, orphan-output drop) and
the get_items auto-repair path (on/off).

Co-authored-by: Isaac

Rotation gives each resume attempt a fresh conversation anchor, which
means the LLM on attempt 2 sees no signal that it's a retry — it'll
re-plan from scratch and can re-run read-only tools it already executed
before the crash. Callers asked for a way to detect retries without
reading the whole state machine.

Zero-opinion breadcrumb: _rotate_conversation_id now stamps
custom_inputs["attempt_number"] = N on every resume. Absent on normal
first-attempt requests. Templates that want retry-aware behavior (e.g.,
appending "you are resuming a retry" to the system prompt, or gating
side-effectful tool execution) can opt in by reading this field; others
ignore it and get the current rotation-default behavior.

Co-authored-by: Isaac

… peers

Three sites mint synthetic outputs for orphaned tool calls (server input
sanitizer, OpenAI session auto-repair, LangGraph checkpointer read-time
repair). The old text ("Tool call was interrupted... retry if still
needed") was too generic — dogfood UI testing showed models reading
it and rationally concluding "let me re-run the whole sequence from the
top to get a clean re-execution", re-invoking peers that had already
completed.

New text is scoped to the single interrupted call:
- notes THIS tool call failed and no result is available
- asserts other tool calls' results in history are still valid
- suggests re-invoking only this specific tool if still needed

Informative rather than directive — leaves room for the model's judgment
while removing the ambiguity that drove full-sequence re-runs.

All three constants carry the same text so LangGraph and OpenAI stay in
sync regardless of which layer (server, session, checkpointer) minted
the synthetic output.

Co-authored-by: Isaac

Rotation gave each resume attempt a fresh thread but ALSO blanked
attempt N+1's visibility into what attempt N already accomplished — the
LLM re-planned from just the user's latest message and re-emitted tool
calls that had already completed. Dogfood UI testing showed this clearly:
a "call get_time then deep_research" turn that crashed during deep_research
would, on resume, re-run get_time too even though its result had already
streamed to the client.

_try_claim_and_resume now collects the prior attempt's emitted
function_call / function_call_output items from agent_server.messages and
appends them to the replayed input[]. The sanitizer's full-history walker
then closes the interrupted call with a synthetic "[INTERRUPTED]" output.
Net effect on attempt N+1's LLM:

  user: call get_time then deep_research
  AI:   tool_call c1 (get_time)
  tool: <real result from attempt N>
  AI:   tool_call c2 (deep_research)
  tool: [INTERRUPTED] This tool call did not complete...

The LLM sees attempt N's get_time result as valid history and the
informative synthetic output on the interrupted deep_research call,
guiding it to re-invoke only the interrupted tool instead of the whole
chain.

Mechanics:
- Read events from the same get_messages() call already used to compute
  the response.resumed sentinel's sequence_number — no extra DB round-trip.
- Filter to attempt_number == new_attempt - 1 + item.type in
  {function_call, function_call_output}.
- Append to deep-copied input[] before the existing sanitizer + rotation.

Composes with the attempt_number breadcrumb (handlers that want to can
still branch on custom_inputs.attempt_number > 1 for retry-aware
behavior).

Co-authored-by: Isaac

Extend _collect_prior_attempt_tool_events to carry forward
response.output_item.done items of type "message" (assistant narrative
text) in addition to function_call / function_call_output. Lets the
next attempt's LLM see its own prior narration as history so it
continues where it left off instead of re-narrating from scratch.

Only fully-completed items are recoverable — mid-stream partial text
(output_text.delta frames that never reached output_item.done) can't
be reassembled from the event log and is lost on crash.

Limits the allow-list to the three known conversational types
(function_call, function_call_output, message) so future item kinds
don't auto-flow through without review.

Co-authored-by: Isaac

A text-only crash — a model generating a long response that gets killed
mid-token-stream — never emits response.output_item.done for the
in-flight assistant message. My prior "only completed items" filter
found nothing to inherit and attempt N+1 regenerated from the top.
Observed directly in UI testing: 324 output_text.delta events on
attempt 1, zero output_item.done, attempt 2 restarted from the
beginning.

Fix: walk the same event log twice in one pass. Besides the existing
output_item.done collection, track in-flight message items by id via
output_item.added, accumulate their output_text.delta frames, and emit
any never-completed items as synthetic assistant message items at the
end. The assembled text gives attempt N+1's LLM the partial narration
as prior assistant context, letting it continue where the crash cut
off instead of restarting.

If a .done eventually lands for a tracked id, the in-progress tracker
is cleared — no duplicate emission alongside the completed item.
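
The two-tracker walk can be sketched like this (illustrative; event shapes are simplified stand-ins for the Responses stream frames named above):

```python
def collect_with_partials(events):
    """Single pass: collect completed output_item.done items, track in-flight
    message items via output_item.added, accumulate their output_text.delta
    frames, and emit never-completed items as synthetic assistant messages."""
    completed, partial = [], {}
    for ev in events:
        t = ev["type"]
        if t == "response.output_item.added" and ev["item"]["type"] == "message":
            partial[ev["item"]["id"]] = []
        elif t == "response.output_text.delta" and ev.get("item_id") in partial:
            partial[ev["item_id"]].append(ev["delta"])
        elif t == "response.output_item.done":
            completed.append(ev["item"])
            # .done landed for a tracked id: clear it, no duplicate emission
            partial.pop(ev["item"].get("id"), None)
    for item_id, chunks in partial.items():
        if chunks:  # crash cut the stream: reassemble the partial narration
            completed.append({"type": "message", "id": item_id,
                              "role": "assistant", "content": "".join(chunks)})
    return completed
```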

Co-authored-by: Isaac

…eams

Extends the partial-reassembly path beyond message text. Generalises
into a dispatch table so adding a new item kind is two entries:
a reassembler callable + its expected delta event type(s).

- reasoning items: added to the inheritable allow-list. Partial
  reasoning (added + delta frames, no .done) is reassembled from
  response.reasoning.delta / response.reasoning_summary_text.delta
  frames into a synthetic reasoning item with a summary_text block.
  Relevant for thinking-mode models (Claude extended thinking,
  o1-style) where the reasoning token stream can crash mid-flight.

- function_call argument streams: added + function_call_arguments.delta
  frames reassemble into a function_call item with the accumulated
  arguments text. If the partial JSON doesn't parse, we fall back to
  '{}' — the item is still protocol-valid input and the input
  sanitizer will pair it with a synthetic [INTERRUPTED] output, so the
  LLM sees "this tool call started but didn't finish" rather than
  losing the attempt entirely.

File_search / code_interpreter / computer_call items are intentionally
NOT auto-inherited — they're app-specific and their shapes vary; add
to the allow-list if an app needs them.

Co-authored-by: Isaac

…havior

Two docstrings still described the older "input=[]" resume contract
(before the rotation + replay + prior-attempt-inheritance work landed):

- LongRunningAgentServer class docstring: said the handler is
  re-invoked with input=[] and SDKs load prior progress. Now it's
  re-invoked with rotated conversation_id + original input + inherited
  prior-attempt items + sanitizer-paired synthetic [INTERRUPTED]
  outputs.

- _try_claim_and_resume method docstring: same correction.

No behavior change — docs-only.

Co-authored-by: Isaac

Remove the exploratory surface that the final design doesn't actually
need. The remaining code is the smallest set that makes rotate+replay
+ inheritance + read-time repair work end-to-end.

Dropped:

- **build_tool_resume_repair_middleware** (checkpoint.py, __init__.py
  export). The AsyncCheckpointSaver.aget_tuple read-time repair covers
  the same case without the middleware API surface. The underlying
  pure walker (build_tool_resume_repair) stays since both the
  read-time repair and custom-graph users can import it directly.

- **AsyncDatabricksSession.repair()** (session.py). Destructive DB
  rewriter. Redundant now that get_items()'s auto_repair filter
  returns protocol-valid items on every read. The _sanitize_items
  helper stays, and _item_dict was removed as unused.

- **Reasoning-item inheritance + partial reasoning reassembly**
  (server.py). Dropped from _INHERITABLE_ITEM_TYPES and the
  reassemblers dispatch. Templates don't exercise reasoning mode;
  re-add to the allow-list when someone ships a thinking-mode app.

- **function_call argument-stream reassembly** (server.py). Edge case:
  a crash after output_item.added for a function_call but before args
  finish streaming. Partial JSON is risky to feed back anyway; we drop
  it and let the next attempt's LLM re-decide.

- **custom_inputs.attempt_number breadcrumb** (server.py). Unused by
  any template; retry awareness comes from the synthetic [INTERRUPTED]
  output text on the inherited item, not from this breadcrumb.

Kept essentials:

- rotate + replay of original_request.input on resume
- full-history input sanitizer
- AsyncCheckpointSaver.aget_tuple read-time repair
- AsyncDatabricksSession.get_items auto-repair
- prior-attempt completed-item inheritance (function_call,
  function_call_output, message)
- partial assistant-message text reassembly from
  output_text.delta frames
- [durable] lifecycle logs
- debug /_debug/kill_task endpoint (env-gated)

Tests drop 4 (reasoning, function_call args parse+fallback,
attempt_number breadcrumb). Net: 344 fewer lines; 152 passing tests.

Co-authored-by: Isaac

…repair)

The public export existed for users who wanted to install the
middleware in custom graphs. Since the middleware wrapper is gone
(read-time repair in AsyncCheckpointSaver.aget_tuple covers the case
transparently for users on our saver), only one internal caller
remains: _repair_loaded_checkpoint_tuple. Rename to underscore-prefixed
private, drop the databricks_langchain __init__ export and __all__ entry.

No behavior change.

Co-authored-by: Isaac

OpenAI Agents Runner's stream_events() awaits a queue that drains fast;
without an explicit yield point per event, task.cancel() can sit for
tens of seconds during a text-heavy stream before propagating. The
deep_research tool path doesn't hit this because asyncio.sleep(15) is
cancellable by design.

Reproduced: 2000-word essay streaming, /_debug/kill_task issued at 12s,
task continued running for another 48s after cancel before exiting
naturally. After this fix, cancel should propagate within a single
event's worth of stream delay.
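
The fix reduces to one explicit yield point per forwarded event (a sketch; `stream_events` stands in for the SDK's fast-draining iterator and `emit` for the frame writer):

```python
import asyncio

async def forward_stream(stream_events, emit):
    """Forward stream events, yielding to the loop once per event so
    task.cancel() can land promptly during tight, non-awaiting streams."""
    async for event in stream_events:
        emit(event)
        # Without this, a queue that never blocks starves the cancellation
        # point and cancel() waits until the stream drains naturally.
        await asyncio.sleep(0)
```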

Co-authored-by: Isaac

Match the non-durable stream format (data-only frames, type carried
inside the payload). The previous event: prefix was benign for GPT-5
on openai-advanced because GPT-5 runs a single response.created/completed
pair per turn — the AI SDK's Databricks provider parses the data, sees
the type, and completes cleanly. For Claude via the OpenAI-compatible
endpoint, each tool iteration in a multi-tool turn emits its own
response.created/response.completed pair; combined with the event:
prefix, the provider's state machine fails to emit a clean finish
UIMessageChunk and the UI retries forever. The non-durable path never
had this because its format is data-only; matching it here is the
minimal fix.

Co-authored-by: Isaac

Root cause of stuck durable resume on Claude multi-tool turns: the
openai-agents SDK's Anthropic adapter rejects the replayed input with
HTTP 400 — "tool_use ids were found without tool_result blocks
immediately after". Claude's attempt 1 event stream interleaves
function_call → narrative message → function_call_output, and my
inheritance preserved that order. When the adapter converts each item
to an Anthropic message, the narrative message lands between the
assistant's tool_use and the user's tool_result, violating Anthropic's
"tool_use immediately followed by tool_result" contract. Attempt 2
therefore fails on its first LLM call and the agent never continues.

Fix: remove completed `message` items from the inheritable allow-list
so only function_call + function_call_output pairs flow through the
replay — and those are naturally paired in the event log, so the
Anthropic adapter produces a valid message sequence.

Preserved:

- Partial mid-stream text reassembly (different code path) — a
  text-only crash still synthesizes the partial assistant message
  from output_text.delta frames and inherits it, so Claude's prefill
  continuation still works.

- .done cleanup of the partial tracker fires regardless of whether
  the completed item is on the inherit list, so a fully-completed
  message no longer gets falsely "reassembled" as a partial at the
  end of the walk.

Co-authored-by: Isaac

…pping

Prior fix dropped completed `message` items to avoid Anthropic's
"tool_use must be immediately followed by tool_result" 400 on replay.
Better alternative: keep the narrative but move it past the tool pairs
so each function_call stays adjacent to its function_call_output.

Collector now accumulates into two buckets while walking events:
- tool_items: function_call / function_call_output in event-log order
- narrative_items: completed message items (and the partial reassembled
  one if text crashed mid-stream) in event-log order

Returns tool_items + narrative_items. The resulting Anthropic sequence
is user → [assistant(tool_use_A) → user(tool_result_A) → ...] →
assistant(narrative text). Valid alternation, and the LLM still gets
the prior narrative as trailing context for continuation decisions.
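
The two-bucket hoist reduces to a small reorder (illustrative; item shapes follow the Responses format used elsewhere in this PR):

```python
def hoist_narrative(items):
    """Keep tool pairs in event-log order and move narrative message items
    after them, so each function_call stays adjacent to its output when the
    Anthropic adapter converts the replayed input."""
    tool_items, narrative_items = [], []
    for item in items:
        if item.get("type") in ("function_call", "function_call_output"):
            tool_items.append(item)
        elif item.get("type") == "message":
            narrative_items.append(item)
    return tool_items + narrative_items
```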

Co-authored-by: Isaac

Marker commit: paired with app-templates PR #195 final state, this
branch is now a correctly working stable baseline for durable execution
across both LangGraph and OpenAI advanced templates.

UI-validated scenarios that pass cleanly:

  - LangGraph + Claude, single-tool interrupt + resume (checkpointer
    read-time repair closes orphan tool_uses on stable thread).
  - LangGraph + Claude, multi-tool interrupt + resume, agent
    continues.
  - LangGraph + Claude, text-only mid-stream crash + resume with
    partial-text continuation (prefill).
  - OpenAI + GPT-5, single-tool + multi-tool interrupt + resume.
  - OpenAI + Claude, multi-tool interrupt + resume (previously hit the
    Anthropic "tool_use must be followed by tool_result" 400; fixed by
    hoisting inherited narrative messages past the tool pairs).
  - OpenAI + Claude, text-only mid-stream crash + resume with
    prefill continuation.
  - Cross-turn recall after crash-and-resume on both templates.

Key pieces in this PR:

  - LongRunningAgentServer: rotate conversation_id + replay
    original_request.input + prior-attempt tool-pair inheritance +
    narrative hoist + partial-text reassembly + full-history input
    sanitizer. Data-only SSE frames (no event: prefix).
  - AsyncCheckpointSaver.aget_tuple: read-time orphan repair via
    build_tool_resume_repair for stable-thread baselines.
  - AsyncDatabricksSession.get_items: auto_repair sanitizer returns
    protocol-valid items without touching the DB.
  - asyncio.sleep(0) yield in stream loop so task.cancel() propagates
    promptly during tight text streams.

Co-authored-by: Isaac

Split the orphan tool-call sanitizer out of server.py / session.py
into long_running/repair.py so the server input sanitizer and the
OpenAI AsyncDatabricksSession.get_items auto-repair use the same
walker (server was ~100 LOC, session was ~78 LOC, now one shared
helper).

Also:
- Rename _INHERITABLE_ITEM_TYPES -> _TOOL_PAIR_TYPES to reflect that
  message items are hoisted separately, not inherited.
- Split _collect_prior_attempt_tool_events into three focused helpers:
  _iter_attempt_events, _extract_completed_items,
  _reassemble_partial_message.
- Restore the Args docstring on LongRunningAgentServer.__init__.

No behavior change; all 179 unit tests pass (long_running + langchain
checkpoint + openai session).

Co-authored-by: Isaac

Cleanup pass on the durable-resume surface following PR review:

1. Single source of truth for the synthetic tool-output string.
   Previously three copies (server.py, session.py, checkpoint.py),
   each under a different constant name, all identical. Follow the
   existing pattern from lakebase.py (defined once, imported into
   integrations) and lift it to long_running/repair.py as
   ``DEFAULT_SYNTHETIC_INTERRUPTED_OUTPUT``. It becomes the default
   for ``sanitize_tool_items(synthetic_output=...)`` so callers that
   don't customise don't pass anything.

2. Drop the ``auto_repair`` / ``auto_repair_synthetic_output``
   parameters from ``AsyncDatabricksSession.__init__``. No realistic
   caller wants auto-repair off (it yields protocol-invalid histories
   that break the next call), and no one needs to customise the
   synthetic output text. ``get_items()`` is now unconditionally
   repaired and ``_sanitize_items`` is a one-liner that only sets
   the session-scoped log prefix.

3. Remove the pass-through ``synthetic_output`` parameter from
   ``_sanitize_request_input`` (server.py) and
   ``_build_tool_resume_repair`` (checkpoint.py). Both were internal
   helpers nothing else overrode — the caller trail simply forwarded
   the default to ``sanitize_tool_items``.

4. Restore the class-level Args docstring on ``LongRunningAgentServer``
   matching the origin/main shape. Existing param entries keep their
   original wording verbatim; the only additions are
   ``heartbeat_interval_seconds`` and
   ``heartbeat_stale_threshold_seconds``. The redundant ``__init__``
   docstring is removed so there's one source of param docs.

Net: -19 LOC in session.py, -27 LOC in server.py, -16 LOC in
checkpoint.py; the shared constant exists once; all 151 tests
(long_running + langchain checkpoint + openai session) pass.

Co-authored-by: Isaac

# Conflicts:
#	integrations/openai/src/databricks_openai/agents/session.py
#	integrations/openai/tests/unit_tests/test_session.py
Fix the two CI lint failures exposed after the main merge:
- repair.py: drop unused ``Optional`` import (F401)
- repair.py + server.py: ruff format reformatting

Co-authored-by: Isaac

``databricks_langchain.checkpoint`` is Python 3.10+ and re-exported
eagerly from ``databricks_langchain/__init__.py``. Placing
``DEFAULT_SYNTHETIC_INTERRUPTED_OUTPUT`` inside ``long_running/repair.py``
meant any ``import databricks_langchain`` triggered
``long_running/__init__.py``, which:

1. ``from .server import ...`` — ``server.py`` raises RuntimeError on
   Python < 3.11, breaking langchain tests on 3.10.
2. ``from .db import ...`` — pulls ``psycopg``, breaking any langchain
   test that doesn't have ``[memory]`` extras installed.

Both failed in CI. Move the file to top-level
``databricks_ai_bridge.tool_repair`` so it can be imported without
crossing the ``long_running/`` package boundary. Three import sites
updated (server, session, checkpoint) plus the test. No API break —
``sanitize_tool_items`` and ``DEFAULT_SYNTHETIC_INTERRUPTED_OUTPUT``
were still pre-release.

Co-authored-by: Isaac