Skip to content

Commit 28251e8

Browse files
authored
test(admin): cover expand command dispatch (#182)
1 parent c4a40e9 commit 28251e8

1 file changed

Lines changed: 166 additions & 0 deletions

File tree

crates/cli/tests/admin_expand.rs

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
#![cfg(not(windows))]
2+
3+
use std::io::{Read, Write};
4+
use std::net::{TcpListener, TcpStream};
5+
use std::path::PathBuf;
6+
use std::process::Command;
7+
use std::sync::mpsc::{self, Receiver};
8+
use std::thread::{self, JoinHandle};
9+
use std::time::Duration;
10+
11+
#[derive(Debug)]
12+
struct CapturedAdminRequest {
13+
method: String,
14+
target: String,
15+
}
16+
17+
fn rc_binary() -> PathBuf {
18+
if let Ok(path) = std::env::var("CARGO_BIN_EXE_rc") {
19+
return PathBuf::from(path);
20+
}
21+
22+
let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
23+
.parent()
24+
.expect("cli crate has parent directory")
25+
.parent()
26+
.expect("workspace root exists")
27+
.to_path_buf();
28+
29+
let debug_binary = workspace_root.join("target/debug/rc");
30+
if debug_binary.exists() {
31+
return debug_binary;
32+
}
33+
34+
workspace_root.join("target/release/rc")
35+
}
36+
37+
fn read_admin_request(stream: &mut TcpStream) -> CapturedAdminRequest {
38+
stream
39+
.set_read_timeout(Some(Duration::from_secs(5)))
40+
.expect("set read timeout");
41+
42+
let mut request = Vec::new();
43+
let mut buffer = [0_u8; 8192];
44+
loop {
45+
let bytes_read = stream.read(&mut buffer).expect("read admin request");
46+
if bytes_read == 0 {
47+
break;
48+
}
49+
request.extend_from_slice(&buffer[..bytes_read]);
50+
if request.windows(4).any(|window| window == b"\r\n\r\n") {
51+
break;
52+
}
53+
}
54+
55+
let request = String::from_utf8(request).expect("admin request should be UTF-8");
56+
let request_line = request.lines().next().expect("request line");
57+
let mut parts = request_line.split_whitespace();
58+
let method = parts.next().expect("request method").to_string();
59+
let target = parts.next().expect("request target").to_string();
60+
61+
CapturedAdminRequest { method, target }
62+
}
63+
64+
fn start_admin_test_server(
65+
response_body: &'static str,
66+
) -> (String, Receiver<CapturedAdminRequest>, JoinHandle<()>) {
67+
let listener = TcpListener::bind("127.0.0.1:0").expect("bind admin test server");
68+
let endpoint = format!("http://{}", listener.local_addr().expect("server address"));
69+
let (sender, receiver) = mpsc::channel();
70+
71+
let handle = thread::spawn(move || {
72+
let (mut stream, _) = listener.accept().expect("accept admin request");
73+
let request = read_admin_request(&mut stream);
74+
sender.send(request).expect("send captured request");
75+
76+
let response = format!(
77+
"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
78+
response_body.len(),
79+
response_body
80+
);
81+
stream
82+
.write_all(response.as_bytes())
83+
.expect("write admin response");
84+
});
85+
86+
(endpoint, receiver, handle)
87+
}
88+
89+
fn rc_host_alias(endpoint: &str) -> String {
90+
let (_, endpoint_authority) = endpoint.split_once("://").expect("endpoint has scheme");
91+
format!("http://ACCESS_KEY:SECRET_KEY@{endpoint_authority}")
92+
}
93+
94+
#[test]
95+
fn scale_start_dispatches_to_rebalance_start_with_expansion_json() {
96+
let config_dir = tempfile::tempdir().expect("create config dir");
97+
let (endpoint, receiver, handle) = start_admin_test_server(r#"{"id":"rebalance-123"}"#);
98+
99+
let output = Command::new(rc_binary())
100+
.args(["--json", "admin", "scale", "start", "myalias"])
101+
.env("RC_CONFIG_DIR", config_dir.path())
102+
.env("RC_HOST_myalias", rc_host_alias(&endpoint))
103+
.output()
104+
.expect("run rc command");
105+
106+
assert!(
107+
output.status.success(),
108+
"stderr: {}",
109+
String::from_utf8_lossy(&output.stderr)
110+
);
111+
112+
let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8");
113+
let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output");
114+
assert_eq!(payload["success"], true);
115+
assert_eq!(
116+
payload["message"],
117+
"Expansion rebalance started successfully"
118+
);
119+
assert_eq!(payload["target"], "myalias");
120+
assert_eq!(payload["id"], "rebalance-123");
121+
122+
let request = receiver
123+
.recv_timeout(Duration::from_secs(5))
124+
.expect("captured admin request");
125+
assert_eq!(request.method, "POST");
126+
assert_eq!(request.target, "/rustfs/admin/v3/rebalance/start");
127+
128+
handle.join().expect("admin test server finished");
129+
}
130+
131+
#[test]
132+
fn expand_stop_dispatches_to_rebalance_stop_with_expansion_json() {
133+
let config_dir = tempfile::tempdir().expect("create config dir");
134+
let (endpoint, receiver, handle) = start_admin_test_server("");
135+
136+
let output = Command::new(rc_binary())
137+
.args(["--json", "admin", "expand", "stop", "myalias"])
138+
.env("RC_CONFIG_DIR", config_dir.path())
139+
.env("RC_HOST_myalias", rc_host_alias(&endpoint))
140+
.output()
141+
.expect("run rc command");
142+
143+
assert!(
144+
output.status.success(),
145+
"stderr: {}",
146+
String::from_utf8_lossy(&output.stderr)
147+
);
148+
149+
let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8");
150+
let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output");
151+
assert_eq!(payload["success"], true);
152+
assert_eq!(
153+
payload["message"],
154+
"Expansion rebalance stopped successfully"
155+
);
156+
assert_eq!(payload["target"], "myalias");
157+
assert!(payload.get("id").is_none());
158+
159+
let request = receiver
160+
.recv_timeout(Duration::from_secs(5))
161+
.expect("captured admin request");
162+
assert_eq!(request.method, "POST");
163+
assert_eq!(request.target, "/rustfs/admin/v3/rebalance/stop");
164+
165+
handle.join().expect("admin test server finished");
166+
}

0 commit comments

Comments
 (0)