Skip to content

Commit aead1d8

Browse files
committed
refactor: drop batch lock early and more readable code
1 parent 5695a7a commit aead1d8

2 files changed

Lines changed: 65 additions & 26 deletions

File tree

batcher/aligned-batcher/src/lib.rs

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ pub struct Batcher {
8888
max_proof_size: usize,
8989
max_batch_byte_size: usize,
9090
max_batch_proof_qty: usize,
91-
max_queue_size: usize,
9291
last_uploaded_batch_block: Mutex<u64>,
9392
pre_verification_is_enabled: bool,
9493
non_paying_config: Option<NonPayingConfig>,
@@ -211,7 +210,7 @@ impl Batcher {
211210
.expect("Failed to get fallback Service Manager contract");
212211

213212
let mut user_states = HashMap::new();
214-
let mut batch_state = BatchState::new();
213+
let mut batch_state = BatchState::new(config.batcher.max_queue_size);
215214
let non_paying_config = if let Some(non_paying_config) = config.batcher.non_paying {
216215
warn!("Non-paying address configuration detected. Will replace non-paying address {} with configured address.",
217216
non_paying_config.address);
@@ -229,7 +228,8 @@ impl Batcher {
229228
non_paying_user_state,
230229
);
231230

232-
batch_state = BatchState::new_with_user_states(user_states);
231+
batch_state =
232+
BatchState::new_with_user_states(user_states, config.batcher.max_queue_size);
233233
Some(non_paying_config)
234234
} else {
235235
None
@@ -264,7 +264,6 @@ impl Batcher {
264264
max_proof_size: config.batcher.max_proof_size,
265265
max_batch_byte_size: config.batcher.max_batch_byte_size,
266266
max_batch_proof_qty: config.batcher.max_batch_proof_qty,
267-
max_queue_size: config.batcher.max_queue_size,
268267
last_uploaded_batch_block: Mutex::new(last_uploaded_batch_block),
269268
pre_verification_is_enabled: config.batcher.pre_verification_is_enabled,
270269
non_paying_config,
@@ -800,30 +799,51 @@ impl Batcher {
800799
// * Perform validation over batcher queue *
801800
// * ---------------------------------------------------------------------*
802801

803-
if batch_state_lock.batch_queue.len() == self.max_queue_size {
804-
// Check if the new proof have more priority than the lowest pirority entry
805-
if let Some((_, lowest_priority_entry_priority)) = batch_state_lock.batch_queue.peek() {
806-
if *lowest_priority_entry_priority
807-
> BatchQueueEntryPriority::new(
808-
nonced_verification_data.max_fee,
809-
nonced_verification_data.nonce,
810-
)
811-
{
812-
let (removed_entry, _) = batch_state_lock.batch_queue.pop().unwrap();
802+
if batch_state_lock.is_queue_full() {
803+
info!("Batch queue is full. Evaluating if the incoming proof can replace a lower-priority entry.");
804+
805+
let msg_entry_priority = BatchQueueEntryPriority::new(
806+
nonced_verification_data.max_fee,
807+
nonced_verification_data.nonce,
808+
);
809+
810+
if let Some(lowest_entry_priority) = batch_state_lock.lowest_entry_priority() {
811+
// If the new proof has more priority than the lowest one in the queue, discard the latter one and push the new one
812+
if msg_entry_priority > lowest_entry_priority {
813+
let Some((removed_entry, _)) = batch_state_lock.batch_queue.pop() else {
814+
warn!("Failed to remove lowest-priority proof despite queue being full.");
815+
std::mem::drop(batch_state_lock);
816+
send_message(
817+
ws_conn_sink.clone(),
818+
SubmitProofResponseMessage::BatchQueueLimitExceededError,
819+
)
820+
.await;
821+
return Ok(());
822+
};
823+
813824
info!(
814-
"Removing proof from entry. Sender {}, Nonce {}.",
815-
removed_entry.sender, removed_entry.nonced_verification_data.nonce
825+
"Incoming proof (nonce: {}, fee: {}) has higher priority. Replacing lowest priority proof from sender {} with nonce {}.",
826+
nonced_verification_data.nonce,
827+
nonced_verification_data.max_fee,
828+
removed_entry.sender,
829+
removed_entry.nonced_verification_data.nonce
816830
);
817831

818832
batch_state_lock.remove_entry_from_user_state(&removed_entry);
819-
send_message(
820-
removed_entry.messaging_sink.unwrap(),
821-
SubmitProofResponseMessage::BatchQueueLimitExceededError,
822-
)
823-
.await;
833+
if let Some(removed_entry_ws) = removed_entry.messaging_sink {
834+
send_message(
835+
removed_entry_ws,
836+
SubmitProofResponseMessage::BatchQueueLimitExceededError,
837+
)
838+
.await;
839+
};
824840
} else {
825-
// Can't add new entry with less priority to the batch queue
826-
error!("Can't add new entry, the batcher queue is full");
841+
warn!(
842+
"Incoming proof (nonce: {}, fee: {}) has lower priority than all entries in the full queue. Rejecting submission.",
843+
nonced_verification_data.nonce,
844+
nonced_verification_data.max_fee
845+
);
846+
std::mem::drop(batch_state_lock);
827847
send_message(
828848
ws_conn_sink.clone(),
829849
SubmitProofResponseMessage::BatchQueueLimitExceededError,
@@ -1767,7 +1787,7 @@ impl Batcher {
17671787

17681788
let batch_state_lock = self.batch_state.lock().await;
17691789

1770-
if batch_state_lock.batch_queue.len() == self.max_queue_size {
1790+
if batch_state_lock.is_queue_full() {
17711791
error!("Can't add new entry, the batcher queue is full");
17721792
send_message(
17731793
ws_sink.clone(),

batcher/aligned-batcher/src/types/batch_state.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::collections::{hash_map::Entry, HashMap};
22

3+
use crate::BatchQueueEntryPriority;
4+
35
use super::{
46
batch_queue::{BatchQueue, BatchQueueEntry},
57
user_state::UserState,
@@ -10,22 +12,28 @@ use log::debug;
1012
pub(crate) struct BatchState {
1113
pub(crate) batch_queue: BatchQueue,
1214
pub(crate) user_states: HashMap<Address, UserState>,
15+
pub(crate) max_size: usize,
1316
}
1417

1518
impl BatchState {
1619
// CONSTRUCTORS:
1720

18-
pub(crate) fn new() -> Self {
21+
pub(crate) fn new(max_size: usize) -> Self {
1922
Self {
2023
batch_queue: BatchQueue::new(),
2124
user_states: HashMap::new(),
25+
max_size,
2226
}
2327
}
2428

25-
pub(crate) fn new_with_user_states(user_states: HashMap<Address, UserState>) -> Self {
29+
pub(crate) fn new_with_user_states(
30+
user_states: HashMap<Address, UserState>,
31+
max_size: usize,
32+
) -> Self {
2633
Self {
2734
batch_queue: BatchQueue::new(),
2835
user_states,
36+
max_size,
2937
}
3038
}
3139

@@ -250,4 +258,15 @@ impl BatchState {
250258
}
251259
}
252260
}
261+
262+
pub(crate) fn is_queue_full(&self) -> bool {
263+
self.batch_queue.len() >= self.max_size
264+
}
265+
266+
pub(crate) fn lowest_entry_priority(&self) -> Option<BatchQueueEntryPriority> {
267+
match self.batch_queue.peek() {
268+
Some((_, priority_entry)) => Some(priority_entry.clone()),
269+
None => None,
270+
}
271+
}
253272
}

0 commit comments

Comments
 (0)