From f2df0bfac474456af637870c437629c9f705799d Mon Sep 17 00:00:00 2001 From: edlsh Date: Mon, 13 Apr 2026 09:39:01 -0400 Subject: [PATCH 1/2] fix(amp): preserve lowercase glob tool name --- internal/api/modules/amp/response_rewriter.go | 50 ++++++++++++++++++ .../api/modules/amp/response_rewriter_test.go | 51 +++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/internal/api/modules/amp/response_rewriter.go b/internal/api/modules/amp/response_rewriter.go index 707fe576b4..895c494e74 100644 --- a/internal/api/modules/amp/response_rewriter.go +++ b/internal/api/modules/amp/response_rewriter.go @@ -123,6 +123,52 @@ func (rw *ResponseRewriter) Flush() { var modelFieldPaths = []string{"message.model", "model", "modelVersion", "response.model", "response.modelVersion"} +// ampCanonicalToolNames maps tool names to the exact casing expected by the +// Amp mode tool whitelist (case-sensitive match). +var ampCanonicalToolNames = map[string]string{ + "bash": "Bash", + "read": "Read", + "grep": "Grep", + "glob": "glob", + "task": "Task", + "check": "Check", +} + +// normalizeAmpToolNames fixes tool_use block names to match Amp's canonical casing. +// Some upstream models return lowercase tool names (e.g. "bash" instead of "Bash") +// which causes Amp's case-sensitive mode whitelist to reject them. +func normalizeAmpToolNames(data []byte) []byte { + // Non-streaming: content[].name in tool_use blocks + for index, block := range gjson.GetBytes(data, "content").Array() { + if block.Get("type").String() != "tool_use" { + continue + } + name := block.Get("name").String() + if canonical, ok := ampCanonicalToolNames[strings.ToLower(name)]; ok && name != canonical { + path := fmt.Sprintf("content.%d.name", index) + var err error + data, err = sjson.SetBytes(data, path, canonical) + if err != nil { + log.Warnf("Amp ResponseRewriter: failed to normalize tool name %q to %q: %v", name, canonical, err) + } + } + } + + // Streaming: content_block.name in content_block_start events + if gjson.GetBytes(data, "content_block.type").String() == "tool_use" { + name := gjson.GetBytes(data, "content_block.name").String() + if canonical, ok := ampCanonicalToolNames[strings.ToLower(name)]; ok && name != canonical { + var err error + data, err = sjson.SetBytes(data, "content_block.name", canonical) + if err != nil { + log.Warnf("Amp ResponseRewriter: failed to normalize streaming tool name %q to %q: %v", name, canonical, err) + } + } + } + + return data +} + // ensureAmpSignature injects empty signature fields into tool_use/thinking blocks // in API responses so that the Amp TUI does not crash on P.signature.length. func ensureAmpSignature(data []byte) []byte { @@ -179,6 +225,7 @@ func (rw *ResponseRewriter) suppressAmpThinking(data []byte) []byte { func (rw *ResponseRewriter) rewriteModelInResponse(data []byte) []byte { data = ensureAmpSignature(data) + data = normalizeAmpToolNames(data) data = rw.suppressAmpThinking(data) if len(data) == 0 { return data @@ -278,6 +325,9 @@ func (rw *ResponseRewriter) rewriteStreamEvent(data []byte) []byte { // Inject empty signature where needed data = ensureAmpSignature(data) + // Normalize tool names to canonical casing + data = normalizeAmpToolNames(data) + // Rewrite model name if rw.originalModel != "" { for _, path := range modelFieldPaths { diff --git a/internal/api/modules/amp/response_rewriter_test.go b/internal/api/modules/amp/response_rewriter_test.go index ac95dfc64f..a3a350cb23 100644 --- a/internal/api/modules/amp/response_rewriter_test.go +++ b/internal/api/modules/amp/response_rewriter_test.go @@ -175,6 +175,57 @@ func TestSanitizeAmpRequestBody_MixedInvalidThinkingAndToolUseSignature(t *testi } } +func TestNormalizeAmpToolNames_NonStreaming(t *testing.T) { + input := []byte(`{"content":[{"type":"tool_use","id":"toolu_01","name":"bash","input":{"cmd":"ls"}},{"type":"tool_use","id":"toolu_02","name":"read","input":{"path":"/tmp"}},{"type":"text","text":"hello"}]}`) + result := normalizeAmpToolNames(input) + + if !contains(result, []byte(`"name":"Bash"`)) { + t.Errorf("expected bash->Bash, got %s", string(result)) + } + if !contains(result, []byte(`"name":"Read"`)) { + t.Errorf("expected read->Read, got %s", string(result)) + } + if contains(result, []byte(`"name":"bash"`)) { + t.Errorf("expected lowercase bash to be replaced, got %s", string(result)) + } +} + +func TestNormalizeAmpToolNames_Streaming(t *testing.T) { + input := []byte(`{"type":"content_block_start","index":1,"content_block":{"type":"tool_use","name":"grep","id":"toolu_01","input":{}}}`) + result := normalizeAmpToolNames(input) + + if !contains(result, []byte(`"name":"Grep"`)) { + t.Errorf("expected grep->Grep in streaming, got %s", string(result)) + } +} + +func TestNormalizeAmpToolNames_AlreadyCorrect(t *testing.T) { + input := []byte(`{"content":[{"type":"tool_use","id":"toolu_01","name":"Bash","input":{"cmd":"ls"}}]}`) + result := normalizeAmpToolNames(input) + + if string(result) != string(input) { + t.Errorf("expected no modification for correctly-cased tool, got %s", string(result)) + } +} + +func TestNormalizeAmpToolNames_GlobPreserved(t *testing.T) { + input := []byte(`{"content":[{"type":"tool_use","id":"toolu_01","name":"glob","input":{"pattern":"*.go"}}]}`) + result := normalizeAmpToolNames(input) + + if string(result) != string(input) { + t.Errorf("expected glob to remain lowercase, got %s", string(result)) + } +} + +func TestNormalizeAmpToolNames_UnknownToolUntouched(t *testing.T) { + input := []byte(`{"content":[{"type":"tool_use","id":"toolu_01","name":"edit_file","input":{"path":"/tmp/x"}}]}`) + result := normalizeAmpToolNames(input) + + if string(result) != string(input) { + t.Errorf("expected no modification for unknown tool, got %s", string(result)) + } +} + func contains(data, substr []byte) bool { for i := 0; i <= len(data)-len(substr); i++ { if string(data[i:i+len(substr)]) == string(substr) { From 870cc1c40669213e1d114b696447f345ca1c9a32 Mon Sep 17 00:00:00 2001 From: edlsh Date: Fri, 24 Apr 2026 15:15:01 -0400 Subject: [PATCH 2/2] fix(openai): repair empty responses stream output --- .../openai/openai_responses_handlers.go | 122 +++++++++++++++++- .../openai_responses_handlers_stream_test.go | 34 ++++- 2 files changed, 151 insertions(+), 5 deletions(-) diff --git a/sdk/api/handlers/openai/openai_responses_handlers.go b/sdk/api/handlers/openai/openai_responses_handlers.go index 8969ce2f6d..67c648dcf3 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers.go +++ b/sdk/api/handlers/openai/openai_responses_handlers.go @@ -13,6 +13,7 @@ import ( "fmt" "io" "net/http" + "sort" "github.com/gin-gonic/gin" . "github.com/router-for-me/CLIProxyAPI/v6/internal/constant" @@ -45,7 +46,9 @@ func writeResponsesSSEChunk(w io.Writer, chunk []byte) { } type responsesSSEFramer struct { - pending []byte + pending []byte + outputItems map[int][]byte + outputOrder []int } func (f *responsesSSEFramer) WriteChunk(w io.Writer, chunk []byte) { @@ -61,7 +64,7 @@ func (f *responsesSSEFramer) WriteChunk(w io.Writer, chunk []byte) { if frameLen == 0 { break } - writeResponsesSSEChunk(w, f.pending[:frameLen]) + f.writeFrame(w, f.pending[:frameLen]) copy(f.pending, f.pending[frameLen:]) f.pending = f.pending[:len(f.pending)-frameLen] } @@ -72,7 +75,7 @@ func (f *responsesSSEFramer) WriteChunk(w io.Writer, chunk []byte) { if len(f.pending) == 0 || !responsesSSECanEmitWithoutDelimiter(f.pending) { return } - writeResponsesSSEChunk(w, f.pending) + f.writeFrame(w, f.pending) f.pending = f.pending[:0] } @@ -88,10 +91,121 @@ func (f *responsesSSEFramer) Flush(w io.Writer) { f.pending = f.pending[:0] return } - writeResponsesSSEChunk(w, f.pending) + f.writeFrame(w, f.pending) f.pending = f.pending[:0] } +func (f *responsesSSEFramer) writeFrame(w io.Writer, frame []byte) { + writeResponsesSSEChunk(w, f.repairFrame(frame)) +} + +func (f *responsesSSEFramer) repairFrame(frame []byte) []byte { + payload, ok := responsesSSEDataPayload(frame) + if !ok || len(payload) == 0 || bytes.Equal(payload, []byte("[DONE]")) || !json.Valid(payload) { + return frame + } + + switch gjson.GetBytes(payload, "type").String() { + case "response.output_item.done": + f.recordOutputItem(payload) + case "response.completed": + repaired := f.repairCompletedPayload(payload) + if !bytes.Equal(repaired, payload) { + return responsesSSEFrameWithData(frame, repaired) + } + } + return frame +} + +func responsesSSEDataPayload(frame []byte) ([]byte, bool) { + var payload []byte + found := false + for _, line := range bytes.Split(frame, []byte("\n")) { + line = bytes.TrimRight(line, "\r") + trimmed := bytes.TrimSpace(line) + if !bytes.HasPrefix(trimmed, []byte("data:")) { + continue + } + data := bytes.TrimSpace(trimmed[len("data:"):]) + if found { + payload = append(payload, '\n') + } + payload = append(payload, data...) + found = true + } + return payload, found +} + +func responsesSSEFrameWithData(frame, payload []byte) []byte { + var out bytes.Buffer + for _, line := range bytes.Split(frame, []byte("\n")) { + line = bytes.TrimRight(line, "\r") + trimmed := bytes.TrimSpace(line) + if len(trimmed) == 0 || bytes.HasPrefix(trimmed, []byte("data:")) { + continue + } + out.Write(line) + out.WriteByte('\n') + } + out.WriteString("data: ") + out.Write(payload) + out.WriteString("\n\n") + return out.Bytes() +} + +func (f *responsesSSEFramer) recordOutputItem(payload []byte) { + item := gjson.GetBytes(payload, "item") + if !item.Exists() || !item.IsObject() || item.Get("type").String() == "" { + return + } + + index := len(f.outputOrder) + if outputIndex := gjson.GetBytes(payload, "output_index"); outputIndex.Exists() { + index = int(outputIndex.Int()) + } + if f.outputItems == nil { + f.outputItems = make(map[int][]byte) + } + if _, exists := f.outputItems[index]; !exists { + f.outputOrder = append(f.outputOrder, index) + } + f.outputItems[index] = append([]byte(nil), item.Raw...) +} + +func (f *responsesSSEFramer) repairCompletedPayload(payload []byte) []byte { + if len(f.outputOrder) == 0 { + return payload + } + output := gjson.GetBytes(payload, "response.output") + if output.Exists() && (!output.IsArray() || len(output.Array()) > 0) { + return payload + } + + var outputJSON bytes.Buffer + outputJSON.WriteByte('[') + indexes := append([]int(nil), f.outputOrder...) + sort.Ints(indexes) + written := 0 + for _, index := range indexes { + item, ok := f.outputItems[index] + if !ok { + continue + } + if written > 0 { + outputJSON.WriteByte(',') + } + outputJSON.Write(item) + written++ + } + outputJSON.WriteByte(']') + + repaired, err := sjson.SetRawBytes(payload, "response.output", outputJSON.Bytes()) + if err != nil { + return payload + } + return repaired +} + func responsesSSEFrameLen(chunk []byte) int { if len(chunk) == 0 { return 0 diff --git a/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go b/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go index ef16fe80ac..8b3f79e33d 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go +++ b/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go @@ -10,6 +10,7 @@ import ( "github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces" "github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers" sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config" + "github.com/tidwall/gjson" ) func newResponsesStreamTestHandler(t *testing.T) (*OpenAIResponsesAPIHandler, *httptest.ResponseRecorder, *gin.Context, http.Flusher) { @@ -53,12 +54,43 @@ func TestForwardResponsesStreamSeparatesDataOnlySSEChunks(t *testing.T) { t.Errorf("unexpected first event.\nGot: %q\nWant: %q", parts[0], expectedPart1) } - expectedPart2 := "data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp-1\",\"output\":[]}}" + expectedPart2 := "data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp-1\",\"output\":[{\"type\":\"function_call\",\"arguments\":\"{}\"}]}}" if parts[1] != expectedPart2 { t.Errorf("unexpected second event.\nGot: %q\nWant: %q", parts[1], expectedPart2) } } +func TestForwardResponsesStreamRepairsEmptyCompletedOutputFromDoneItems(t *testing.T) { + h, recorder, c, flusher := newResponsesStreamTestHandler(t) + + data := make(chan []byte, 3) + errs := make(chan *interfaces.ErrorMessage) + data <- []byte(`data: {"type":"response.output_item.done","output_index":0,"item":{"type":"reasoning","id":"rs-1","summary":[]}}`) + data <- []byte(`data: {"type":"response.output_item.done","output_index":1,"item":{"type":"function_call","id":"fc-1","call_id":"call-1","name":"shell","arguments":"{\"cmd\":\"pwd\"}","status":"completed"}}`) + data <- []byte(`data: {"type":"response.completed","response":{"id":"resp-1","output":[]}}`) + close(data) + close(errs) + + h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil) + + parts := strings.Split(strings.TrimSpace(recorder.Body.String()), "\n\n") + if len(parts) != 3 { + t.Fatalf("expected 3 SSE events, got %d. Body: %q", len(parts), recorder.Body.String()) + } + + payload := strings.TrimPrefix(parts[2], "data: ") + output := gjson.Get(payload, "response.output") + if !output.IsArray() || len(output.Array()) != 2 { + t.Fatalf("expected repaired completed output with 2 items, got %s", output.Raw) + } + if got := gjson.Get(payload, "response.output.1.name").String(); got != "shell" { + t.Fatalf("expected function_call name to be preserved, got %q in %s", got, payload) + } + if got := gjson.Get(payload, "response.output.1.arguments").String(); got != `{"cmd":"pwd"}` { + t.Fatalf("expected function_call arguments to be preserved, got %q in %s", got, payload) + } +} + func TestForwardResponsesStreamReassemblesSplitSSEEventChunks(t *testing.T) { h, recorder, c, flusher := newResponsesStreamTestHandler(t)