Skip to content

Commit f56c08f

Browse files
committed
server/json-rpc: Add JSON-RPC batch request support
The migration from jsonrpsee to axum in commit 58b0f25 lost batch request support (JSON arrays of requests). Restore it by parsing both single and batch formats, dispatching each request individually, and returning an array of responses for batches per the JSON-RPC 2.0 spec.
1 parent 019c4c0 commit f56c08f

File tree

2 files changed

+143
-33
lines changed

2 files changed

+143
-33
lines changed

server/json-rpc/src/handlers.rs

Lines changed: 88 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,46 @@ pub struct AppState<R> {
3939
pub logger: Logger,
4040
}
4141

42+
/// Dispatch a single JSON-RPC request to the appropriate method handler.
43+
///
44+
/// Returns `None` for notifications (requests without an id), since the
45+
/// JSON-RPC 2.0 spec says notifications must not produce a response.
46+
async fn dispatch_request<R: SubgraphRegistrar>(
47+
state: &Arc<AppState<R>>,
48+
request: JsonRpcRequest,
49+
) -> Option<JsonRpcResponse> {
50+
if !request.is_valid_version() {
51+
return Some(JsonRpcResponse::invalid_request());
52+
}
53+
54+
let id = request.id.clone().unwrap_or(JsonRpcId::Null);
55+
56+
// Notifications (no id) should not produce a response per JSON-RPC 2.0 spec.
57+
// But since all our methods have side effects, we still execute them;
58+
// we just don't return the response.
59+
let is_notification = request.id.is_none();
60+
61+
let response = match request.method.as_str() {
62+
"subgraph_create" => handle_create(state, &request, id).await,
63+
"subgraph_deploy" => handle_deploy(state, &request, id).await,
64+
"subgraph_remove" => handle_remove(state, &request, id).await,
65+
"subgraph_reassign" => handle_reassign(state, &request, id).await,
66+
"subgraph_pause" => handle_pause(state, &request, id).await,
67+
"subgraph_resume" => handle_resume(state, &request, id).await,
68+
_ => JsonRpcResponse::error(id, JsonRpcError::method_not_found()),
69+
};
70+
71+
if is_notification {
72+
None
73+
} else {
74+
Some(response)
75+
}
76+
}
77+
4278
/// Main JSON-RPC request handler.
4379
///
44-
/// Processes incoming JSON-RPC requests, dispatches to the appropriate method handler,
45-
/// and returns the response.
80+
/// Supports both single requests and batch requests (JSON arrays).
81+
/// Per JSON-RPC 2.0 spec, an empty batch array returns an invalid request error.
4682
pub async fn jsonrpc_handler<R: SubgraphRegistrar>(
4783
State(state): State<Arc<AppState<R>>>,
4884
ConnectInfo(remote_addr): ConnectInfo<SocketAddr>,
@@ -56,45 +92,64 @@ pub async fn jsonrpc_handler<R: SubgraphRegistrar>(
5692
.unwrap_or("unset")
5793
}
5894

59-
// Parse the JSON-RPC request
95+
let log_request = |method: &str, params: &Option<JsonValue>| {
96+
info!(
97+
&state.logger,
98+
"JSON-RPC call";
99+
"method" => method,
100+
"params" => ?params,
101+
"remote_addr" => %remote_addr,
102+
"x_forwarded_for" => header(&headers, "x-forwarded-for"),
103+
"x_real_ip" => header(&headers, "x-real-ip"),
104+
"x_forwarded_proto" => header(&headers, "x-forwarded-proto")
105+
);
106+
};
107+
108+
// Try batch (JSON array) first, then single request.
109+
if let Ok(requests) = serde_json::from_str::<Vec<JsonRpcRequest>>(&body) {
110+
if requests.is_empty() {
111+
let resp = serde_json::to_value(JsonRpcResponse::invalid_request())
112+
.expect("failed to serialize response");
113+
return (StatusCode::OK, Json(resp));
114+
}
115+
116+
info!(
117+
&state.logger,
118+
"JSON-RPC batch request";
119+
"batch_size" => requests.len(),
120+
"remote_addr" => %remote_addr,
121+
);
122+
123+
let mut responses = Vec::new();
124+
for request in requests {
125+
log_request(&request.method, &request.params);
126+
if let Some(resp) = dispatch_request(&state, request).await {
127+
responses.push(resp);
128+
}
129+
}
130+
131+
let value = serde_json::to_value(responses).expect("failed to serialize responses");
132+
return (StatusCode::OK, Json(value));
133+
}
134+
135+
// Single request
60136
let request: JsonRpcRequest = match serde_json::from_str(&body) {
61137
Ok(req) => req,
62138
Err(_) => {
63-
return (StatusCode::OK, Json(JsonRpcResponse::parse_error()));
139+
let resp = serde_json::to_value(JsonRpcResponse::parse_error())
140+
.expect("failed to serialize response");
141+
return (StatusCode::OK, Json(resp));
64142
}
65143
};
66144

67-
// Validate JSON-RPC version
68-
if !request.is_valid_version() {
69-
return (StatusCode::OK, Json(JsonRpcResponse::invalid_request()));
70-
}
71-
72-
let id = request.id.clone().unwrap_or(JsonRpcId::Null);
73-
74-
// Log the method call
75-
info!(
76-
&state.logger,
77-
"JSON-RPC call";
78-
"method" => &request.method,
79-
"params" => ?request.params,
80-
"remote_addr" => %remote_addr,
81-
"x_forwarded_for" => header(&headers, "x-forwarded-for"),
82-
"x_real_ip" => header(&headers, "x-real-ip"),
83-
"x_forwarded_proto" => header(&headers, "x-forwarded-proto")
84-
);
145+
log_request(&request.method, &request.params);
85146

86-
// Dispatch to the appropriate handler
87-
let response = match request.method.as_str() {
88-
"subgraph_create" => handle_create(&state, &request, id.clone()).await,
89-
"subgraph_deploy" => handle_deploy(&state, &request, id.clone()).await,
90-
"subgraph_remove" => handle_remove(&state, &request, id.clone()).await,
91-
"subgraph_reassign" => handle_reassign(&state, &request, id.clone()).await,
92-
"subgraph_pause" => handle_pause(&state, &request, id.clone()).await,
93-
"subgraph_resume" => handle_resume(&state, &request, id.clone()).await,
94-
_ => JsonRpcResponse::error(id, JsonRpcError::method_not_found()),
95-
};
147+
let response = dispatch_request(&state, request)
148+
.await
149+
.unwrap_or_else(|| JsonRpcResponse::success(JsonRpcId::Null, JsonValue::Null));
96150

97-
(StatusCode::OK, Json(response))
151+
let value = serde_json::to_value(response).expect("failed to serialize response");
152+
(StatusCode::OK, Json(value))
98153
}
99154

