Skip to content

Commit 6884299

Browse files
authored
Merge pull request #2031 from mintlayer/p2p_fix_connection_stalling
Fix for potential indefinite connection stalling
2 parents 83205de + 684ae7f commit 6884299

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+2574
-1445
lines changed

CHANGELOG.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,13 @@ The format is loosely based on [Keep a Changelog](https://keepachangelog.com/en/
7373
constantly attempting to re-establish a connection with a peer that has banned it.
7474

7575
### Fixed
76-
- P2p: when a peer sends a message that can't be decoded, it will now be discouraged (which is what
77-
is normally done for misbehaving peers) and the node won't try connecting to it again.\
78-
Also, the peer will be sent an appropriate `WillDisconnect` message prior to disconnection.
76+
- P2p:
77+
- When a peer sends a message that can't be decoded, it will now be discouraged (which is what
78+
is normally done for misbehaving peers) and the node won't try connecting to it again.\
79+
Also, the peer will be sent an appropriate `WillDisconnect` message prior to disconnection.
80+
81+
- Fixed a potential indefinite stalling of a particular connection when both nodes start sending
82+
large amounts of data to each other.
7983

8084
- Wallet CLI and RPC: the commands `account-utxos` and `standalone-multisig-utxos` and their RPC
8185
counterparts now return correct decimal amounts for tokens with non-default number of decimals.

Cargo.lock

Lines changed: 6 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,10 @@ tracing-subscriber = { version = "0.3", features = [
262262
"env-filter",
263263
"json",
264264
] }
265-
# Note: we use an unreleased version of Tokio, because we need a certain bugfix, see the `[patch.crates-io]`
266-
# section for additional details.
267-
tokio = { git = "https://github.com/tokio-rs/tokio", rev = "0d6c7af3e43457350bdc03a6dbcafa276fab7352", default-features = false }
265+
# Note: we need at least tokio 1.50, because it includes the PR https://github.com/tokio-rs/tokio/pull/7879, which fixes
266+
# the potential indefinite stalling of the chainstate subsystem (reproducible on a slow machine during initial sync
267+
# with 30+ connected peers).
268+
tokio = { version = "1.50", default-features = false }
268269
tokio-socks = "0.5"
269270
tokio-stream = "0.1"
270271
tokio-util = { version = "0.7", default-features = false }
@@ -349,11 +350,6 @@ default = ["trezor", "ledger"]
349350
# TODO: investigate this further.
350351
fontconfig-parser = { git = "https://github.com/Riey/fontconfig-parser", rev = "f7d13a779e6ee282ce75acbc00a1270c0350e0c2" }
351352

352-
# We need this PR - https://github.com/tokio-rs/tokio/pull/7879 - to fix potential indefinite stalling of the chainstate
353-
# subsystem (reproducible on a slow machine during initial sync with 30+ connected peers).
354-
# The PR was merged after 1.49, so it should probably be part of 1.50 when it comes out.
355-
tokio = { git = "https://github.com/tokio-rs/tokio", rev = "0d6c7af3e43457350bdc03a6dbcafa276fab7352" }
356-
357353
# This patch is needed because there is no release of the library and because ledger-lib depends on ledger-proto, so this is the only way to make the former find the latter.
358354
# Note that the revision specified here must be the same as the one used in the workspace.dependencies section
359355
ledger-proto = { git = "https://github.com/ImplOfAnImpl/rust-ledger.git", rev = "035789ec436d47b938e8a3d2085ffb2fbf6f0559" }

deny.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
github = [
33
"mintlayer", # allow any code from mintlayer's github
44
"paritytech", # we have to use an unreleased version of parity-scale-codec at this moment
5-
"tokio-rs", # we have to use an unreleased version of tokio at this moment
65
]
76

87
[licenses]

dns-server/src/main.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,17 +76,16 @@ async fn run(options: DnsServerRunOptions) -> anyhow::Result<Never> {
7676
// Note: this ban config (as well as any other settings related to the peer or sync manager)
7777
// won't have any effect on the dns server.
7878
ban_config: Default::default(),
79-
outbound_connection_timeout: Default::default(),
8079
ping_check_period: Default::default(),
8180
ping_timeout: Default::default(),
82-
peer_handshake_timeout: Default::default(),
8381
max_clock_diff: Default::default(),
8482
node_type: NodeType::DnsServer.into(),
8583
allow_discover_private_ips: Default::default(),
8684
user_agent,
8785
sync_stalling_timeout: Default::default(),
8886
peer_manager_config: Default::default(),
8987
protocol_config: Default::default(),
88+
backend_timeouts: Default::default(),
9089
custom_disconnection_reason_for_banning: Default::default(),
9190
});
9291

