Skip to content

Commit 1318774

Browse files
feat (batcher): dont retry + gas escalator + sequential txs (#680)
Co-authored-by: Mariano A. Nicolini <mariano.nicolini.91@gmail.com>
1 parent 4755fb0 commit 1318774

3 files changed

Lines changed: 118 additions & 114 deletions

File tree

Lines changed: 24 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,93 +1,52 @@
1-
use std::iter::repeat;
21
use std::str::FromStr;
32
use std::sync::Arc;
4-
use std::time::Duration;
53

6-
use aligned_sdk::eth::batcher_payment_service::{BatcherPaymentServiceContract, SignatureData};
4+
use aligned_sdk::eth::batcher_payment_service::BatcherPaymentServiceContract;
75
use ethers::prelude::k256::ecdsa::SigningKey;
86
use ethers::prelude::*;
9-
use log::{error, info, warn};
10-
use tokio::time::sleep;
7+
use gas_escalator::{Frequency, GeometricGasPrice};
118

12-
const CREATE_NEW_TASK_MAX_RETRIES: usize = 15;
13-
const CREATE_NEW_TASK_MILLISECS_BETWEEN_RETRIES: u64 = 2000;
14-
15-
use crate::{config::ECDSAConfig, types::errors::BatcherError};
9+
use crate::config::ECDSAConfig;
1610

1711
#[derive(Debug, Clone, EthEvent)]
1812
pub struct BatchVerified {
1913
pub batch_merkle_root: [u8; 32],
2014
}
2115

22-
pub type BatcherPaymentService =
23-
BatcherPaymentServiceContract<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>;
16+
pub type BatcherPaymentService = BatcherPaymentServiceContract<
17+
SignerMiddleware<GasEscalatorMiddleware<Provider<RetryClient<Http>>>, Wallet<SigningKey>>,
18+
>;
2419

25-
pub fn get_provider(eth_rpc_url: String) -> Result<Provider<Http>, anyhow::Error> {
26-
Provider::<Http>::try_from(eth_rpc_url).map_err(|err| anyhow::anyhow!(err))
27-
}
20+
const MAX_RETRIES: u32 = 15; // Max retries for the retry client. Will only retry on network errors
21+
const INITIAL_BACKOFF: u64 = 1000; // Initial backoff for the retry client in milliseconds, will increase every retry
22+
const GAS_MULTIPLIER: f64 = 1.125; // Multiplier for the gas price for gas escalator
23+
const GAS_ESCALATOR_INTERVAL: u64 = 12; // Time in seconds between gas escalations
2824

29-
pub async fn create_new_task(
30-
payment_service: &BatcherPaymentService,
31-
batch_merkle_root: [u8; 32],
32-
batch_data_pointer: String,
33-
leaves: Vec<[u8; 32]>,
34-
signatures: Vec<SignatureData>,
35-
gas_for_aggregator: U256,
36-
gas_per_proof: U256,
37-
) -> Result<TransactionReceipt, BatcherError> {
38-
// pad leaves to next power of 2
39-
let padded_leaves = pad_leaves(leaves);
25+
pub fn get_provider(eth_rpc_url: String) -> Result<Provider<RetryClient<Http>>, anyhow::Error> {
26+
let provider = Http::from_str(eth_rpc_url.as_str())
27+
.map_err(|e| anyhow::Error::msg(format!("Failed to create provider: {}", e)))?;
4028

41-
let call = payment_service.create_new_task(
42-
batch_merkle_root,
43-
batch_data_pointer,
44-
padded_leaves,
45-
signatures,
46-
gas_for_aggregator,
47-
gas_per_proof,
29+
let client = RetryClient::new(
30+
provider,
31+
Box::<ethers::providers::HttpRateLimitRetryPolicy>::default(),
32+
MAX_RETRIES,
33+
INITIAL_BACKOFF,
4834
);
4935

50-
// If there was a pending transaction from a previously sent batch, the `call.send()` will
51-
// fail because of the nonce not being updated. We should retry sending and not returning an error
52-
// immediatly.
53-
info!("Creating task for: {:x?}", batch_merkle_root);
54-
55-
for i in 0..CREATE_NEW_TASK_MAX_RETRIES {
56-
match call.send().await {
57-
Ok(pending_tx) => match pending_tx.await {
58-
Ok(Some(receipt)) => return Ok(receipt),
59-
Ok(None) => return Err(BatcherError::ReceiptNotFoundError),
60-
Err(_) => return Err(BatcherError::TransactionSendError),
61-
},
62-
Err(error) => {
63-
if i != CREATE_NEW_TASK_MAX_RETRIES - 1 {
64-
warn!(
65-
"Error when trying to create a task: {}\n Retrying ...",
66-
error
67-
);
68-
} else {
69-
error!("Error when trying to create a task on last retry. Batch task {:x?} will be lost", batch_merkle_root);
70-
return Err(BatcherError::TaskCreationError(error.to_string()));
71-
}
72-
}
73-
};
74-
75-
sleep(Duration::from_millis(
76-
CREATE_NEW_TASK_MILLISECS_BETWEEN_RETRIES,
77-
))
78-
.await;
79-
}
80-
81-
Err(BatcherError::MaxRetriesReachedError)
36+
Ok(Provider::<RetryClient<Http>>::new(client))
8237
}
8338

8439
pub async fn get_batcher_payment_service(
85-
provider: Provider<Http>,
40+
provider: Provider<RetryClient<Http>>,
8641
ecdsa_config: ECDSAConfig,
8742
contract_address: String,
8843
) -> Result<BatcherPaymentService, anyhow::Error> {
8944
let chain_id = provider.get_chainid().await?;
9045

46+
let escalator = GeometricGasPrice::new(GAS_MULTIPLIER, GAS_ESCALATOR_INTERVAL, None::<u64>);
47+
48+
let provider = GasEscalatorMiddleware::new(provider, escalator, Frequency::PerBlock);
49+
9150
// get private key from keystore
9251
let wallet = Wallet::decrypt_keystore(
9352
&ecdsa_config.private_key_store_path,
@@ -102,12 +61,3 @@ pub async fn get_batcher_payment_service(
10261

10362
Ok(service_manager)
10463
}
105-
106-
fn pad_leaves(leaves: Vec<[u8; 32]>) -> Vec<[u8; 32]> {
107-
let leaves_len = leaves.len();
108-
let last_leaf = leaves[leaves_len - 1];
109-
leaves
110-
.into_iter()
111-
.chain(repeat(last_leaf).take(leaves_len.next_power_of_two() - leaves_len))
112-
.collect()
113-
}

batcher/aligned-batcher/src/lib.rs

Lines changed: 93 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use serde::Serialize;
99
use std::collections::hash_map::Entry;
1010
use std::collections::HashMap;
1111
use std::env;
12+
use std::iter::repeat;
1213
use std::net::SocketAddr;
1314
use std::sync::Arc;
1415

@@ -20,7 +21,7 @@ use aws_sdk_s3::client::Client as S3Client;
2021
use eth::BatcherPaymentService;
2122
use ethers::prelude::{Middleware, Provider};
2223
use ethers::providers::Ws;
23-
use ethers::types::{Address, Signature, U256};
24+
use ethers::types::{Address, Signature, TransactionReceipt, U256};
2425
use futures_util::stream::{self, SplitSink};
2526
use futures_util::{future, SinkExt, StreamExt, TryStreamExt};
2627
use lambdaworks_crypto::merkle_tree::merkle::MerkleTree;
@@ -232,36 +233,34 @@ impl Batcher {
232233

233234
let nonce = U256::from_big_endian(client_msg.verification_data.nonce.as_slice());
234235
let nonced_verification_data = client_msg.verification_data;
235-
if nonced_verification_data.verification_data.proof.len() <= self.max_proof_size {
236-
// When pre-verification is enabled, batcher will verify proofs for faster feedback with clients
237-
if self.pre_verification_is_enabled
238-
&& !zk_utils::verify(&nonced_verification_data.verification_data)
239-
{
240-
error!("Invalid proof detected. Verification failed.");
241-
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidProof)
242-
.await;
243-
return Ok(()); // Send error message to the client and return
244-
}
245-
246-
// Doing nonce verification after proof verification to avoid unnecessary nonce increment
247-
if !self.check_nonce_and_increment(addr, nonce).await {
248-
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce)
249-
.await;
250-
return Ok(()); // Send error message to the client and return
251-
}
252-
253-
self.add_to_batch(
254-
nonced_verification_data,
255-
ws_conn_sink.clone(),
256-
client_msg.signature,
257-
)
258-
.await;
259-
} else {
260-
error!("Proof is too large");
236+
if nonced_verification_data.verification_data.proof.len() > self.max_proof_size {
237+
error!("Proof size exceeds the maximum allowed size.");
261238
send_message(ws_conn_sink.clone(), ValidityResponseMessage::ProofTooLarge)
262239
.await;
240+
return Ok(());
241+
}
242+
243+
// When pre-verification is enabled, batcher will verify proofs for faster feedback with clients
244+
if self.pre_verification_is_enabled
245+
&& !zk_utils::verify(&nonced_verification_data.verification_data)
246+
{
247+
error!("Invalid proof detected. Verification failed.");
248+
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidProof).await;
263249
return Ok(()); // Send error message to the client and return
264-
};
250+
}
251+
252+
// Doing nonce verification after proof verification to avoid unnecessary nonce increment
253+
if !self.check_nonce_and_increment(addr, nonce).await {
254+
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
255+
return Ok(()); // Send error message to the client and return
256+
}
257+
258+
self.add_to_batch(
259+
nonced_verification_data,
260+
ws_conn_sink.clone(),
261+
client_msg.signature,
262+
)
263+
.await;
265264

266265
info!("Verification data message handled");
267266

@@ -551,7 +550,6 @@ impl Batcher {
551550
info!("Batch sent to S3 with name: {}", file_name);
552551

553552
info!("Uploading batch to contract");
554-
let payment_service = &self.payment_service;
555553
let batch_data_pointer = "https://".to_owned() + &self.s3_bucket_name + "/" + &file_name;
556554

557555
let num_proofs_in_batch = leaves.len();
@@ -566,19 +564,75 @@ impl Batcher {
566564
.map(|(i, signature)| SignatureData::new(signature, nonces[i]))
567565
.collect();
568566

569-
eth::create_new_task(
570-
payment_service,
571-
*batch_merkle_root,
567+
match self
568+
.create_new_task(
569+
*batch_merkle_root,
570+
batch_data_pointer,
571+
leaves,
572+
signatures,
573+
AGGREGATOR_COST.into(),
574+
gas_per_proof.into(),
575+
)
576+
.await
577+
{
578+
Ok(_) => {
579+
info!("Batch verification task created on Aligned contract");
580+
Ok(())
581+
}
582+
Err(e) => {
583+
error!(
584+
"Failed to send batch to contract, batch will be lost: {:?}",
585+
e
586+
);
587+
588+
Err(e)
589+
}
590+
}
591+
}
592+
593+
async fn create_new_task(
594+
&self,
595+
batch_merkle_root: [u8; 32],
596+
batch_data_pointer: String,
597+
leaves: Vec<[u8; 32]>,
598+
signatures: Vec<SignatureData>,
599+
gas_for_aggregator: U256,
600+
gas_per_proof: U256,
601+
) -> Result<TransactionReceipt, BatcherError> {
602+
// pad leaves to next power of 2
603+
let padded_leaves = Self::pad_leaves(leaves);
604+
605+
let call = self.payment_service.create_new_task(
606+
batch_merkle_root,
572607
batch_data_pointer,
573-
leaves,
608+
padded_leaves,
574609
signatures,
575-
AGGREGATOR_COST.into(), // FIXME(uri): This value should be read from aligned_layer/contracts/script/deploy/config/devnet/batcher-payment-service.devnet.config.json
576-
gas_per_proof.into(), //FIXME(uri): This value should be read from aligned_layer/contracts/script/deploy/config/devnet/batcher-payment-service.devnet.config.json
577-
)
578-
.await?;
610+
gas_for_aggregator,
611+
gas_per_proof,
612+
);
579613

580-
info!("Batch verification task created on Aligned contract");
581-
Ok(())
614+
info!("Creating task for: {}", hex::encode(batch_merkle_root));
615+
616+
let pending_tx = call
617+
.send()
618+
.await
619+
.map_err(|e| BatcherError::TaskCreationError(e.to_string()))?;
620+
621+
let receipt = pending_tx
622+
.await
623+
.map_err(|_| BatcherError::TransactionSendError)?
624+
.ok_or(BatcherError::ReceiptNotFoundError)?;
625+
626+
Ok(receipt)
627+
}
628+
629+
fn pad_leaves(leaves: Vec<[u8; 32]>) -> Vec<[u8; 32]> {
630+
let leaves_len = leaves.len();
631+
let last_leaf = leaves[leaves_len - 1];
632+
leaves
633+
.into_iter()
634+
.chain(repeat(last_leaf).take(leaves_len.next_power_of_two() - leaves_len))
635+
.collect()
582636
}
583637

584638
/// Only relevant for testing and for users to easily use Aligned

contracts/scripts/anvil/state/alignedlayer-deployed-anvil-state.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)