Skip to content

Commit 0141c64

Browse files
committed
P2p: send a FirstSyncMessageReceived message to the peer manager once instead of sending TransactionSyncMessageTag/BlockSyncMessageTag on each sync message. Add a test to check that the message is actually sent. Some renaming.
1 parent 0e01057 commit 0141c64

12 files changed

Lines changed: 435 additions & 84 deletions

File tree

dns-server/src/crawler_p2p/crawler_manager/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use p2p::{
3434
PingResponse,
3535
},
3636
net::{
37-
types::{ConnectivityEvent, PeerManagerMessageOrTag, SyncingEvent},
37+
types::{ConnectivityEvent, PeerManagerMessageExt, SyncingEvent},
3838
ConnectivityService, NetworkingService, SyncingEventReceiver,
3939
},
4040
peer_manager::{
@@ -211,12 +211,11 @@ where
211211
fn handle_conn_message(
212212
&mut self,
213213
peer_id: PeerId,
214-
message: PeerManagerMessageOrTag,
214+
message: PeerManagerMessageExt,
215215
) -> p2p::Result<()> {
216216
let peer_mgr_message = match message {
217-
PeerManagerMessageOrTag::PeerManagerMessage(message) => message,
218-
PeerManagerMessageOrTag::BlockSyncMessage(_)
219-
| PeerManagerMessageOrTag::TransactionSyncMessage(_) => {
217+
PeerManagerMessageExt::PeerManagerMessage(message) => message,
218+
PeerManagerMessageExt::FirstSyncMessageReceived => {
220219
// Ignored
221220
return Ok(());
222221
}

p2p/src/message.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use serialization::{Decode, Encode};
2626
use crate::types::peer_address::PeerAddress;
2727

2828
#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)]
29-
#[strum_discriminants(name(BlockSyncMessageTag))]
29+
#[strum_discriminants(name(BlockSyncMessageTag), derive(strum::EnumIter))]
3030
pub enum BlockSyncMessage {
3131
HeaderListRequest(HeaderListRequest),
3232
BlockListRequest(BlockListRequest),
@@ -42,15 +42,15 @@ pub enum BlockSyncMessage {
4242
}
4343

4444
#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)]
45-
#[strum_discriminants(name(TransactionSyncMessageTag))]
45+
#[strum_discriminants(name(TransactionSyncMessageTag), derive(strum::EnumIter))]
4646
pub enum TransactionSyncMessage {
4747
NewTransaction(Id<Transaction>),
4848
TransactionRequest(Id<Transaction>),
4949
TransactionResponse(TransactionResponse),
5050
}
5151

5252
#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)]
53-
#[strum_discriminants(name(PeerManagerMessageTag))]
53+
#[strum_discriminants(name(PeerManagerMessageTag), derive(strum::EnumIter))]
5454
pub enum PeerManagerMessage {
5555
AddrListRequest(AddrListRequest),
5656
AnnounceAddrRequest(AnnounceAddrRequest),

p2p/src/net/default_backend/backend.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use crate::{
5050
types::{BackendEvent, Command, PeerEvent},
5151
},
5252
types::{
53-
services::Services, ConnectivityEvent, PeerInfo, PeerManagerMessageOrTag, SyncingEvent,
53+
services::Services, ConnectivityEvent, PeerInfo, PeerManagerMessageExt, SyncingEvent,
5454
},
5555
},
5656
protocol::{ProtocolVersion, SupportedProtocolVersion},
@@ -697,7 +697,7 @@ where
697697
fn handle_message(
698698
&mut self,
699699
peer_id: PeerId,
700-
message: PeerManagerMessageOrTag,
700+
message: PeerManagerMessageExt,
701701
) -> crate::Result<()> {
702702
// Do not process remaining messages if the peer has been forcibly disconnected (for example, after being banned).
703703
// Without this check, the backend might send messages to the sync and peer managers after sending the disconnect notification.

p2p/src/net/default_backend/peer.rs

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
message::{BlockSyncMessage, TransactionSyncMessage, WillDisconnectMessage},
3434
net::{
3535
default_backend::types::{BackendEvent, PeerEvent},
36-
types::PeerManagerMessageOrTag,
36+
types::PeerManagerMessageExt,
3737
},
3838
protocol::{choose_common_protocol_version, ProtocolVersion, SupportedProtocolVersion},
3939
types::peer_id::PeerId,
@@ -83,6 +83,10 @@ pub struct Peer<T: TransportSocket> {
8383

8484
/// Time getter
8585
time_getter: TimeGetter,
86+
87+
/// Will be set to true once at least one BlockSyncMessage or TransactionSyncMessage has been
88+
/// received from the peer.
89+
sync_message_received: bool,
8690
}
8791

8892
impl<T> Peer<T>
@@ -115,6 +119,7 @@ where
115119
node_protocol_version,
116120
time_getter,
117121
common_protocol_version: None,
122+
sync_message_received: false,
118123
}
119124
}
120125

@@ -393,17 +398,17 @@ where
393398
// Note: the channels used by this function to propagate messages to other parts of p2p
394399
// must be bounded; this is important to prevent DoS attacks.
395400
async fn handle_socket_msg(
401+
&mut self,
396402
peer_id: PeerId,
397403
msg: Message,
398-
peer_event_sender: &mut mpsc::Sender<PeerEvent>,
399-
block_sync_msg_sender: &mut mpsc::Sender<BlockSyncMessage>,
400-
transaction_sync_msg_sender: &mut mpsc::Sender<TransactionSyncMessage>,
404+
block_sync_msg_sender: &mpsc::Sender<BlockSyncMessage>,
405+
transaction_sync_msg_sender: &mpsc::Sender<TransactionSyncMessage>,
401406
) -> crate::Result<()> {
402407
match msg.categorize() {
403408
CategorizedMessage::Handshake(_) => {
404409
log::error!("Peer {peer_id} sent unexpected handshake message");
405410

406-
peer_event_sender
411+
self.peer_event_sender
407412
.send(PeerEvent::Misbehaved {
408413
error: P2pError::ProtocolError(ProtocolError::UnexpectedMessage(
409414
"Unexpected handshake message".to_owned(),
@@ -412,29 +417,35 @@ where
412417
.await?;
413418
}
414419
CategorizedMessage::PeerManagerMessage(msg) => {
415-
peer_event_sender
420+
self.peer_event_sender
416421
.send(PeerEvent::MessageReceived(
417-
PeerManagerMessageOrTag::PeerManagerMessage(msg),
422+
PeerManagerMessageExt::PeerManagerMessage(msg),
418423
))
419424
.await?
420425
}
421426
CategorizedMessage::BlockSyncMessage(msg) => {
422-
peer_event_sender
423-
.send(PeerEvent::MessageReceived(
424-
PeerManagerMessageOrTag::BlockSyncMessage((&msg).into()),
425-
))
426-
.await?;
427-
428-
block_sync_msg_sender.send(msg).await?
427+
block_sync_msg_sender.send(msg).await?;
428+
429+
if !self.sync_message_received {
430+
self.peer_event_sender
431+
.send(PeerEvent::MessageReceived(
432+
PeerManagerMessageExt::FirstSyncMessageReceived,
433+
))
434+
.await?;
435+
self.sync_message_received = true;
436+
}
429437
}
430438
CategorizedMessage::TransactionSyncMessage(msg) => {
431-
peer_event_sender
432-
.send(PeerEvent::MessageReceived(
433-
PeerManagerMessageOrTag::TransactionSyncMessage((&msg).into()),
434-
))
435-
.await?;
436-
437-
transaction_sync_msg_sender.send(msg).await?
439+
transaction_sync_msg_sender.send(msg).await?;
440+
441+
if !self.sync_message_received {
442+
self.peer_event_sender
443+
.send(PeerEvent::MessageReceived(
444+
PeerManagerMessageExt::FirstSyncMessageReceived,
445+
))
446+
.await?;
447+
self.sync_message_received = true;
448+
}
438449
}
439450
}
440451

@@ -507,12 +518,11 @@ where
507518
event = self.socket.recv(), if sync_msg_senders_opt.is_some() => match event {
508519
Ok(message) => {
509520
let sync_msg_senders = sync_msg_senders_opt.as_mut().expect("sync_msg_senders_opt is some");
510-
Self::handle_socket_msg(
521+
self.handle_socket_msg(
511522
self.peer_id,
512523
message,
513-
&mut self.peer_event_sender,
514-
&mut sync_msg_senders.0,
515-
&mut sync_msg_senders.1,
524+
&sync_msg_senders.0,
525+
&sync_msg_senders.1,
516526
).await?;
517527
}
518528
Err(err) => {

p2p/src/net/default_backend/types.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::{
3232
BlockSyncMessage, HeaderList, HeaderListRequest, PeerManagerMessage, PingRequest,
3333
PingResponse, TransactionResponse, TransactionSyncMessage, WillDisconnectMessage,
3434
},
35-
net::types::{services::Services, PeerManagerMessageOrTag},
35+
net::types::{services::Services, PeerManagerMessageExt},
3636
protocol::{ProtocolVersion, SupportedProtocolVersion},
3737
types::{peer_address::PeerAddress, peer_id::PeerId},
3838
};
@@ -119,7 +119,7 @@ pub enum PeerEvent {
119119
ConnectionClosed,
120120

121121
/// Message received from remote
122-
MessageReceived(PeerManagerMessageOrTag),
122+
MessageReceived(PeerManagerMessageExt),
123123

124124
/// Protocol violation
125125
Misbehaved { error: P2pError },

p2p/src/net/types.rs

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ use tokio::sync::mpsc::Receiver;
2727
use crate::{
2828
error::ConnectionValidationError,
2929
message::{
30-
BlockSyncMessage, BlockSyncMessageTag, PeerManagerMessage, PeerManagerMessageTag,
31-
TransactionSyncMessage, TransactionSyncMessageTag,
30+
BlockSyncMessage, PeerManagerMessage, PeerManagerMessageTag, TransactionSyncMessage,
3231
},
3332
protocol::SupportedProtocolVersion,
3433
types::{peer_address::PeerAddress, peer_id::PeerId},
@@ -166,11 +165,9 @@ impl Display for PeerInfo {
166165
#[derive(Debug)]
167166
pub enum ConnectivityEvent {
168167
/// A message received from a peer.
169-
///
170-
/// Note that only a message tag is present here for block and transaction sync messages.
171168
Message {
172169
peer_id: PeerId,
173-
message: PeerManagerMessageOrTag,
170+
message: PeerManagerMessageExt,
174171
},
175172

176173
/// Outbound connection accepted
@@ -248,37 +245,34 @@ pub enum ConnectivityEvent {
248245
},
249246
}
250247

251-
/// Either a full `PeerManagerMessage` or, if it's a sync message, the corresponding tag.
252-
#[derive(Debug, Clone)]
253-
pub enum PeerManagerMessageOrTag {
248+
#[derive(Debug)]
249+
pub enum PeerManagerMessageExt {
250+
// The complete PeerManagerMessage
254251
PeerManagerMessage(PeerManagerMessage),
255-
BlockSyncMessage(BlockSyncMessageTag),
256-
TransactionSyncMessage(TransactionSyncMessageTag),
252+
253+
// An indicator that the first sync message (i.e. BlockSyncMessage or TransactionSyncMessage)
254+
// has been received from the peer.
255+
FirstSyncMessageReceived,
257256
}
258257

259-
impl From<PeerManagerMessage> for PeerManagerMessageOrTag {
258+
impl From<PeerManagerMessage> for PeerManagerMessageExt {
260259
fn from(value: PeerManagerMessage) -> Self {
261260
Self::PeerManagerMessage(value)
262261
}
263262
}
264263

264+
/// Tag type for `PeerManagerMessageExt`.
265265
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
266-
pub enum ConnectivityEventMessageTag {
266+
pub enum PeerManagerMessageExtTag {
267267
PeerManagerMessage(PeerManagerMessageTag),
268-
BlockSyncMessage(BlockSyncMessageTag),
269-
TransactionSyncMessage(TransactionSyncMessageTag),
268+
FirstSyncMessageReceived,
270269
}
271270

272-
impl From<&'_ PeerManagerMessageOrTag> for ConnectivityEventMessageTag {
273-
fn from(value: &'_ PeerManagerMessageOrTag) -> Self {
271+
impl From<&'_ PeerManagerMessageExt> for PeerManagerMessageExtTag {
272+
fn from(value: &'_ PeerManagerMessageExt) -> Self {
274273
match value {
275-
PeerManagerMessageOrTag::PeerManagerMessage(msg) => {
276-
Self::PeerManagerMessage(msg.into())
277-
}
278-
PeerManagerMessageOrTag::BlockSyncMessage(tag) => Self::BlockSyncMessage(*tag),
279-
PeerManagerMessageOrTag::TransactionSyncMessage(tag) => {
280-
Self::TransactionSyncMessage(*tag)
281-
}
274+
PeerManagerMessageExt::PeerManagerMessage(msg) => Self::PeerManagerMessage(msg.into()),
275+
PeerManagerMessageExt::FirstSyncMessageReceived => Self::FirstSyncMessageReceived,
282276
}
283277
}
284278
}

p2p/src/peer_manager/mod.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ use crate::{
6363
net::{
6464
types::{
6565
services::{Service, Services},
66-
ConnectivityEvent, ConnectivityEventMessageTag, PeerInfo, PeerManagerMessageOrTag,
67-
PeerRole,
66+
ConnectivityEvent, PeerInfo, PeerManagerMessageExt, PeerManagerMessageExtTag, PeerRole,
6867
},
6968
ConnectivityService, NetworkingService,
7069
},
@@ -1563,9 +1562,9 @@ where
15631562
}
15641563
}
15651564

1566-
fn handle_incoming_message(&mut self, peer_id: PeerId, message: PeerManagerMessageOrTag) {
1565+
fn handle_incoming_message(&mut self, peer_id: PeerId, message: PeerManagerMessageExt) {
15671566
let is_disconnection_message = match &message {
1568-
PeerManagerMessageOrTag::PeerManagerMessage(msg) => match msg {
1567+
PeerManagerMessageExt::PeerManagerMessage(msg) => match msg {
15691568
PeerManagerMessage::WillDisconnect(_) => true,
15701569

15711570
PeerManagerMessage::AddrListRequest(_)
@@ -1574,8 +1573,7 @@ where
15741573
| PeerManagerMessage::AddrListResponse(_)
15751574
| PeerManagerMessage::PingResponse(_) => false,
15761575
},
1577-
PeerManagerMessageOrTag::BlockSyncMessage(_)
1578-
| PeerManagerMessageOrTag::TransactionSyncMessage(_) => false,
1576+
PeerManagerMessageExt::FirstSyncMessageReceived => false,
15791577
};
15801578

15811579
// Note: `PeerContext` must always exist when an incoming message arrives, and the individual
@@ -1595,10 +1593,10 @@ where
15951593
return;
15961594
}
15971595

1598-
let message_tag: ConnectivityEventMessageTag = (&message).into();
1596+
let message_tag: PeerManagerMessageExtTag = (&message).into();
15991597

16001598
match message {
1601-
PeerManagerMessageOrTag::PeerManagerMessage(msg) => match msg {
1599+
PeerManagerMessageExt::PeerManagerMessage(msg) => match msg {
16021600
PeerManagerMessage::AddrListRequest(_) => {
16031601
self.handle_addr_list_request(peer_id);
16041602
}
@@ -1618,8 +1616,7 @@ where
16181616
self.handle_will_disconnect_messgae(peer_id, msg);
16191617
}
16201618
},
1621-
PeerManagerMessageOrTag::BlockSyncMessage(_)
1622-
| PeerManagerMessageOrTag::TransactionSyncMessage(_) => {}
1619+
PeerManagerMessageExt::FirstSyncMessageReceived => {}
16231620
};
16241621

16251622
if let Some(o) = self.observer.as_mut() {
@@ -2481,7 +2478,7 @@ pub trait Observer {
24812478

24822479
// This will be called after `ConnectivityEvent::Message` has been handled by
24832480
// the peer manager.
2484-
fn message_received(&mut self, peer_id: PeerId, message_tag: ConnectivityEventMessageTag);
2481+
fn message_received(&mut self, peer_id: PeerId, message_tag: PeerManagerMessageExtTag);
24852482
}
24862483

24872484
pub trait PeerManagerInterface {

p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,9 @@ use crate::{
4444
config::P2pConfig,
4545
disconnection_reason::DisconnectionReason,
4646
error::{DialError, P2pError},
47-
message::BlockSyncMessageTag,
4847
net::{
4948
default_backend::types::{Command, Message},
50-
types::{
51-
ConnectivityEvent, ConnectivityEventMessageTag, PeerManagerMessageOrTag, PeerRole,
52-
},
49+
types::{ConnectivityEvent, PeerManagerMessageExt, PeerManagerMessageExtTag, PeerRole},
5350
},
5451
peer_manager::{
5552
self,
@@ -745,9 +742,7 @@ async fn auto_connection_without_peer_activity(
745742
conn_event_sender
746743
.send(ConnectivityEvent::Message {
747744
peer_id,
748-
message: PeerManagerMessageOrTag::BlockSyncMessage(
749-
BlockSyncMessageTag::HeaderListRequest,
750-
),
745+
message: PeerManagerMessageExt::FirstSyncMessageReceived,
751746
})
752747
.unwrap();
753748

@@ -756,9 +751,7 @@ async fn auto_connection_without_peer_activity(
756751
peer_mgr_notification,
757752
PeerManagerNotification::MessageReceived {
758753
peer_id,
759-
message_tag: ConnectivityEventMessageTag::BlockSyncMessage(
760-
BlockSyncMessageTag::HeaderListRequest
761-
)
754+
message_tag: PeerManagerMessageExtTag::FirstSyncMessageReceived
762755
}
763756
);
764757
}

0 commit comments

Comments
 (0)