networking/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,21 @@ pub enum NetworkingError {
2020
// Note: std::io::Error is neither clonable nor comparable, so we only store its "kind" here.
2121
#[error("IO error: {0}")]
2222
IoError(std::io::ErrorKind),
23+
2324
#[error("Message codec error: {0}")]
2425
MessageCodecError(#[from] MessageCodecError),
26+
2527
#[error("Noise protocol handshake error")]
2628
NoiseHandshakeError(String),
29+
2730
#[error("Proxy error: {0}")]
2831
ProxyError(String),
2932

3033
#[error("Channel transport error: {0}")]
3134
ChannelTransportError(#[from] MpscChannelTransportError),
35+
36+
#[error("Socket write timed out")]
37+
SocketWriteTimedOut,
3238
}
3339

3440
#[derive(thiserror::Error, Debug, PartialEq, Eq, Clone)]

networking/src/test_helpers.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ impl TestTransportMaker for TestTransportChannel {
7979
}
8080
}
8181

82+
impl TestTransportChannel {
83+
pub fn make_transport_with_max_buf_size(max_buf_size: usize) -> MpscChannelTransport {
84+
MpscChannelTransport::new().with_max_buf_size(max_buf_size)
85+
}
86+
}
87+
8288
pub struct TestTransportNoise {}
8389

8490
impl TestTransportMaker for TestTransportNoise {

networking/src/transport/impls/channel.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ use crate::{
4141
Result,
4242
};
4343

44-
// How much bytes is allowed for write (without reading on the other side).
45-
const MAX_BUF_SIZE: usize = 10 * 1024 * 1024;
44+
/// The default limit on how many bytes can be written before the other side has to read them.
45+
const DEFAULT_MAX_BUF_SIZE: usize = 10 * 1024 * 1024;
4646

4747
struct IncomingConnection {
4848
from: SocketAddr,
@@ -68,6 +68,7 @@ static NEXT_IP_ADDRESS: StdAtomicU32 = StdAtomicU32::new(1);
6868
pub struct MpscChannelTransport {
6969
local_address: IpAddr,
7070
last_port: AtomicU16,
71+
max_buf_size: usize,
7172
}
7273

7374
impl MpscChannelTransport {
@@ -80,9 +81,16 @@ impl MpscChannelTransport {
8081
Self {
8182
local_address,
8283
last_port: 1024.into(),
84+
max_buf_size: DEFAULT_MAX_BUF_SIZE,
8385
}
8486
}
8587

88+
/// Specifies how many bytes can be written before the other side has to read them.
89+
pub fn with_max_buf_size(mut self, max_buf_size: usize) -> Self {
90+
self.max_buf_size = max_buf_size;
91+
self
92+
}
93+
8694
/// Return the next u32 value that can be used to construct a unique local address for this kind of transport.
8795
pub fn next_local_address_as_u32() -> u32 {
8896
NEXT_IP_ADDRESS.fetch_add(1, Ordering::Relaxed)
@@ -105,6 +113,7 @@ impl TransportSocket for MpscChannelTransport {
105113
return Ok(Self::Listener {
106114
addresses,
107115
receiver: None,
116+
max_buf_size: self.max_buf_size,
108117
});
109118
}
110119

@@ -142,6 +151,7 @@ impl TransportSocket for MpscChannelTransport {
142151
Ok(Self::Listener {
143152
addresses,
144153
receiver: Some(receiver),
154+
max_buf_size: self.max_buf_size,
145155
})
146156
}
147157

@@ -186,6 +196,7 @@ impl TransportSocket for MpscChannelTransport {
186196
pub struct ChannelListener {
187197
addresses: Vec<SocketAddr>,
188198
receiver: Option<UnboundedReceiver<IncomingConnection>>,
199+
max_buf_size: usize,
189200
}
190201

191202
#[async_trait]
@@ -209,7 +220,7 @@ impl TransportListener for ChannelListener {
209220

210221
assert!(self.addresses.contains(&local_address));
211222

212-
let (server_stream, client_stream) = tokio::io::duplex(MAX_BUF_SIZE);
223+
let (server_stream, client_stream) = tokio::io::duplex(self.max_buf_size);
213224

214225
client_stream_sender.send(client_stream).map_err(|_| {
215226
MpscChannelTransportError::ConnectorDroppedUnexpectedly {
@@ -320,7 +331,7 @@ mod tests {
320331
use randomness::Rng;
321332
use test_utils::random::{gen_random_bytes, Seed};
322333

323-
use crate::transport::BufferedTranscoder;
334+
use crate::transport::new_message_stream;
324335

325336
use super::*;
326337

@@ -346,11 +357,12 @@ mod tests {
346357
let message_size = rng.gen_range(128..1024);
347358

348359
let message = gen_random_bytes(&mut rng, 1, message_size);
349-
let mut peer_stream = BufferedTranscoder::new(peer_stream, Some(message.encoded_size()));
350-
peer_stream.send(message.clone()).await.unwrap();
360+
let (_, mut peer_stream_writer) =
361+
new_message_stream(peer_stream, Some(message.encoded_size()));
362+
peer_stream_writer.send(message.clone()).await.unwrap();
351363

352-
let mut server_stream =
353-
BufferedTranscoder::<_, Vec<u8>>::new(server_stream, Some(message.encoded_size()));
354-
assert_eq!(server_stream.recv().await.unwrap(), message);
364+
let (mut server_stream_reader, _) =
365+
new_message_stream::<_, Vec<u8>>(server_stream, Some(message.encoded_size()));
366+
assert_eq!(server_stream_reader.recv().await.unwrap(), message);
355367
}
356368
}

networking/src/transport/impls/stream_adapter/wrapped_transport/tests.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::{
3636
test_helpers::{TestTransportChannel, TestTransportMaker, TestTransportTcp},
3737
transport::{
3838
impls::stream_adapter::wrapped_transport::wrapped_listener::MAX_CONCURRENT_HANDSHAKES,
39-
BufferedTranscoder, ChannelListener, IdentityStreamAdapter, MpscChannelTransport,
39+
new_message_stream, ChannelListener, IdentityStreamAdapter, MpscChannelTransport,
4040
NoiseEncryptionAdapter, NoiseEncryptionAdapterMaker, PeerStream, TcpTransportSocket,
4141
TransportListener, TransportSocket,
4242
},
@@ -252,13 +252,13 @@ async fn send_2_reqs(#[case] seed: Seed) {
252252
let message_1 = gen_random_bytes(&mut rng, 0, 1000);
253253
let message_2 = gen_random_bytes(&mut rng, 0, 1000);
254254

255-
let mut peer_stream = BufferedTranscoder::<_, Vec<u8>>::new(peer_stream, None);
256-
peer_stream.send(message_1.clone()).await.unwrap();
257-
peer_stream.send(message_2.clone()).await.unwrap();
255+
let (_, mut peer_stream_writer) = new_message_stream::<_, Vec<u8>>(peer_stream, None);
256+
peer_stream_writer.send(message_1.clone()).await.unwrap();
257+
peer_stream_writer.send(message_2.clone()).await.unwrap();
258258

259-
let mut server_stream = BufferedTranscoder::<_, Vec<u8>>::new(server_stream, None);
260-
assert_eq!(server_stream.recv().await.unwrap(), message_1);
261-
assert_eq!(server_stream.recv().await.unwrap(), message_2);
259+
let (mut server_stream_reader, _) = new_message_stream::<_, Vec<u8>>(server_stream, None);
260+
assert_eq!(server_stream_reader.recv().await.unwrap(), message_1);
261+
assert_eq!(server_stream_reader.recv().await.unwrap(), message_2);
262262
}
263263

264264
#[tracing::instrument]

networking/src/transport/impls/tcp.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ mod tests {
156156

157157
use crate::{
158158
test_helpers::{TestTransportMaker, TestTransportTcp},
159-
transport::BufferedTranscoder,
159+
transport::new_message_stream,
160160
};
161161

162162
use super::*;
@@ -178,12 +178,13 @@ mod tests {
178178
let peer_stream = peer_res.unwrap();
179179

180180
let message = gen_random_bytes(&mut rng, 0, 1000);
181-
let mut peer_stream = BufferedTranscoder::new(peer_stream, Some(message.encoded_size()));
182-
peer_stream.send(message.clone()).await.unwrap();
181+
let (_, mut peer_stream_writer) =
182+
new_message_stream(peer_stream, Some(message.encoded_size()));
183+
peer_stream_writer.send(message.clone()).await.unwrap();
183184

184-
let mut server_stream =
185-
BufferedTranscoder::<_, Vec<u8>>::new(server_stream, Some(message.encoded_size()));
186-
assert_eq!(server_stream.recv().await.unwrap(), message);
185+
let (mut server_stream_reader, _) =
186+
new_message_stream::<_, Vec<u8>>(server_stream, Some(message.encoded_size()));
187+
assert_eq!(server_stream_reader.recv().await.unwrap(), message);
187188
}
188189

189190
#[tracing::instrument(skip(seed))]
@@ -205,12 +206,12 @@ mod tests {
205206
let message_1 = gen_random_bytes(&mut rng, 0, 1000);
206207
let message_2 = gen_random_bytes(&mut rng, 0, 1000);
207208

208-
let mut peer_stream = BufferedTranscoder::new(peer_stream, None);
209-
peer_stream.send(message_1.clone()).await.unwrap();
210-
peer_stream.send(message_2.clone()).await.unwrap();
209+
let (_, mut peer_stream_writer) = new_message_stream(peer_stream, None);
210+
peer_stream_writer.send(message_1.clone()).await.unwrap();
211+
peer_stream_writer.send(message_2.clone()).await.unwrap();
211212

212-
let mut server_stream = BufferedTranscoder::<_, Vec<u8>>::new(server_stream, None);
213-
assert_eq!(server_stream.recv().await.unwrap(), message_1);
214-
assert_eq!(server_stream.recv().await.unwrap(), message_2);
213+
let (mut server_stream_reader, _) = new_message_stream::<_, Vec<u8>>(server_stream, None);
214+
assert_eq!(server_stream_reader.recv().await.unwrap(), message_1);
215+
assert_eq!(server_stream_reader.recv().await.unwrap(), message_2);
215216
}
216217
}

0 commit comments

Comments
 (0)