Skip to content

Commit f275daf

Browse files
authored
fix(workflow): persist and filter execution metadata across adapters (#1085)
1 parent 95ad610 commit f275daf

File tree

10 files changed

+194
-9
lines changed

10 files changed

+194
-9
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
"@voltagent/core": patch
3+
"@voltagent/server-core": patch
4+
"@voltagent/libsql": patch
5+
"@voltagent/cloudflare-d1": patch
6+
---
7+
8+
Fix workflow execution filtering by persisted metadata across adapters.
9+
10+
- Persist `options.metadata` on workflow execution state so `/workflows/executions` filters can match tenant/user metadata.
11+
- Preserve existing execution metadata when updating cancelled/error workflow states.
12+
- Accept `options.metadata` in server workflow execution request schema.
13+
- Fix LibSQL and Cloudflare D1 JSON metadata query comparisons for `metadata` and `metadata.<key>` filters.
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import type { D1Database } from "@cloudflare/workers-types";
2+
import { describe, expect, it, vi } from "vitest";
3+
import { D1MemoryAdapter } from "./memory-adapter";
4+
5+
function createMockBinding(): D1Database {
6+
return {
7+
prepare: vi.fn(() => ({
8+
bind: vi.fn().mockReturnThis(),
9+
run: vi.fn().mockResolvedValue({}),
10+
all: vi.fn().mockResolvedValue({ results: [] }),
11+
})),
12+
batch: vi.fn().mockResolvedValue([]),
13+
} as unknown as D1Database;
14+
}
15+
16+
describe("D1MemoryAdapter queryWorkflowRuns", () => {
17+
it("builds metadata filters with JSON-aware comparisons", async () => {
18+
vi.spyOn(D1MemoryAdapter.prototype as any, "ensureInitialized").mockResolvedValue(undefined);
19+
20+
const adapter = new D1MemoryAdapter({
21+
binding: createMockBinding(),
22+
tablePrefix: "test",
23+
});
24+
25+
const allSpy = vi.spyOn(adapter as any, "all").mockResolvedValue([
26+
{
27+
id: "exec-1",
28+
workflow_id: "workflow-1",
29+
workflow_name: "Workflow 1",
30+
status: "completed",
31+
input: '{"requestId":"req-1"}',
32+
context: '[["tenantId","acme"]]',
33+
workflow_state: '{"phase":"done"}',
34+
suspension: null,
35+
events: null,
36+
output: null,
37+
cancellation: null,
38+
user_id: "user-1",
39+
conversation_id: null,
40+
metadata: '{"tenantId":"acme"}',
41+
created_at: "2024-01-02T00:00:00Z",
42+
updated_at: "2024-01-02T00:00:00Z",
43+
},
44+
]);
45+
46+
const result = await adapter.queryWorkflowRuns({
47+
workflowId: "workflow-1",
48+
status: "completed",
49+
from: new Date("2024-01-01T00:00:00Z"),
50+
to: new Date("2024-01-03T00:00:00Z"),
51+
userId: "user-1",
52+
metadata: { tenantId: "acme" },
53+
limit: 5,
54+
offset: 2,
55+
});
56+
57+
expect(result).toHaveLength(1);
58+
expect(result[0]?.input).toEqual({ requestId: "req-1" });
59+
expect(result[0]?.context).toEqual([["tenantId", "acme"]]);
60+
expect(result[0]?.workflowState).toEqual({ phase: "done" });
61+
62+
expect(allSpy).toHaveBeenCalledTimes(1);
63+
const [sql, args] = allSpy.mock.calls[0];
64+
65+
expect(sql).toContain("workflow_id = ?");
66+
expect(sql).toContain("status = ?");
67+
expect(sql).toContain("created_at >=");
68+
expect(sql).toContain("user_id = ?");
69+
expect(sql).toContain("json_extract(metadata, ?) = json_extract(json(?), '$')");
70+
expect(sql).toContain("ORDER BY created_at DESC");
71+
expect(args).toEqual([
72+
"workflow-1",
73+
"completed",
74+
"2024-01-01T00:00:00.000Z",
75+
"2024-01-03T00:00:00.000Z",
76+
"user-1",
77+
'$."tenantId"',
78+
'"acme"',
79+
5,
80+
2,
81+
]);
82+
});
83+
});

packages/cloudflare-d1/src/memory-adapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1375,7 +1375,7 @@ export class D1MemoryAdapter implements StorageAdapter {
13751375
continue;
13761376
}
13771377

1378-
conditions.push("json_extract(metadata, ?) = json(?)");
1378+
conditions.push("json_extract(metadata, ?) = json_extract(json(?), '$')");
13791379
args.push(metadataPath, safeStringify(value));
13801380
}
13811381
}