100155
/// Parse parameters from a JSON-RPC request.

server/json-rpc/src/jsonrpc.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,4 +195,59 @@ mod tests {
195195
assert!(json.contains(r#""id":"req-1""#));
196196
assert!(!json.contains("result"));
197197
}
198+
199+
#[test]
200+
fn deserialize_batch_request() {
201+
let json = r#"[
202+
{"jsonrpc":"2.0","method":"subgraph_create","id":1},
203+
{"jsonrpc":"2.0","method":"subgraph_remove","id":2}
204+
]"#;
205+
let requests: Vec<JsonRpcRequest> = serde_json::from_str(json).unwrap();
206+
assert_eq!(requests.len(), 2);
207+
assert_eq!(requests[0].method, "subgraph_create");
208+
assert_eq!(requests[0].id, Some(JsonRpcId::Number(1)));
209+
assert_eq!(requests[1].method, "subgraph_remove");
210+
assert_eq!(requests[1].id, Some(JsonRpcId::Number(2)));
211+
}
212+
213+
#[test]
214+
fn serialize_batch_response() {
215+
let responses = vec![
216+
JsonRpcResponse::success(JsonRpcId::Number(1), serde_json::json!({"ok": true})),
217+
JsonRpcResponse::error(
218+
JsonRpcId::Number(2),
219+
JsonRpcError::new(-32601, "Method not found"),
220+
),
221+
];
222+
let json = serde_json::to_string(&responses).unwrap();
223+
// Must be a JSON array
224+
assert!(json.starts_with('['));
225+
assert!(json.ends_with(']'));
226+
// Both responses present
227+
let parsed: Vec<serde_json::Value> = serde_json::from_str(&json).unwrap();
228+
assert_eq!(parsed.len(), 2);
229+
assert_eq!(parsed[0]["id"], 1);
230+
assert_eq!(parsed[1]["id"], 2);
231+
}
232+
233+
#[test]
234+
fn batch_with_notification_omits_response() {
235+
// A notification has no id — when filtered, the batch response should
236+
// contain fewer entries than the batch request.
237+
let json = r#"[
238+
{"jsonrpc":"2.0","method":"subgraph_create","id":1},
239+
{"jsonrpc":"2.0","method":"subgraph_remove"}
240+
]"#;
241+
let requests: Vec<JsonRpcRequest> = serde_json::from_str(json).unwrap();
242+
assert_eq!(requests.len(), 2);
243+
assert!(requests[0].id.is_some());
244+
assert!(requests[1].id.is_none());
245+
}
246+
247+
#[test]
248+
fn empty_batch_is_valid_json_array() {
249+
let json = "[]";
250+
let requests: Vec<JsonRpcRequest> = serde_json::from_str(json).unwrap();
251+
assert!(requests.is_empty());
252+
}
198253
}

0 commit comments

Comments
 (0)