Skip to content

Commit 50b225b

Browse files
committed
Add remove from q logic
1 parent 0bb312c commit 50b225b

2 files changed

Lines changed: 47 additions & 36 deletions

File tree

batcher/aligned-batcher/src/lib.rs

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use tokio::time::{timeout, Instant};
1616
use types::batch_state::BatchState;
1717
use types::user_state::UserState;
1818

19-
use batch_queue::{calculate_batch_size, try_push_to_queue};
19+
use batch_queue::{calculate_batch_size, get_lowest_priority_entry};
2020
use std::collections::HashMap;
2121
use std::env;
2222
use std::net::SocketAddr;
@@ -1049,19 +1049,36 @@ impl Batcher {
10491049

10501050
let max_fee = verification_data.max_fee;
10511051
let nonce = verification_data.nonce;
1052-
try_push_to_queue(
1053-
&mut batch_state_lock.batch_queue,
1054-
BatchQueueEntry::new(
1055-
verification_data,
1056-
verification_data_comm,
1057-
ws_conn_sink,
1058-
proof_submitter_sig,
1059-
proof_submitter_addr,
1060-
),
1061-
BatchQueueEntryPriority::new(max_fee, nonce),
1062-
self.max_batch_byte_size,
1063-
self.max_batch_proof_qty,
1064-
)?;
1052+
1053+
let batch_queue_len = batch_state_lock.batch_queue.len();
1054+
1055+
let new_entry = BatchQueueEntry::new(
1056+
verification_data,
1057+
verification_data_comm,
1058+
ws_conn_sink,
1059+
proof_submitter_sig,
1060+
proof_submitter_addr,
1061+
);
1062+
let new_entry_priority = BatchQueueEntryPriority::new(max_fee, nonce) ;
1063+
1064+
if batch_queue_len + 1 > self.max_batch_proof_qty {
1065+
let (lowest_priority_entry, lowest_priority_entry_priority) =
1066+
get_lowest_priority_entry(&batch_state_lock.batch_queue).unwrap();
1067+
1068+
if lowest_priority_entry_priority < new_entry_priority {
1069+
if batch_state_lock.batch_queue.remove(&lowest_priority_entry).is_none(){
1070+
//err
1071+
}
1072+
1073+
// todo send msg to the removed entry ws
1074+
1075+
batch_state_lock.batch_queue.push(new_entry, new_entry_priority);
1076+
} else {
1077+
// can't accept the new proof
1078+
}
1079+
} else {
1080+
batch_state_lock.batch_queue.push(new_entry, new_entry_priority );
1081+
}
10651082

10661083
// Update metrics
10671084
let queue_len = batch_state_lock.batch_queue.len();

batcher/aligned-batcher/src/types/batch_queue.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -212,29 +212,23 @@ fn calculate_fee_per_proof(batch_len: usize, gas_price: U256, constant_gas_cost:
212212
U256::from(gas_per_proof) * gas_price
213213
}
214214

215-
pub(crate) fn try_push_to_queue(
216-
batch_queue: &mut BatchQueue,
217-
item: BatchQueueEntry,
218-
priority: BatchQueueEntryPriority,
219-
max_batch_byte_size: usize,
220-
max_batch_proof_qty: usize,
221-
) -> Result<(), BatcherError> {
222-
let queue_len = batch_queue.len();
223-
let queue_size_bytes = calculate_batch_size(&batch_queue)?;
224-
225-
let verification_data_bytes =
226-
cbor_serialize(&item.nonced_verification_data.verification_data)
227-
.map_err(|e|{BatcherError::SerializationError(e.to_string())})?;
228-
229-
if queue_len + 1 > max_batch_proof_qty ||
230-
queue_size_bytes + verification_data_bytes.len() + CBOR_ARRAY_MAX_OVERHEAD > max_batch_byte_size
231-
{
232-
// do something
233-
} else {
234-
batch_queue.push(item, priority);
215+
pub(crate) fn get_lowest_priority_entry(batch_queue: &BatchQueue) -> Option<(BatchQueueEntry, BatchQueueEntryPriority)> {
216+
let mut lowest_fee_entry: Option<(BatchQueueEntry, BatchQueueEntryPriority)> = None;
217+
218+
for (entry, priority) in batch_queue {
219+
match &lowest_fee_entry {
220+
Some((e, p)) => {
221+
if *priority < *p {
222+
lowest_fee_entry = Some((e.clone(), p.clone()));
223+
}
224+
}
225+
None => {
226+
lowest_fee_entry = Some((entry.clone(), priority.clone()));
227+
}
228+
}
235229
}
236-
237-
Ok(())
230+
231+
lowest_fee_entry
238232
}
239233

240234
#[cfg(test)]

0 commit comments

Comments
 (0)