packages/core/src/workflow/core.spec.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,55 @@ describe.sequential("workflow.run", () => {
284284
expect(persistedState?.userId).toBe("user-test-1");
285285
expect(persistedState?.conversationId).toBe("conv-test-1");
286286
});
287+
288+
it("should persist custom metadata in workflow state", async () => {
289+
const memory = new Memory({ storage: new InMemoryStorageAdapter() });
290+
291+
const workflow = createWorkflow(
292+
{
293+
id: "workflow-metadata-context",
294+
name: "Workflow Metadata Context",
295+
input: z.object({
296+
value: z.string(),
297+
}),
298+
result: z.object({
299+
value: z.string(),
300+
}),
301+
memory,
302+
},
303+
andThen({
304+
id: "echo",
305+
execute: async ({ data }) => data,
306+
}),
307+
);
308+
309+
const registry = WorkflowRegistry.getInstance();
310+
registry.registerWorkflow(workflow);
311+
312+
const result = await workflow.run(
313+
{ value: "ok" },
314+
{
315+
userId: "user-test-1",
316+
metadata: {
317+
tenantId: "acme",
318+
region: "us-east-1",
319+
flags: { plan: "pro" },
320+
},
321+
},
322+
);
323+
324+
const persistedState = await memory.getWorkflowState(result.executionId);
325+
326+
expect(persistedState?.metadata).toEqual(
327+
expect.objectContaining({
328+
tenantId: "acme",
329+
region: "us-east-1",
330+
flags: { plan: "pro" },
331+
traceId: expect.any(String),
332+
spanId: expect.any(String),
333+
}),
334+
);
335+
});
287336
});
288337

