Skip to content

Commit de54722

Browse files
authored
Merge pull request #221 from zeromq/rgbkrk/dealer-socket-split
implement Split for DealerSocket
2 parents 4006664 + 4e86a40 commit de54722

File tree

2 files changed

+162
-0
lines changed

2 files changed

+162
-0
lines changed

src/dealer.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,94 @@ impl SocketSend for DealerSocket {
9494
}
9595

9696
impl CaptureSocket for DealerSocket {}
97+
98+
impl DealerSocket {
99+
/// Splits the socket into separate send and recv halves, allowing concurrent
100+
/// sending and receiving from independent async tasks.
101+
///
102+
/// The underlying socket stays alive until both halves are dropped.
103+
pub fn split(mut self) -> (DealerSendHalf, DealerRecvHalf) {
104+
// Swap the real fields out before self drops. The dummy backend's
105+
// shutdown() is a no-op on an empty peer map.
106+
let backend = std::mem::replace(
107+
&mut self.backend,
108+
Arc::new(GenericSocketBackend::with_options(
109+
None,
110+
SocketType::DEALER,
111+
SocketOptions::default(),
112+
)),
113+
);
114+
let fair_queue = std::mem::replace(&mut self.fair_queue, FairQueue::new(true));
115+
let binds = std::mem::take(&mut self.binds);
116+
117+
let inner = Arc::new(DealerSocketInner {
118+
backend,
119+
_binds: binds,
120+
});
121+
122+
(
123+
DealerSendHalf {
124+
inner: inner.clone(),
125+
},
126+
DealerRecvHalf {
127+
_inner: inner,
128+
fair_queue,
129+
},
130+
)
131+
}
132+
}
133+
134+
struct DealerSocketInner {
135+
backend: Arc<GenericSocketBackend>,
136+
_binds: HashMap<Endpoint, AcceptStopHandle>,
137+
}
138+
139+
impl Drop for DealerSocketInner {
140+
fn drop(&mut self) {
141+
self.backend.shutdown();
142+
}
143+
}
144+
145+
/// The send half of a [`DealerSocket`] produced by [`DealerSocket::split`].
146+
pub struct DealerSendHalf {
147+
inner: Arc<DealerSocketInner>,
148+
}
149+
150+
/// The recv half of a [`DealerSocket`] produced by [`DealerSocket::split`].
151+
pub struct DealerRecvHalf {
152+
_inner: Arc<DealerSocketInner>,
153+
fair_queue: FairQueue<ZmqFramedRead, PeerIdentity>,
154+
}
155+
156+
#[async_trait]
157+
impl SocketSend for DealerSendHalf {
158+
async fn send(&mut self, message: ZmqMessage) -> ZmqResult<()> {
159+
self.inner
160+
.backend
161+
.send_round_robin(Message::Message(message))
162+
.await?;
163+
Ok(())
164+
}
165+
}
166+
167+
impl CaptureSocket for DealerSendHalf {}
168+
169+
#[async_trait]
170+
impl SocketRecv for DealerRecvHalf {
171+
async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
172+
loop {
173+
match self.fair_queue.next().await {
174+
Some((_peer_id, Ok(Message::Message(message)))) => {
175+
return Ok(message);
176+
}
177+
Some((_peer_id, Ok(_))) => {}
178+
Some((_peer_id, Err(e))) => {
179+
return Err(e.into());
180+
}
181+
None => {
182+
return Err(ZmqError::NoMessage);
183+
}
184+
};
185+
}
186+
}
187+
}

tests/dealer_split.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#[cfg(test)]
2+
mod test {
3+
4+
use zeromq::__async_rt as async_rt;
5+
use zeromq::prelude::*;
6+
use zeromq::ZmqMessage;
7+
8+
use std::error::Error;
9+
use std::time::Duration;
10+
11+
fn assert_send<T: Send>() {}
12+
13+
#[test]
14+
fn split_halves_are_send() {
15+
assert_send::<zeromq::DealerSendHalf>();
16+
assert_send::<zeromq::DealerRecvHalf>();
17+
}
18+
19+
#[async_rt::test]
20+
async fn test_dealer_split_concurrent_send_recv() -> Result<(), Box<dyn Error>> {
21+
pretty_env_logger::try_init().ok();
22+
23+
// Use DEALER-to-DEALER (compatible pair, no envelope framing needed)
24+
let mut server = zeromq::DealerSocket::new();
25+
let endpoint = server.bind("tcp://localhost:0").await?;
26+
27+
let mut client = zeromq::DealerSocket::new();
28+
client.connect(endpoint.to_string().as_str()).await?;
29+
30+
// Give connection time to establish
31+
async_rt::task::sleep(Duration::from_millis(100)).await;
32+
33+
let (mut send_half, mut recv_half) = client.split();
34+
35+
let num_messages: u32 = 5;
36+
37+
// Server echo loop
38+
let server_task = async_rt::task::spawn(async move {
39+
for _ in 0..num_messages {
40+
let msg = server.recv().await.unwrap();
41+
server.send(msg).await.unwrap();
42+
}
43+
});
44+
45+
// Sender in its own task
46+
let send_task = async_rt::task::spawn(async move {
47+
for i in 0..num_messages {
48+
let msg = ZmqMessage::from(format!("msg-{}", i));
49+
send_half.send(msg).await.unwrap();
50+
}
51+
});
52+
53+
// Receiver in its own task
54+
let recv_task = async_rt::task::spawn(async move {
55+
for _ in 0..num_messages {
56+
let _reply = recv_half.recv().await.unwrap();
57+
}
58+
});
59+
60+
let timeout = Duration::from_secs(5);
61+
async_rt::task::timeout(timeout, async {
62+
send_task.await.unwrap();
63+
recv_task.await.unwrap();
64+
server_task.await.unwrap();
65+
})
66+
.await
67+
.expect("test timed out");
68+
69+
Ok(())
70+
}
71+
}

0 commit comments

Comments
 (0)