Skip to content

Commit a0bed49

Browse files
authored
Merge pull request #2026 from mintlayer/p2p_count_connections_without_activity
P2p: count connections without activity; use the counter when calculating next connection attempt time, to make it progressively bigger as the counter grows.
2 parents e6b8948 + 5666d88 commit a0bed49

File tree

28 files changed

+2312
-168
lines changed

28 files changed

+2312
-168
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ The format is loosely based on [Keep a Changelog](https://keepachangelog.com/en/
6868
- Node bootstrapping:
6969
- The format of the bootstrap file was changed and the legacy format is no longer supported.
7070

71+
- P2p:
72+
- The logic of initiating new outbound connections has been improved to prevent the node from
73+
constantly attempting to re-establish a connection with a peer that has banned it.
74+
7175
### Fixed
7276
- P2p: when a peer sends a message that can't be decoded, it will now be discouraged (which is what
7377
is normally done for misbehaving peers) and the node won't try connecting to it again.\

dns-server/src/crawler_p2p/crawler_manager/mod.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use p2p::{
3434
PingResponse,
3535
},
3636
net::{
37-
types::{ConnectivityEvent, SyncingEvent},
37+
types::{ConnectivityEvent, PeerManagerMessageExt, SyncingEvent},
3838
ConnectivityService, NetworkingService, SyncingEventReceiver,
3939
},
4040
peer_manager::{
@@ -211,9 +211,17 @@ where
211211
fn handle_conn_message(
212212
&mut self,
213213
peer_id: PeerId,
214-
message: PeerManagerMessage,
214+
message: PeerManagerMessageExt,
215215
) -> p2p::Result<()> {
216-
match message {
216+
let peer_mgr_message = match message {
217+
PeerManagerMessageExt::PeerManagerMessage(message) => message,
218+
PeerManagerMessageExt::FirstSyncMessageReceived => {
219+
// Ignored
220+
return Ok(());
221+
}
222+
};
223+
224+
match peer_mgr_message {
217225
PeerManagerMessage::AddrListRequest(_) => {
218226
// Ignored
219227
Ok(())
@@ -231,7 +239,7 @@ where
231239
}
232240
PeerManagerMessage::PingRequest(PingRequest { nonce }) => {
233241
self.conn
234-
.send_message(
242+
.send_peer_manager_message(
235243
peer_id,
236244
PeerManagerMessage::PingResponse(PingResponse { nonce }),
237245
)
@@ -382,7 +390,7 @@ where
382390
CrawlerCommand::RequestAddresses { peer_id } => {
383391
log::debug!("Requesting addresses from peer {peer_id}");
384392

385-
conn.send_message(
393+
conn.send_peer_manager_message(
386394
peer_id,
387395
PeerManagerMessage::AddrListRequest(AddrListRequest {}),
388396
)

dns-server/src/crawler_p2p/crawler_manager/tests/mock_manager.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ impl MockStateRef {
132132
peer_id,
133133
message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest {
134134
address: announced_ip.as_peer_address(),
135-
}),
135+
})
136+
.into(),
136137
})
137138
.unwrap();
138139
}
@@ -271,7 +272,11 @@ impl ConnectivityService<MockNetworkingService> for MockConnectivityHandle {
271272
Ok(())
272273
}
273274

274-
fn send_message(&mut self, _peer_id: PeerId, _request: PeerManagerMessage) -> p2p::Result<()> {
275+
fn send_peer_manager_message(
276+
&mut self,
277+
_peer_id: PeerId,
278+
_request: PeerManagerMessage,
279+
) -> p2p::Result<()> {
275280
Ok(())
276281
}
277282

p2p/src/message.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ use serialization::{Decode, Encode};
2525

2626
use crate::types::peer_address::PeerAddress;
2727

28-
#[derive(Debug, Clone, PartialEq, Eq)]
28+
#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)]
29+
#[strum_discriminants(name(BlockSyncMessageTag), derive(strum::EnumIter))]
2930
pub enum BlockSyncMessage {
3031
HeaderListRequest(HeaderListRequest),
3132
BlockListRequest(BlockListRequest),
@@ -40,14 +41,16 @@ pub enum BlockSyncMessage {
4041
TestSentinel(Id<()>),
4142
}
4243

43-
#[derive(Debug, Clone, PartialEq, Eq)]
44+
#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)]
45+
#[strum_discriminants(name(TransactionSyncMessageTag), derive(strum::EnumIter))]
4446
pub enum TransactionSyncMessage {
4547
NewTransaction(Id<Transaction>),
4648
TransactionRequest(Id<Transaction>),
4749
TransactionResponse(TransactionResponse),
4850
}
4951

50-
#[derive(Debug, Clone, PartialEq, Eq)]
52+
#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)]
53+
#[strum_discriminants(name(PeerManagerMessageTag), derive(strum::EnumIter))]
5154
pub enum PeerManagerMessage {
5255
AddrListRequest(AddrListRequest),
5356
AnnounceAddrRequest(AnnounceAddrRequest),

p2p/src/net/default_backend/backend.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@ use crate::{
4444
config::P2pConfig,
4545
disconnection_reason::DisconnectionReason,
4646
error::{DialError, P2pError, PeerError},
47-
message::PeerManagerMessage,
4847
net::{
4948
default_backend::{
5049
peer,
5150
types::{BackendEvent, Command, PeerEvent},
5251
},
53-
types::{services::Services, ConnectivityEvent, PeerInfo, SyncingEvent},
52+
types::{
53+
services::Services, ConnectivityEvent, PeerInfo, PeerManagerMessageExt, SyncingEvent,
54+
},
5455
},
5556
protocol::{ProtocolVersion, SupportedProtocolVersion},
5657
types::{peer_address::PeerAddress, peer_id::PeerId},
@@ -613,7 +614,7 @@ where
613614
Ok(())
614615
}
615616

616-
PeerEvent::MessageReceived { message } => {
617+
PeerEvent::MessageReceived(message) => {
617618
if self.networking_enabled {
618619
self.handle_message(peer_id, message)?;
619620
}
@@ -696,7 +697,7 @@ where
696697
fn handle_message(
697698
&mut self,
698699
peer_id: PeerId,
699-
message: PeerManagerMessage,
700+
message: PeerManagerMessageExt,
700701
) -> crate::Result<()> {
701702
// Do not process remaining messages if the peer has been forcibly disconnected (for example, after being banned).
702703
// Without this check, the backend might send messages to the sync and peer managers after sending the disconnect notification.

p2p/src/net/default_backend/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,11 @@ where
136136
Ok(self.cmd_sender.send(types::Command::Disconnect { peer_id, reason })?)
137137
}
138138

139-
fn send_message(&mut self, peer_id: PeerId, message: PeerManagerMessage) -> crate::Result<()> {
139+
fn send_peer_manager_message(
140+
&mut self,
141+
peer_id: PeerId,
142+
message: PeerManagerMessage,
143+
) -> crate::Result<()> {
140144
Ok(self.cmd_sender.send(types::Command::SendMessage {
141145
peer_id,
142146
message: message.into(),

p2p/src/net/default_backend/peer.rs

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ use crate::{
3131
disconnection_reason::DisconnectionReason,
3232
error::{ConnectionValidationError, P2pError, PeerError, ProtocolError},
3333
message::{BlockSyncMessage, TransactionSyncMessage, WillDisconnectMessage},
34-
net::default_backend::types::{BackendEvent, PeerEvent},
34+
net::{
35+
default_backend::types::{BackendEvent, PeerEvent},
36+
types::PeerManagerMessageExt,
37+
},
3538
protocol::{choose_common_protocol_version, ProtocolVersion, SupportedProtocolVersion},
3639
types::peer_id::PeerId,
3740
};
@@ -80,6 +83,10 @@ pub struct Peer<T: TransportSocket> {
8083

8184
/// Time getter
8285
time_getter: TimeGetter,
86+
87+
/// Will be set to true once at least one BlockSyncMessage or TransactionSyncMessage has been
88+
/// received from the peer.
89+
sync_message_received: bool,
8390
}
8491

8592
impl<T> Peer<T>
@@ -112,6 +119,7 @@ where
112119
node_protocol_version,
113120
time_getter,
114121
common_protocol_version: None,
122+
sync_message_received: false,
115123
}
116124
}
117125

@@ -390,17 +398,17 @@ where
390398
// Note: the channels used by this function to propagate messages to other parts of p2p
391399
// must be bounded; this is important to prevent DoS attacks.
392400
async fn handle_socket_msg(
401+
&mut self,
393402
peer_id: PeerId,
394403
msg: Message,
395-
peer_event_sender: &mut mpsc::Sender<PeerEvent>,
396-
block_sync_msg_sender: &mut mpsc::Sender<BlockSyncMessage>,
397-
transaction_sync_msg_sender: &mut mpsc::Sender<TransactionSyncMessage>,
404+
block_sync_msg_sender: &mpsc::Sender<BlockSyncMessage>,
405+
transaction_sync_msg_sender: &mpsc::Sender<TransactionSyncMessage>,
398406
) -> crate::Result<()> {
399407
match msg.categorize() {
400408
CategorizedMessage::Handshake(_) => {
401409
log::error!("Peer {peer_id} sent unexpected handshake message");
402410

403-
peer_event_sender
411+
self.peer_event_sender
404412
.send(PeerEvent::Misbehaved {
405413
error: P2pError::ProtocolError(ProtocolError::UnexpectedMessage(
406414
"Unexpected handshake message".to_owned(),
@@ -409,11 +417,35 @@ where
409417
.await?;
410418
}
411419
CategorizedMessage::PeerManagerMessage(msg) => {
412-
peer_event_sender.send(PeerEvent::MessageReceived { message: msg }).await?
420+
self.peer_event_sender
421+
.send(PeerEvent::MessageReceived(
422+
PeerManagerMessageExt::PeerManagerMessage(msg),
423+
))
424+
.await?
425+
}
426+
CategorizedMessage::BlockSyncMessage(msg) => {
427+
if !self.sync_message_received {
428+
self.peer_event_sender
429+
.send(PeerEvent::MessageReceived(
430+
PeerManagerMessageExt::FirstSyncMessageReceived,
431+
))
432+
.await?;
433+
self.sync_message_received = true;
434+
}
435+
436+
block_sync_msg_sender.send(msg).await?;
413437
}
414-
CategorizedMessage::BlockSyncMessage(msg) => block_sync_msg_sender.send(msg).await?,
415438
CategorizedMessage::TransactionSyncMessage(msg) => {
416-
transaction_sync_msg_sender.send(msg).await?
439+
if !self.sync_message_received {
440+
self.peer_event_sender
441+
.send(PeerEvent::MessageReceived(
442+
PeerManagerMessageExt::FirstSyncMessageReceived,
443+
))
444+
.await?;
445+
self.sync_message_received = true;
446+
}
447+
448+
transaction_sync_msg_sender.send(msg).await?;
417449
}
418450
}
419451

@@ -486,12 +518,11 @@ where
486518
event = self.socket.recv(), if sync_msg_senders_opt.is_some() => match event {
487519
Ok(message) => {
488520
let sync_msg_senders = sync_msg_senders_opt.as_mut().expect("sync_msg_senders_opt is some");
489-
Self::handle_socket_msg(
521+
self.handle_socket_msg(
490522
self.peer_id,
491523
message,
492-
&mut self.peer_event_sender,
493-
&mut sync_msg_senders.0,
494-
&mut sync_msg_senders.1,
524+
&sync_msg_senders.0,
525+
&sync_msg_senders.1,
495526
).await?;
496527
}
497528
Err(err) => {

p2p/src/net/default_backend/types.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::{
3232
BlockSyncMessage, HeaderList, HeaderListRequest, PeerManagerMessage, PingRequest,
3333
PingResponse, TransactionResponse, TransactionSyncMessage, WillDisconnectMessage,
3434
},
35-
net::types::services::Services,
35+
net::types::{services::Services, PeerManagerMessageExt},
3636
protocol::{ProtocolVersion, SupportedProtocolVersion},
3737
types::{peer_address::PeerAddress, peer_id::PeerId},
3838
};
@@ -109,7 +109,7 @@ pub mod peer_event {
109109
}
110110
}
111111

112-
/// Events sent by Peer to Backend
112+
/// Events sent by `Peer` to `Backend`.
113113
#[derive(Debug)]
114114
pub enum PeerEvent {
115115
/// Peer information received from remote
@@ -119,7 +119,7 @@ pub enum PeerEvent {
119119
ConnectionClosed,
120120

121121
/// Message received from remote
122-
MessageReceived { message: PeerManagerMessage },
122+
MessageReceived(PeerManagerMessageExt),
123123

124124
/// Protocol violation
125125
Misbehaved { error: P2pError },
@@ -135,7 +135,7 @@ pub enum PeerEvent {
135135
},
136136
}
137137

138-
/// Events sent by Backend to Peer
138+
/// Events sent by `Backend` to `Peer`.
139139
#[derive(Debug)]
140140
pub enum BackendEvent {
141141
Accepted {

p2p/src/net/mod.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,18 +115,17 @@ where
115115
reason: Option<DisconnectionReason>,
116116
) -> crate::Result<()>;
117117

118-
/// Sends a message to the given peer.
119-
fn send_message(&mut self, peer: PeerId, message: PeerManagerMessage) -> crate::Result<()>;
118+
/// Sends a peer manager message to the given peer.
119+
fn send_peer_manager_message(
120+
&mut self,
121+
peer: PeerId,
122+
message: PeerManagerMessage,
123+
) -> crate::Result<()>;
120124

121125
/// Return the socket addresses of the network service provider
122126
fn local_addresses(&self) -> &[SocketAddress];
123127

124128
/// Poll events from the network service provider
125-
///
126-
/// There are three types of events that can be received:
127-
/// - incoming peer connections
128-
/// - new discovered peers
129-
/// - peer expiration events
130129
async fn poll_next(&mut self) -> crate::Result<types::ConnectivityEvent>;
131130
}
132131

0 commit comments

Comments
 (0)