289338
describe.sequential("workflow streaming", () => {

packages/core/src/workflow/core.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,14 @@ export function createWorkflow<
747747
executionId = options?.executionId || randomUUID();
748748
}
749749

750+
const mergeExecutionMetadata = async (patch: Record<string, unknown>) => {
751+
const existingState = await executionMemory.getWorkflowState(executionId);
752+
return {
753+
...(existingState?.metadata ?? {}),
754+
...patch,
755+
};
756+
};
757+
750758
// Only create stream controller if one is provided (for streaming execution)
751759
// For normal run, we don't need a stream controller
752760
const streamController = externalStreamController || null;
@@ -813,6 +821,10 @@ export function createWorkflow<
813821
: options?.context
814822
? new Map(Object.entries(options.context))
815823
: new Map();
824+
const optionMetadata =
825+
options?.metadata && typeof options.metadata === "object" && !Array.isArray(options.metadata)
826+
? options.metadata
827+
: undefined;
816828
const workflowStateStore = options?.workflowState ?? {};
817829

818830
// Get previous trace IDs if resuming
@@ -928,6 +940,7 @@ export function createWorkflow<
928940
userId: options?.userId,
929941
conversationId: options?.conversationId,
930942
metadata: {
943+
...(optionMetadata ?? {}),
931944
traceId: rootSpan.spanContext().traceId,
932945
spanId: rootSpan.spanContext().spanId,
933946
},
@@ -1219,10 +1232,10 @@ export function createWorkflow<
12191232
cancelledAt: new Date(),
12201233
reason,
12211234
},
1222-
metadata: {
1235+
metadata: await mergeExecutionMetadata({
12231236
...(stateManager.state?.usage ? { usage: stateManager.state.usage } : {}),
12241237
cancellationReason: reason,
1225-
},
1238+
}),
12261239
updatedAt: new Date(),
12271240
});
12281241
} catch (memoryError) {
@@ -1943,10 +1956,10 @@ export function createWorkflow<
19431956
await executionMemory.updateWorkflowState(executionId, {
19441957
status: "cancelled",
19451958
workflowState: stateManager.state.workflowState,
1946-
metadata: {
1959+
metadata: await mergeExecutionMetadata({
19471960
...(stateManager.state?.usage ? { usage: stateManager.state.usage } : {}),
19481961
cancellationReason,
1949-
},
1962+
}),
19501963
updatedAt: new Date(),
19511964
});
19521965
} catch (memoryError) {
@@ -2041,10 +2054,10 @@ export function createWorkflow<
20412054
workflowState: stateManager.state.workflowState,
20422055
events: collectedEvents,
20432056
// Store a lightweight error summary in metadata for debugging
2044-
metadata: {
2057+
metadata: await mergeExecutionMetadata({
20452058
...(stateManager.state?.usage ? { usage: stateManager.state.usage } : {}),
20462059
errorMessage: error instanceof Error ? error.message : String(error),
2047-
},
2060+
}),
20482061
updatedAt: new Date(),
20492062
});
20502063
} catch (memoryError) {

packages/core/src/workflow/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,10 @@ export interface WorkflowRunOptions {
240240
* The user ID, this can be used to track the current user in a workflow
241241
*/
242242
userId?: string;
243+
/**
244+
* Additional execution metadata persisted with workflow state
245+
*/
246+
metadata?: Record<string, unknown>;
243247
/**
244248
* The user context, this can be used to track the current user context in a workflow
245249
*/

packages/libsql/src/memory-core.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1214,7 +1214,7 @@ export class LibSQLMemoryCore implements StorageAdapter {
12141214
continue;
12151215
}
12161216

1217-
conditions.push("json_extract(metadata, ?) = json(?)");
1217+
conditions.push("json_extract(metadata, ?) = json_extract(json(?), '$')");
12181218
args.push(metadataPath, safeStringify(value));
12191219
}
12201220
}

packages/libsql/src/memory-v2-adapter.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ describe.sequential("LibSQLMemoryAdapter - Advanced Behavior", () => {
8585
expect(sql).toContain("status = ?");
8686
expect(sql).toContain("created_at >=");
8787
expect(sql).toContain("user_id = ?");
88-
expect(sql).toContain("json_extract(metadata, ?) = json(?)");
88+
expect(sql).toContain("json_extract(metadata, ?) = json_extract(json(?), '$')");
8989
expect(sql).toContain("ORDER BY created_at DESC");
9090
expect(args).toEqual([
9191
"workflow-1",
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { describe, expect, it } from "vitest";
2+
import { WorkflowExecutionRequestSchema } from "./agent.schemas";
3+
4+
describe("WorkflowExecutionRequestSchema", () => {
5+
it("accepts options.metadata payload", () => {
6+
const parsed = WorkflowExecutionRequestSchema.parse({
7+
input: { value: 1 },
8+
options: {
9+
userId: "user-1",
10+
metadata: {
11+
tenantId: "acme",
12+
region: "us-east-1",
13+
},
14+
},
15+
});
16+
17+
expect(parsed.options?.metadata).toEqual({
18+
tenantId: "acme",
19+
region: "us-east-1",
20+
});
21+
});
22+
});

packages/server-core/src/schemas/agent.schemas.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ export const WorkflowExecutionRequestSchema = z.object({
359359
executionId: z.string().optional(),
360360
context: z.any().optional(),
361361
workflowState: z.record(z.any()).optional(),
362+
metadata: z.record(z.any()).optional(),
362363
})
363364
.optional()
364365
.describe("Optional execution options"),

0 commit comments

Comments
 (0)