Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions crates/cli/tests/admin_expand.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#![cfg(not(windows))]

use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::PathBuf;
use std::process::Command;
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};
use std::time::Duration;

#[derive(Debug)]
struct CapturedAdminRequest {
method: String,
target: String,
}

fn rc_binary() -> PathBuf {
if let Ok(path) = std::env::var("CARGO_BIN_EXE_rc") {
return PathBuf::from(path);
}

let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent()
.expect("cli crate has parent directory")
.parent()
.expect("workspace root exists")
.to_path_buf();

let debug_binary = workspace_root.join("target/debug/rc");
if debug_binary.exists() {
return debug_binary;
}

workspace_root.join("target/release/rc")
}

fn read_admin_request(stream: &mut TcpStream) -> CapturedAdminRequest {
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.expect("set read timeout");

let mut request = Vec::new();
let mut buffer = [0_u8; 8192];
loop {
let bytes_read = stream.read(&mut buffer).expect("read admin request");
if bytes_read == 0 {
break;
}
request.extend_from_slice(&buffer[..bytes_read]);
if request.windows(4).any(|window| window == b"\r\n\r\n") {
break;
}
}

let request = String::from_utf8(request).expect("admin request should be UTF-8");
let request_line = request.lines().next().expect("request line");
let mut parts = request_line.split_whitespace();
let method = parts.next().expect("request method").to_string();
let target = parts.next().expect("request target").to_string();

CapturedAdminRequest { method, target }
}

fn start_admin_test_server(
response_body: &'static str,
) -> (String, Receiver<CapturedAdminRequest>, JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind admin test server");
let endpoint = format!("http://{}", listener.local_addr().expect("server address"));
let (sender, receiver) = mpsc::channel();

let handle = thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("accept admin request");
let request = read_admin_request(&mut stream);
sender.send(request).expect("send captured request");

let response = format!(
"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
response_body.len(),
response_body
);
stream
.write_all(response.as_bytes())
.expect("write admin response");
});

(endpoint, receiver, handle)
}

fn rc_host_alias(endpoint: &str) -> String {
let (_, endpoint_authority) = endpoint.split_once("://").expect("endpoint has scheme");
format!("http://ACCESS_KEY:SECRET_KEY@{endpoint_authority}")
}

#[test]
fn scale_start_dispatches_to_rebalance_start_with_expansion_json() {
let config_dir = tempfile::tempdir().expect("create config dir");
let (endpoint, receiver, handle) = start_admin_test_server(r#"{"id":"rebalance-123"}"#);

let output = Command::new(rc_binary())
.args(["--json", "admin", "scale", "start", "myalias"])
.env("RC_CONFIG_DIR", config_dir.path())
.env("RC_HOST_myalias", rc_host_alias(&endpoint))
.output()
.expect("run rc command");

assert!(
output.status.success(),
"stderr: {}",
String::from_utf8_lossy(&output.stderr)
);

let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8");
let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output");
assert_eq!(payload["success"], true);
assert_eq!(
payload["message"],
"Expansion rebalance started successfully"
);
assert_eq!(payload["target"], "myalias");
assert_eq!(payload["id"], "rebalance-123");

let request = receiver
.recv_timeout(Duration::from_secs(5))
.expect("captured admin request");
assert_eq!(request.method, "POST");
assert_eq!(request.target, "/rustfs/admin/v3/rebalance/start");

handle.join().expect("admin test server finished");
}

#[test]
fn expand_stop_dispatches_to_rebalance_stop_with_expansion_json() {
let config_dir = tempfile::tempdir().expect("create config dir");
let (endpoint, receiver, handle) = start_admin_test_server("");

let output = Command::new(rc_binary())
.args(["--json", "admin", "expand", "stop", "myalias"])
.env("RC_CONFIG_DIR", config_dir.path())
.env("RC_HOST_myalias", rc_host_alias(&endpoint))
.output()
.expect("run rc command");

assert!(
output.status.success(),
"stderr: {}",
String::from_utf8_lossy(&output.stderr)
);

let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8");
let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output");
assert_eq!(payload["success"], true);
assert_eq!(
payload["message"],
"Expansion rebalance stopped successfully"
);
assert_eq!(payload["target"], "myalias");
assert!(payload.get("id").is_none());

let request = receiver
.recv_timeout(Duration::from_secs(5))
.expect("captured admin request");
assert_eq!(request.method, "POST");
assert_eq!(request.target, "/rustfs/admin/v3/rebalance/stop");

handle.join().expect("admin test server finished");
}
Loading