Skip to content

Commit 9bae879

Browse files
committed
Merge branch 'staging' into refactor/rust-crates
2 parents 7ce1641 + c94d77c commit 9bae879

8 files changed

Lines changed: 88 additions & 113 deletions

File tree

aggregation_mode/src/backend/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub struct Config {
2222
pub last_aggregated_block_filepath: String,
2323
pub ecdsa: ECDSAConfig,
2424
pub proofs_per_chunk: u16,
25+
pub total_proofs_limit: u16,
2526
}
2627

2728
impl Config {

aggregation_mode/src/backend/fetcher.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ impl ProofsFetcher {
5555
pub async fn fetch(
5656
&mut self,
5757
engine: ZKVMEngine,
58+
limit: u16,
5859
) -> Result<Vec<AlignedProof>, ProofsFetcherError> {
5960
// Get current block
6061
let current_block = self
@@ -86,12 +87,9 @@ impl ProofsFetcher {
8687

8788
info!("Logs collected {}", logs.len());
8889

89-
// Update last processed block after collecting logs
90-
self.last_aggregated_block = current_block;
91-
9290
let mut proofs = vec![];
9391

94-
for (batch, _) in logs {
92+
for (batch, log) in logs {
9593
info!(
9694
"New batch submitted, about to process. Batch merkle root {}...",
9795
batch.batchMerkleRoot
@@ -153,6 +151,18 @@ impl ProofsFetcher {
153151
proofs_to_add.len()
154152
);
155153

154+
if (proofs.len() + proofs_to_add.len()) > (limit as usize) {
155+
let log_block_number = log.block_number.unwrap();
156+
info!(
157+
"Limit of {} proofs reached, stopping at block number {}, which is {} from current block",
158+
limit, log_block_number, current_block - log_block_number
159+
);
160+
// Update last processed block to this log block number
161+
// So the next aggregation starts at this block
162+
self.last_aggregated_block = log_block_number;
163+
return Ok(proofs);
164+
}
165+
156166
// try to add them to the queue
157167
for proof in proofs_to_add {
158168
if let Err(err) = proof.verify() {
@@ -164,6 +174,9 @@ impl ProofsFetcher {
164174
}
165175
}
166176

177+
// Update last processed block after collecting logs
178+
self.last_aggregated_block = current_block;
179+
167180
Ok(proofs)
168181
}
169182

aggregation_mode/src/backend/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl ProofAggregator {
9797
) -> Result<(), AggregatedProofSubmissionError> {
9898
let proofs = self
9999
.fetcher
100-
.fetch(self.engine.clone())
100+
.fetch(self.engine.clone(), self.config.total_proofs_limit)
101101
.await
102102
.map_err(AggregatedProofSubmissionError::FetchingProofs)?;
103103

config-files/config-proof-aggregator-ethereum-package.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@ eth_ws_url: "ws://localhost:8546"
55
max_proofs_in_queue: 1000
66
last_aggregated_block_filepath: config-files/proof-aggregator.last_aggregated_block.json
77
proofs_per_chunk: 512 # Amount of proofs to process per chunk
8+
# This number comes from the blob data limit
9+
# Since each blob has a capacity of (4096 * 32) = 131.072 bytes
10+
# But to not surpass the field modulus we pad with a 0xo byte so we have (4096 * 31) = 126.976 bytes
11+
# of usable data
12+
# Since each proof commitments takes 32 bytes hash
13+
# We can aggregate as much proofs as 126.976 / 32 = 3968 per blob
14+
total_proofs_limit: 3968
815

916
ecdsa:
1017
private_key_store_path: "config-files/anvil.proof-aggregator.ecdsa.key.json"

config-files/config-proof-aggregator-mock-ethereum-package.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ eth_ws_url: "ws://localhost:8546"
55
max_proofs_in_queue: 1000
66
last_aggregated_block_filepath: config-files/proof-aggregator.last_aggregated_block.json
77
proofs_per_chunk: 512 # Amount of proofs to process per chunk
8+
# This number comes from the blob data limit
9+
# Since each blob has a capacity of (4096 * 32) = 131.072 bytes
10+
# But to not surpass the field modulus we pad with a 0xo byte so we have (4096 * 31) = 126.976 bytes
11+
# of usable data
12+
# Since each proof commitments takes 32 bytes hash
13+
# We can aggregate as much proofs as 126.976 / 32 = 3968 per blob
14+
total_proofs_limit: 3968
15+
816

917
ecdsa:
1018
private_key_store_path: "config-files/anvil.proof-aggregator.ecdsa.key.json"

config-files/config-proof-aggregator-mock.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ eth_ws_url: "ws://localhost:8545"
55
max_proofs_in_queue: 1000
66
last_aggregated_block_filepath: config-files/proof-aggregator.last_aggregated_block.json
77
proofs_per_chunk: 512 # Amount of proofs to process per chunk
8+
# This number comes from the blob data limit
9+
# Since each blob has a capacity of (4096 * 32) = 131.072 bytes
10+
# But to not surpass the field modulus we pad with a 0xo byte so we have (4096 * 31) = 126.976 bytes
11+
# of usable data
12+
# Since each proof commitments takes 32 bytes hash
13+
# We can aggregate as much proofs as 126.976 / 32 = 3968 per blob
14+
total_proofs_limit: 3968
15+
816

917
ecdsa:
1018
private_key_store_path: "config-files/anvil.proof-aggregator.ecdsa.key.json"

config-files/config-proof-aggregator.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ eth_ws_url: "ws://localhost:8545"
55
max_proofs_in_queue: 1000
66
last_aggregated_block_filepath: config-files/proof-aggregator.last_aggregated_block.json
77
proofs_per_chunk: 512 # Amount of proofs to process per chunk
8+
# This number comes from the blob data limit
9+
# Since each blob has a capacity of (4096 * 32) = 131.072 bytes
10+
# But to not surpass the field modulus we pad with a 0xo byte so we have (4096 * 31) = 126.976 bytes
11+
# of usable data
12+
# Since each proof commitments takes 32 bytes hash
13+
# We can aggregate as much proofs as 126.976 / 32 = 3968 per blob
14+
total_proofs_limit: 3968
15+
816

917
ecdsa:
1018
private_key_store_path: "config-files/anvil.proof-aggregator.ecdsa.key.json"

crates/batcher/src/lib.rs

Lines changed: 38 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,8 @@ impl Batcher {
499499
mut address: Address,
500500
ws_conn_sink: WsMessageSink,
501501
) -> Result<(), Error> {
502-
if self.is_nonpaying(&address) {
502+
// If the address is not paying, we will return the nonce of the aligned_payment_address
503+
if !self.has_to_pay(&address) {
503504
info!("Handling nonpaying message");
504505
let Some(non_paying_config) = self.non_paying_config.as_ref() else {
505506
warn!(
@@ -551,6 +552,15 @@ impl Batcher {
551552
Ok(())
552553
}
553554

555+
/// Returns the Aligned-funded address that will be used to pay for proofs when users don't need to pay themselves.
556+
/// This function assumes that the non-paying configuration is set.
557+
fn aligned_payment_address(&self) -> Address {
558+
self.non_paying_config
559+
.as_ref()
560+
.map(|config| config.replacement.address())
561+
.unwrap()
562+
}
563+
554564
async fn handle_submit_proof_msg(
555565
self: Arc<Self>,
556566
client_msg: Box<SubmitProofMessage>,
@@ -585,14 +595,29 @@ impl Batcher {
585595
return Ok(());
586596
}
587597

588-
let Some(addr) = self
598+
let Some(addr_in_msg) = self
589599
.msg_signature_is_valid(&client_msg, &ws_conn_sink)
590600
.await
591601
else {
592602
return Ok(());
593603
};
594604

595-
let nonced_verification_data = client_msg.verification_data.clone();
605+
let addr;
606+
let signature = client_msg.signature;
607+
let nonced_verification_data;
608+
609+
if self.has_to_pay(&addr_in_msg) {
610+
addr = addr_in_msg;
611+
nonced_verification_data = client_msg.verification_data.clone();
612+
} else {
613+
info!("Generating non-paying data");
614+
// If the user is not required to pay, substitute their address with a pre-funded Aligned address
615+
addr = self.aligned_payment_address();
616+
// Substitute the max_fee to a high enough value to cover the gas cost of the proof
617+
let mut aux_verification_data = client_msg.verification_data.clone();
618+
aux_verification_data.max_fee = (DEFAULT_MAX_FEE_PER_PROOF * 100).into(); // 2_000 gas per proof * 100 gwei gas price (upper bound) * 100 to make sure it is enough
619+
nonced_verification_data = aux_verification_data
620+
}
596621

597622
// When pre-verification is enabled, batcher will verify proofs for faster feedback with clients
598623
if self.pre_verification_is_enabled {
@@ -634,14 +659,7 @@ impl Batcher {
634659
}
635660
}
636661

637-
if self.is_nonpaying(&addr) {
638-
// TODO: Non paying msg and paying should share some logic
639-
return self
640-
.handle_nonpaying_msg(ws_conn_sink.clone(), &client_msg)
641-
.await;
642-
}
643-
644-
info!("Handling paying message");
662+
info!("Handling message");
645663

646664
// We don't need a batch state lock here, since if the user locks its funds
647665
// after the check, some blocks should pass until he can withdraw.
@@ -743,6 +761,7 @@ impl Batcher {
743761
}
744762

745763
let cached_user_nonce = batch_state_lock.get_user_nonce(&addr).await;
764+
746765
let Some(expected_nonce) = cached_user_nonce else {
747766
error!("Failed to get cached user nonce: User not found in user states, but it should have been already inserted");
748767
std::mem::drop(batch_state_lock);
@@ -867,7 +886,7 @@ impl Batcher {
867886
batch_state_lock,
868887
nonced_verification_data,
869888
ws_conn_sink.clone(),
870-
client_msg.signature,
889+
signature,
871890
addr,
872891
)
873892
.await
@@ -1100,17 +1119,6 @@ impl Batcher {
11001119

11011120
info!("Current batch queue length: {}", queue_len);
11021121

1103-
let mut proof_submitter_addr = proof_submitter_addr;
1104-
1105-
// If the proof submitter is the nonpaying one, we should update the state
1106-
// of the replacement address.
1107-
proof_submitter_addr = if self.is_nonpaying(&proof_submitter_addr) {
1108-
self.get_nonpaying_replacement_addr()
1109-
.unwrap_or(proof_submitter_addr)
1110-
} else {
1111-
proof_submitter_addr
1112-
};
1113-
11141122
let Some(user_proof_count) = batch_state_lock
11151123
.get_user_proof_count(&proof_submitter_addr)
11161124
.await
@@ -1743,98 +1751,20 @@ impl Batcher {
17431751
0.0
17441752
}
17451753

1746-
/// Only relevant for testing and for users to easily use Aligned
1747-
fn is_nonpaying(&self, addr: &Address) -> bool {
1748-
self.non_paying_config
1749-
.as_ref()
1750-
.is_some_and(|non_paying_config| non_paying_config.address == *addr)
1754+
/// An address has to pay if it's on mainnet or is not the special designated address on testnet
1755+
fn has_to_pay(&self, addr: &Address) -> bool {
1756+
self.non_paying_config.is_none()
1757+
|| self
1758+
.non_paying_config
1759+
.as_ref()
1760+
.is_some_and(|non_paying_config| non_paying_config.address != *addr)
17511761
}
17521762

17531763
fn get_nonpaying_replacement_addr(&self) -> Option<Address> {
17541764
let non_paying_conf = self.non_paying_config.as_ref()?;
17551765
Some(non_paying_conf.replacement.address())
17561766
}
17571767

1758-
/// Only relevant for testing and for users to easily use Aligned in testnet.
1759-
async fn handle_nonpaying_msg(
1760-
&self,
1761-
ws_sink: WsMessageSink,
1762-
client_msg: &SubmitProofMessage,
1763-
) -> Result<(), Error> {
1764-
info!("Handling nonpaying message");
1765-
let Some(non_paying_config) = self.non_paying_config.as_ref() else {
1766-
warn!("There isn't a non-paying configuration loaded. This message will be ignored");
1767-
send_message(ws_sink.clone(), SubmitProofResponseMessage::InvalidNonce).await;
1768-
return Ok(());
1769-
};
1770-
1771-
let replacement_addr = non_paying_config.replacement.address();
1772-
let Some(replacement_user_balance) = self.get_user_balance(&replacement_addr).await else {
1773-
error!("Could not get balance for non-paying address {replacement_addr:?}");
1774-
send_message(
1775-
ws_sink.clone(),
1776-
SubmitProofResponseMessage::InsufficientBalance(replacement_addr),
1777-
)
1778-
.await;
1779-
return Ok(());
1780-
};
1781-
1782-
if replacement_user_balance == U256::from(0) {
1783-
error!("Insufficient funds for non-paying address {replacement_addr:?}");
1784-
send_message(
1785-
ws_sink.clone(),
1786-
SubmitProofResponseMessage::InsufficientBalance(replacement_addr),
1787-
)
1788-
.await;
1789-
return Ok(());
1790-
}
1791-
1792-
let batch_state_lock = self.batch_state.lock().await;
1793-
1794-
if batch_state_lock.is_queue_full() {
1795-
error!("Can't add new entry, the batcher queue is full");
1796-
send_message(
1797-
ws_sink.clone(),
1798-
SubmitProofResponseMessage::UnderpricedProof,
1799-
)
1800-
.await;
1801-
return Ok(());
1802-
}
1803-
1804-
let nonced_verification_data = NoncedVerificationData::new(
1805-
client_msg.verification_data.verification_data.clone(),
1806-
client_msg.verification_data.nonce,
1807-
DEFAULT_MAX_FEE_PER_PROOF.into(), // 2_000 gas per proof * 100 gwei gas price (upper bound)
1808-
self.chain_id,
1809-
self.payment_service.address(),
1810-
);
1811-
1812-
let client_msg = SubmitProofMessage::new(
1813-
nonced_verification_data.clone(),
1814-
non_paying_config.replacement.clone(),
1815-
)
1816-
.await;
1817-
1818-
let signature = client_msg.signature;
1819-
if let Err(e) = self
1820-
.add_to_batch(
1821-
batch_state_lock,
1822-
nonced_verification_data,
1823-
ws_sink.clone(),
1824-
signature,
1825-
replacement_addr,
1826-
)
1827-
.await
1828-
{
1829-
info!("Error while adding nonpaying address entry to batch: {e:?}");
1830-
send_message(ws_sink, SubmitProofResponseMessage::AddToBatchError).await;
1831-
return Ok(());
1832-
};
1833-
1834-
info!("Non-paying verification data message handled");
1835-
Ok(())
1836-
}
1837-
18381768
/// Gets the balance of user with address `addr` from Ethereum.
18391769
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
18401770
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs)

0 commit comments

Comments
 (0)