Skip to content

Commit b0cbe10

Browse files
committed
P2p: improve timeout_when_socket_not_read test, so that it also checks disconnection timeout. Introduce BackendObserver, rename peer_manager::Observer to PeerManagerObserver
1 parent b31434d commit b0cbe10

File tree

11 files changed

+226
-37
lines changed

11 files changed

+226
-37
lines changed

p2p/src/net/default_backend/backend.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use crate::{
4747
net::{
4848
default_backend::{
4949
peer,
50-
types::{BackendEvent, Command, PeerEvent},
50+
types::{BackendEvent, BackendObserver, Command, PeerEvent},
5151
},
5252
types::{
5353
services::Services, ConnectivityEvent, PeerInfo, PeerManagerMessageExt, SyncingEvent,
@@ -169,6 +169,9 @@ pub struct Backend<T: TransportSocket> {
169169
/// equal to default_networking_service::PREFERRED_PROTOCOL_VERSION, but it can be
170170
/// overridden for testing purposes.
171171
node_protocol_version: ProtocolVersion,
172+
173+
/// Observer object, used by tests.
174+
observer: Option<Arc<dyn BackendObserver + Send + Sync>>,
172175
}
173176

174177
impl<T> Backend<T>
@@ -190,6 +193,7 @@ where
190193
shutdown_receiver: oneshot::Receiver<()>,
191194
subscribers_receiver: mpsc::UnboundedReceiver<P2pEventHandler>,
192195
node_protocol_version: ProtocolVersion,
196+
observer: Option<Arc<dyn BackendObserver + Send + Sync>>,
193197
) -> Self {
194198
Self {
195199
networking_enabled,
@@ -210,6 +214,7 @@ where
210214
events_controller: EventsController::new(),
211215
subscribers_receiver,
212216
node_protocol_version,
217+
observer,
213218
}
214219
}
215220

@@ -403,6 +408,7 @@ where
403408
backend_event_receiver,
404409
self.node_protocol_version,
405410
self.time_getter.shallow_clone(),
411+
self.observer.clone(),
406412
)?;
407413
let shutdown = Arc::clone(&self.shutdown);
408414
let handle = tokio_spawn_in_current_tracing_span(

p2p/src/net/default_backend/default_networking_service.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use utils::{atomics::SeqCstAtomicBool, tokio_spawn_in_tracing_span};
2929

3030
use crate::{
3131
error::P2pError,
32-
net::NetworkingService,
32+
net::{default_backend::types::BackendObserver, NetworkingService},
3333
protocol::{ProtocolVersion, SupportedProtocolVersion},
3434
P2pConfig, P2pEventHandler,
3535
};
@@ -64,6 +64,7 @@ impl<T: TransportSocket> DefaultNetworkingService<T> {
6464
shutdown_receiver: oneshot::Receiver<()>,
6565
subscribers_receiver: mpsc::UnboundedReceiver<P2pEventHandler>,
6666
protocol_version: ProtocolVersion,
67+
observer: Option<Arc<dyn BackendObserver + Send + Sync>>,
6768
tracing_span: tracing::Span,
6869
) -> crate::Result<(
6970
<Self as NetworkingService>::ConnectivityHandle,
@@ -95,6 +96,7 @@ impl<T: TransportSocket> DefaultNetworkingService<T> {
9596
shutdown_receiver,
9697
subscribers_receiver,
9798
protocol_version,
99+
observer,
98100
);
99101
let backend_task = tokio_spawn_in_tracing_span(
100102
async move {
@@ -154,6 +156,7 @@ impl<T: TransportSocket> DefaultNetworkingService<T> {
154156
shutdown_receiver,
155157
subscribers_receiver,
156158
protocol_version,
159+
None,
157160
tracing::Span::current(),
158161
)
159162
}

p2p/src/net/default_backend/peer/mod.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use crate::{
4646
net::{
4747
default_backend::{
4848
peer::handshake_handler::HandshakeHandler,
49-
types::{BackendEvent, MessageDebugLogSummary, MessageTag, PeerEvent},
49+
types::{BackendEvent, BackendObserver, MessageDebugLogSummary, MessageTag, PeerEvent},
5050
},
5151
types::PeerManagerMessageExt,
5252
},
@@ -96,6 +96,9 @@ pub struct Peer<T: TransportSocket> {
9696

9797
/// Time getter
9898
time_getter: TimeGetter,
99+
100+
/// Observer object, used by tests.
101+
observer: Option<Arc<dyn BackendObserver + Send + Sync>>,
99102
}
100103

101104
impl<T> Peer<T>
@@ -113,6 +116,7 @@ where
113116
backend_event_receiver: mpsc::UnboundedReceiver<BackendEvent>,
114117
node_protocol_version: ProtocolVersion,
115118
time_getter: TimeGetter,
119+
observer: Option<Arc<dyn BackendObserver + Send + Sync>>,
116120
) -> crate::Result<Self> {
117121
let peer_address = socket.remote_address()?.as_peer_address();
118122

@@ -131,6 +135,7 @@ where
131135
backend_event_receiver,
132136
node_protocol_version,
133137
time_getter,
138+
observer,
134139
})
135140
}
136141

@@ -206,6 +211,7 @@ where
206211
mut backend_event_receiver,
207212
node_protocol_version,
208213
time_getter,
214+
observer,
209215
} = self;
210216

211217
let handshake_handler = HandshakeHandler::new(
@@ -238,6 +244,7 @@ where
238244
socket_writer,
239245
writer_cmd_receiver,
240246
writer_event_sender,
247+
observer.clone(),
241248
);
242249

243250
// Note: if the outer Option is set, an explicit disconnection should be initiated via
@@ -288,6 +295,10 @@ where
288295
}
289296
event = socket_reader.recv(), if sync_msg_senders_opt.is_some() => match event {
290297
Ok(message) => {
298+
if let Some(observer) = &observer {
299+
observer.on_message_read(peer_id, &message);
300+
}
301+
291302
let sync_msg_senders = sync_msg_senders_opt.as_mut().expect("sync_msg_senders_opt is some");
292303
Self::handle_socket_msg(
293304
message,
@@ -416,14 +427,17 @@ fn spawn_writer<S: PeerStream + 'static>(
416427
socket_writer: MessageWriter<S, Message>,
417428
cmd_receiver: mpsc::UnboundedReceiver<WriterCommand>,
418429
event_sender: mpsc::UnboundedSender<WriterEvent>,
430+
observer: Option<Arc<dyn BackendObserver + Send + Sync>>,
419431
) -> JoinHandle<()> {
420432
tokio_spawn_in_current_tracing_span(
421433
async move {
422434
let writer_result = writer_loop(
423435
&p2p_config,
436+
peer_id,
424437
common_protocol_version,
425438
socket_writer,
426439
cmd_receiver,
440+
observer,
427441
)
428442
.await;
429443

@@ -437,9 +451,11 @@ fn spawn_writer<S: PeerStream + 'static>(
437451

438452
async fn writer_loop<S: PeerStream>(
439453
p2p_config: &P2pConfig,
454+
peer_id: PeerId,
440455
common_protocol_version: SupportedProtocolVersion,
441456
mut socket_writer: MessageWriter<S, Message>,
442457
mut cmd_receiver: mpsc::UnboundedReceiver<WriterCommand>,
458+
observer: Option<Arc<dyn BackendObserver + Send + Sync>>,
443459
) -> crate::Result<()> {
444460
while let Some(cmd) = cmd_receiver.recv().await {
445461
match cmd {
@@ -450,6 +466,10 @@ async fn writer_loop<S: PeerStream>(
450466
message.encoded_size()
451467
);
452468

469+
if let Some(observer) = &observer {
470+
observer.on_message_write(peer_id, &message);
471+
}
472+
453473
tokio::time::timeout(
454474
*p2p_config.backend_timeouts.socket_write_timeout,
455475
socket_writer.send(*message),

p2p/src/net/default_backend/types.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,15 @@ pub fn can_send_will_disconnect(peer_protocol_version: ProtocolVersion) -> bool
410410
peer_protocol_version >= SupportedProtocolVersion::V3.into()
411411
}
412412

413+
/// Backend observer, used by tests.
414+
pub trait BackendObserver {
415+
/// Called before the message is written to the socket.
416+
fn on_message_write(&self, peer_id: PeerId, msg: &Message);
417+
418+
/// Called after the message has been read from the socket.
419+
fn on_message_read(&self, peer_id: PeerId, msg: &Message);
420+
}
421+
413422
#[cfg(test)]
414423
mod tests {
415424
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

p2p/src/peer_manager/mod.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ where
203203
addr_list_response_cache: AddrListResponseCache,
204204

205205
/// PeerManager's observer for use by tests.
206-
observer: Option<Box<dyn Observer + Send>>,
206+
observer: Option<Box<dyn PeerManagerObserver + Send>>,
207207

208208
/// Normally, this will be DefaultDnsSeed, which performs the actual address lookup, but tests can
209209
/// substitute it with a mock implementation.
@@ -274,7 +274,7 @@ where
274274
peer_mgr_event_receiver: mpsc::UnboundedReceiver<PeerManagerEvent>,
275275
time_getter: TimeGetter,
276276
peerdb_storage: S,
277-
observer: Option<Box<dyn Observer + Send>>,
277+
observer: Option<Box<dyn PeerManagerObserver + Send>>,
278278
dns_seed: Box<dyn DnsSeed + Send>,
279279
mut rng: impl RngCore + Send + 'static,
280280
) -> crate::Result<Self> {
@@ -1157,7 +1157,7 @@ where
11571157
}
11581158

11591159
if let Some(o) = self.observer.as_mut() {
1160-
o.on_connection_accepted(peer_address, peer_role)
1160+
o.on_connection_accepted(peer_address, peer_id, peer_role)
11611161
}
11621162

11631163
Ok(())
@@ -2455,7 +2455,7 @@ where
24552455
}
24562456
}
24572457

2458-
pub trait Observer {
2458+
pub trait PeerManagerObserver {
24592459
fn on_peer_ban_score_adjustment(&mut self, address: SocketAddress, new_score: u32);
24602460

24612461
fn on_peer_ban(&mut self, address: BannableAddress);
@@ -2466,7 +2466,12 @@ pub trait Observer {
24662466
fn on_heartbeat(&mut self);
24672467

24682468
// This will be called for both incoming and outgoing connections.
2469-
fn on_connection_accepted(&mut self, address: SocketAddress, peer_role: PeerRole);
2469+
fn on_connection_accepted(
2470+
&mut self,
2471+
address: SocketAddress,
2472+
peer_id: PeerId,
2473+
peer_role: PeerRole,
2474+
);
24702475

24712476
// This will be called after `ConnectivityEvent::ConnectionError` has been handled by
24722477
// the peer manager.

p2p/src/peer_manager/tests/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::{
5353
},
5454
peer_manager::PeerManager,
5555
test_helpers::{peerdb_inmemory_store, test_p2p_config},
56-
tests::helpers::{PeerManagerNotification, PeerManagerObserver},
56+
tests::helpers::{PeerManagerNotification, PeerManagerObserverImpl},
5757
types::peer_id::PeerId,
5858
utils::oneshot_nofail,
5959
P2pConfig, P2pEventHandler, PeerManagerEvent,
@@ -173,7 +173,7 @@ pub fn make_standalone_peer_manager(
173173
conn_event_receiver,
174174
);
175175
let (peer_mgr_notification_sender, peer_mgr_notification_receiver) = mpsc::unbounded_channel();
176-
let peer_mgr_observer = Box::new(PeerManagerObserver::new(peer_mgr_notification_sender));
176+
let peer_mgr_observer = Box::new(PeerManagerObserverImpl::new(peer_mgr_notification_sender));
177177
let dns_seed = DefaultDnsSeed::new(Arc::clone(&chain_config), Arc::clone(&p2p_config));
178178

179179
let peer_mgr = PeerManager::<TcpNetworkingService, _>::new_generic(

p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,6 +1145,7 @@ async fn peer_accepted_by_peer_mgr(
11451145
peer_mgr_notification,
11461146
PeerManagerNotification::ConnectionAccepted {
11471147
address: peer_address,
1148+
peer_id,
11481149
peer_role
11491150
}
11501151
);

p2p/src/tests/connection_lockup_when_socket_not_read.rs

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use p2p_test_utils::run_with_timeout;
3232
use randomness::Rng;
3333
use serialization::Encode as _;
3434
use test_utils::{
35-
assert_matches,
35+
assert_matches, assert_matches_return_val,
3636
random::{make_seedable_rng, Seed},
3737
BasicTestTimeGetter,
3838
};
@@ -41,7 +41,7 @@ use crate::{
4141
config::{BackendTimeoutsConfig, P2pConfig},
4242
message::{HeaderList, HeaderListRequest},
4343
net::{
44-
default_backend::types::{HandshakeMessage, Message, P2pTimestamp},
44+
default_backend::types::{HandshakeMessage, Message, MessageTag, P2pTimestamp},
4545
types::PeerManagerMessageExtTag,
4646
},
4747
sync::test_helpers::make_new_blocks,
@@ -242,35 +242,56 @@ async fn no_connection_lockup_when_socket_not_read(#[case] seed: Seed) {
242242
.await;
243243
}
244244

245-
// Similar to the test above, here the peer also stops reading from the socket, but it doesn't do
246-
// any discourageable actions this time.
247-
// The expected result is that the connection should also be terminated.
248-
// This checks that there is a timeout on socket write attempts.
245+
#[derive(Debug)]
246+
enum TimeoutTestType {
247+
SocketWrite,
248+
Disconnect,
249+
}
250+
251+
// Here the peer also stops reading from the socket, but doesn't do any discourageable actions.
252+
// Two cases exist:
253+
// 1) Socket write timeout is small, disconnection timeout is artificially large (just in case).
254+
// The expected result is that the connection should be terminated, though not immediately.
255+
// I.e. this checks that there is a timeout on socket write attempts.
256+
// 2) Disconnection timeout is small, socket write timeout is artificially large.
257+
// We wait until the HeaderList message has been sent by the node and then initiate manual
258+
// disconnection. The expected result is the same - the connection should be terminated,
259+
// but not immediately.
260+
// I.e. this checks that there is a timeout on disconnection attempts.
249261
#[tracing::instrument(skip(seed))]
250262
#[rstest]
251263
#[trace]
252264
#[case(Seed::from_entropy())]
253265
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
254-
async fn timeout_when_socket_not_read(#[case] seed: Seed) {
266+
async fn timeout_when_socket_not_read(
267+
#[case] seed: Seed,
268+
#[values(TimeoutTestType::SocketWrite, TimeoutTestType::Disconnect)] test_type: TimeoutTestType,
269+
) {
255270
let mut rng = make_seedable_rng(seed);
256271

257272
run_with_timeout(async {
258273
// Note: this is in bytes; this size should be less than the encoded size of `node_headers_count` headers.
259274
let channel_max_buf_size = 1000;
260275
let node_headers_count = 100;
261276

262-
let socket_write_timeout = Duration::from_secs(3);
277+
let timeout = Duration::from_secs(3);
278+
let large_timeout = Duration::from_secs(3600);
263279
let no_disconnection_after = Duration::from_secs(1);
264280

281+
let (socket_write_timeout, disconnection_timeout) = match test_type {
282+
TimeoutTestType::SocketWrite => (timeout, large_timeout),
283+
TimeoutTestType::Disconnect => (large_timeout, timeout),
284+
};
285+
265286
let time_getter = BasicTestTimeGetter::new();
266287
let chain_config = Arc::new(chain::config::create_unit_test_config());
267288
let p2p_config = Arc::new(P2pConfig {
268289
backend_timeouts: BackendTimeoutsConfig {
269290
socket_write_timeout: socket_write_timeout.into(),
291+
disconnection_timeout: disconnection_timeout.into(),
270292

271293
outbound_connection_timeout: Default::default(),
272294
peer_handshake_timeout: Default::default(),
273-
disconnection_timeout: Default::default(),
274295
},
275296

276297
bind_addresses: Default::default(),
@@ -382,9 +403,10 @@ async fn timeout_when_socket_not_read(#[case] seed: Seed) {
382403

383404
log::debug!("Expecting PeerManagerNotification::ConnectionAccepted");
384405
let peer_mgr_notif = test_node.peer_mgr_notification_receiver().recv().await.unwrap();
385-
assert_matches!(
406+
let peer_id = assert_matches_return_val!(
386407
peer_mgr_notif,
387-
PeerManagerNotification::ConnectionAccepted { .. }
408+
PeerManagerNotification::ConnectionAccepted { peer_id, .. },
409+
peer_id
388410
);
389411

390412
log::debug!("Expecting PeerManagerNotification::FirstSyncMessageReceived");
@@ -397,6 +419,16 @@ async fn timeout_when_socket_not_read(#[case] seed: Seed) {
397419
}
398420
);
399421

422+
let manual_disconnect_result_receiver = match test_type {
423+
TimeoutTestType::SocketWrite => None,
424+
TimeoutTestType::Disconnect => {
425+
test_node
426+
.wait_for_next_backend_socket_write(peer_id, MessageTag::HeaderList)
427+
.await;
428+
Some(test_node.start_disconnecting(peer_id))
429+
}
430+
};
431+
400432
// Wait for a short period of time, there should be no disconnection.
401433
tokio::time::timeout(
402434
no_disconnection_after,
@@ -413,6 +445,10 @@ async fn timeout_when_socket_not_read(#[case] seed: Seed) {
413445
PeerManagerNotification::ConnectionClosed { .. }
414446
);
415447

448+
if let Some(manual_disconnect_result_receiver) = manual_disconnect_result_receiver {
449+
manual_disconnect_result_receiver.await.unwrap().unwrap();
450+
}
451+
416452
test_node.join().await;
417453
})
418454
.await;

0 commit comments

Comments
 (0)