Skip to content

Commit de725f7

Browse files
refactor (batcher): put both the nonces and the queue together in a lock (#679)
1 parent d415404 commit de725f7

1 file changed

Lines changed: 70 additions & 38 deletions

File tree

  • batcher/aligned-batcher/src

batcher/aligned-batcher/src/lib.rs

Lines changed: 70 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,50 @@ const ADDITIONAL_SUBMISSION_COST_PER_PROOF: u128 = 13_000;
5252
const CONSTANT_COST: u128 = AGGREGATOR_COST + BATCHER_SUBMISSION_BASE_COST;
5353
const MIN_BALANCE_PER_PROOF: u128 = ADDITIONAL_SUBMISSION_COST_PER_PROOF * 100_000_000_000; // 100 Gwei = 0.0000001 ether (high gas price)
5454

55+
struct BatchState {
56+
batch_queue: BatchQueue,
57+
user_nonces: HashMap<Address, U256>,
58+
user_proof_count_in_batch: HashMap<Address, u64>,
59+
}
60+
61+
impl BatchState {
62+
fn new() -> Self {
63+
Self {
64+
batch_queue: BatchQueue::new(),
65+
user_nonces: HashMap::new(),
66+
user_proof_count_in_batch: HashMap::new(),
67+
}
68+
}
69+
70+
fn get_user_proof_count(&self, addr: &Address) -> u64 {
71+
*self.user_proof_count_in_batch.get(addr).unwrap_or(&0)
72+
}
73+
74+
/*
75+
Increments the user proof count in the batch, if the user is already in the hashmap.
76+
If the user is not in the hashmap, it adds the user to the hashmap with a count of 1 to represent the first proof.
77+
*/
78+
fn increment_user_proof_count(&mut self, addr: &Address) {
79+
self.user_proof_count_in_batch
80+
.entry(*addr)
81+
.and_modify(|count| *count += 1)
82+
.or_insert(1);
83+
}
84+
}
85+
5586
pub struct Batcher {
5687
s3_client: S3Client,
5788
s3_bucket_name: String,
5889
eth_ws_provider: Provider<Ws>,
5990
payment_service: BatcherPaymentService,
60-
batch_queue: Mutex<BatchQueue>,
91+
batch_state: Mutex<BatchState>,
6192
max_block_interval: u64,
6293
min_batch_len: usize,
6394
max_proof_size: usize,
6495
max_batch_size: usize,
6596
last_uploaded_batch_block: Mutex<u64>,
6697
pre_verification_is_enabled: bool,
6798
non_paying_config: Option<NonPayingConfig>,
68-
user_nonces: Mutex<HashMap<Address, U256>>,
69-
user_proof_count_in_batch: Mutex<HashMap<Address, u64>>,
7099
}
71100

72101
impl Batcher {
@@ -114,23 +143,19 @@ impl Batcher {
114143
None
115144
};
116145

117-
let user_nonces = Mutex::new(HashMap::new());
118-
119146
Self {
120147
s3_client,
121148
s3_bucket_name,
122149
eth_ws_provider,
123150
payment_service,
124-
batch_queue: Mutex::new(BatchQueue::new()),
151+
batch_state: Mutex::new(BatchState::new()),
125152
max_block_interval: config.batcher.block_interval,
126153
min_batch_len: config.batcher.batch_size_interval,
127154
max_proof_size: config.batcher.max_proof_size,
128155
max_batch_size: config.batcher.max_batch_size,
129156
last_uploaded_batch_block: Mutex::new(last_uploaded_batch_block),
130157
pre_verification_is_enabled: config.batcher.pre_verification_is_enabled,
131158
non_paying_config,
132-
user_nonces,
133-
user_proof_count_in_batch: Mutex::new(HashMap::new()),
134159
}
135160
}
136161

@@ -221,7 +246,10 @@ impl Batcher {
221246
self.handle_nonpaying_msg(ws_conn_sink.clone(), client_msg)
222247
.await
223248
} else {
224-
if !self.check_user_balance(&addr).await {
249+
if !self
250+
.check_user_balance_and_increment_proof_count(&addr)
251+
.await
252+
{
225253
send_message(
226254
ws_conn_sink.clone(),
227255
ValidityResponseMessage::InsufficientBalance(addr),
@@ -280,13 +308,13 @@ impl Batcher {
280308

281309
// Checks user has sufficient balance
282310
// If user has sufficient balance, increments the user's proof count in the batch
283-
async fn check_user_balance(&self, addr: &Address) -> bool {
311+
async fn check_user_balance_and_increment_proof_count(&self, addr: &Address) -> bool {
284312
if self.user_balance_is_unlocked(addr).await {
285313
return false;
286314
}
315+
let mut batch_state = self.batch_state.lock().await;
287316

288-
let mut user_proof_counts = self.user_proof_count_in_batch.lock().await;
289-
let user_proofs_in_batch = *user_proof_counts.get(addr).unwrap_or(&0) + 1;
317+
let user_proofs_in_batch = batch_state.get_user_proof_count(addr) + 1;
290318

291319
let user_balance = self.get_user_balance(addr).await;
292320

@@ -295,14 +323,14 @@ impl Batcher {
295323
return false;
296324
}
297325

298-
user_proof_counts.insert(*addr, user_proofs_in_batch);
326+
batch_state.increment_user_proof_count(addr);
299327
true
300328
}
301329

302330
async fn check_nonce_and_increment(&self, addr: Address, nonce: U256) -> bool {
303-
let mut user_nonces = self.user_nonces.lock().await;
331+
let mut batch_state = self.batch_state.lock().await;
304332

305-
let expected_user_nonce = match user_nonces.get(&addr) {
333+
let expected_user_nonce = match batch_state.user_nonces.get(&addr) {
306334
Some(nonce) => *nonce,
307335
None => {
308336
let user_nonce = match self.payment_service.user_nonces(addr).call().await {
@@ -313,7 +341,7 @@ impl Batcher {
313341
}
314342
};
315343

316-
user_nonces.insert(addr, user_nonce);
344+
batch_state.user_nonces.insert(addr, user_nonce);
317345
user_nonce
318346
}
319347
};
@@ -326,7 +354,7 @@ impl Batcher {
326354
return false;
327355
}
328356

329-
user_nonces.insert(addr, nonce + U256::one());
357+
batch_state.user_nonces.insert(addr, nonce + U256::one());
330358
true
331359
}
332360

@@ -337,17 +365,20 @@ impl Batcher {
337365
ws_conn_sink: Arc<RwLock<SplitSink<WebSocketStream<TcpStream>, Message>>>,
338366
proof_submitter_sig: Signature,
339367
) {
340-
let mut batch_queue_lock = self.batch_queue.lock().await;
368+
let mut batch_state = self.batch_state.lock().await;
341369
info!("Calculating verification data commitments...");
342370
let verification_data_comm = verification_data.clone().into();
343371
info!("Adding verification data to batch...");
344-
batch_queue_lock.push((
372+
batch_state.batch_queue.push((
345373
verification_data,
346374
verification_data_comm,
347375
ws_conn_sink,
348376
proof_submitter_sig,
349377
));
350-
info!("Current batch queue length: {}", batch_queue_lock.len());
378+
info!(
379+
"Current batch queue length: {}",
380+
batch_state.batch_queue.len()
381+
);
351382
}
352383

353384
/// Given a new block number listened from the blockchain, checks if the current batch is ready to be posted.
@@ -361,8 +392,8 @@ impl Batcher {
361392
/// and all the elements up to that index are copied and cleared from the batch queue. The copy is then passed to the
362393
/// `finalize_batch` function.
363394
async fn is_batch_ready(&self, block_number: u64) -> Option<BatchQueue> {
364-
let mut batch_queue_lock = self.batch_queue.lock().await;
365-
let current_batch_len = batch_queue_lock.len();
395+
let mut batch_state = self.batch_state.lock().await;
396+
let current_batch_len = batch_state.batch_queue.len();
366397

367398
let last_uploaded_batch_block_lock = self.last_uploaded_batch_block.lock().await;
368399

@@ -383,7 +414,8 @@ impl Batcher {
383414
return None;
384415
}
385416

386-
let batch_verification_data: Vec<NoncedVerificationData> = batch_queue_lock
417+
let batch_verification_data: Vec<NoncedVerificationData> = batch_state
418+
.batch_queue
387419
.iter()
388420
.map(|(vd, _, _, _)| vd.clone())
389421
.collect();
@@ -395,23 +427,26 @@ impl Batcher {
395427
info!("Batch max size exceded. Splitting current batch...");
396428
let mut acc_batch_size = 0;
397429
let mut finalized_batch_idx = 0;
398-
for (idx, (verification_data, _, _, _)) in batch_queue_lock.iter().enumerate() {
430+
for (idx, (verification_data, _, _, _)) in batch_state.batch_queue.iter().enumerate() {
399431
acc_batch_size += serde_json::to_vec(verification_data).unwrap().len();
400432
if acc_batch_size > self.max_batch_size {
401433
finalized_batch_idx = idx;
402434
break;
403435
}
404436
}
405-
let finalized_batch = batch_queue_lock.drain(..finalized_batch_idx).collect();
437+
let finalized_batch = batch_state
438+
.batch_queue
439+
.drain(..finalized_batch_idx)
440+
.collect();
406441
return Some(finalized_batch);
407442
}
408443

409444
// A copy of the batch is made to be returned and the current batch is cleared
410-
let finalized_batch = batch_queue_lock.clone();
411-
batch_queue_lock.clear();
445+
let finalized_batch = batch_state.batch_queue.clone();
446+
batch_state.batch_queue.clear();
412447

413448
// Clear the user proofs in batch as well
414-
self.user_proof_count_in_batch.lock().await.clear();
449+
batch_state.user_proof_count_in_batch.clear();
415450

416451
Some(finalized_batch)
417452
}
@@ -500,18 +535,15 @@ impl Batcher {
500535

501536
async fn flush_queue_and_clear_nonce_cache(&self) {
502537
warn!("Resetting state... Flushing queue and nonces");
538+
let mut batch_state = self.batch_state.lock().await;
503539

504-
let mut batch_queue = self.batch_queue.lock().await;
505-
let mut user_nonces = self.user_nonces.lock().await;
506-
let mut user_proof_count_in_batch = self.user_proof_count_in_batch.lock().await;
507-
508-
for (_, _, ws_sink, _) in batch_queue.iter() {
540+
for (_, _, ws_sink, _) in batch_state.batch_queue.iter() {
509541
send_message(ws_sink.clone(), ResponseMessage::BatchReset).await;
510542
}
511543

512-
batch_queue.clear();
513-
user_nonces.clear();
514-
user_proof_count_in_batch.clear();
544+
batch_state.batch_queue.clear();
545+
batch_state.user_nonces.clear();
546+
batch_state.user_proof_count_in_batch.clear();
515547
}
516548

517549
/// Receives new block numbers, checks if conditions are met for submission and
@@ -679,9 +711,9 @@ impl Batcher {
679711
}
680712

681713
let nonced_verification_data = {
682-
let mut user_nonces = self.user_nonces.lock().await;
714+
let mut batch_state = self.batch_state.lock().await;
683715

684-
let nonpaying_nonce = match user_nonces.entry(addr) {
716+
let nonpaying_nonce = match batch_state.user_nonces.entry(addr) {
685717
Entry::Occupied(o) => o.into_mut(),
686718
Entry::Vacant(vacant) => {
687719
let nonce = self

0 commit comments

Comments
 (0)