Skip to content

Commit 5b16aa9

Browse files
committed
Merge branch 'refs/heads/staging' into 1864-infraaggregation_mode-add-server-setup-and-config-files
2 parents 3ef8f4c + 0d8b99b commit 5b16aa9

67 files changed

Lines changed: 9436 additions & 451 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ batcher/aligned/batch_inclusion_responses/*
1212
**/broadcast
1313
volume
1414
config-files/*.last_processed_batch.json
15+
config-files/*.last_aggregated_block.json
1516

1617
nonce_*.bin
1718

Makefile

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,10 @@ lint_contracts:
143143
@cd contracts && npm run lint:sol
144144

145145
anvil_start:
146-
@echo "Starting Anvil..."
147-
anvil --load-state contracts/scripts/anvil/state/alignedlayer-deployed-anvil-state.json
148-
149-
anvil_start_with_block_time:
150146
@echo "Starting Anvil..."
151147
anvil --load-state contracts/scripts/anvil/state/alignedlayer-deployed-anvil-state.json --block-time 7
152148

153-
anvil_start_with_block_time_with_more_prefunded_accounts:
149+
anvil_start_with_more_prefunded_accounts:
154150
@echo "Starting Anvil..."
155151
anvil --load-state contracts/scripts/anvil/state/alignedlayer-deployed-anvil-state.json --block-time 7 -a 2000
156152

@@ -1171,7 +1167,7 @@ setup_local_aligned_all:
11711167
tmux new-session -d -s aligned_layer
11721168

11731169
tmux new-window -t aligned_layer -n anvil
1174-
tmux send-keys -t aligned_layer 'make anvil_start_with_block_time' C-m
1170+
tmux send-keys -t aligned_layer 'make anvil_start' C-m
11751171

11761172
tmux new-window -t aligned_layer -n aggregator
11771173
tmux send-keys -t aligned_layer:aggregator 'make aggregator_start' C-m

aggregation_mode/aggregation_programs/sp1/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,37 @@ use serde::{Deserialize, Serialize};
22
use sha3::{Digest, Keccak256};
33

44
#[derive(Serialize, Deserialize)]
5-
pub struct SP1ProofInput {
5+
pub struct SP1VkAndPubInputs {
66
pub vk: [u32; 8],
77
pub public_inputs: Vec<u8>,
88
}
99

10-
impl SP1ProofInput {
10+
impl SP1VkAndPubInputs {
1111
pub fn hash(&self) -> [u8; 32] {
1212
let mut hasher = Keccak256::new();
1313
for &word in &self.vk {
14-
hasher.update(word.to_le_bytes());
14+
hasher.update(word.to_be_bytes());
1515
}
1616
hasher.update(&self.public_inputs);
1717
hasher.finalize().into()
1818
}
1919
}
2020

2121
#[derive(Serialize, Deserialize)]
22-
pub enum ProofInput {
23-
SP1Compressed(SP1ProofInput),
22+
pub enum ProofVkAndPubInputs {
23+
SP1Compressed(SP1VkAndPubInputs),
2424
}
2525

26-
impl ProofInput {
26+
impl ProofVkAndPubInputs {
2727
pub fn hash(&self) -> [u8; 32] {
2828
match self {
29-
ProofInput::SP1Compressed(proof) => proof.hash(),
29+
ProofVkAndPubInputs::SP1Compressed(proof_data) => proof_data.hash(),
3030
}
3131
}
3232
}
3333

3434
#[derive(Serialize, Deserialize)]
3535
pub struct Input {
36-
pub proofs: Vec<ProofInput>,
36+
pub proofs_vk_and_pub_inputs: Vec<ProofVkAndPubInputs>,
3737
pub merkle_root: [u8; 32],
3838
}

aggregation_mode/aggregation_programs/sp1/src/main.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ sp1_zkvm::entrypoint!(main);
33

44
use sha2::{Digest, Sha256};
55
use sha3::Keccak256;
6-
use sp1_aggregation_program::{Input, ProofInput};
6+
use sp1_aggregation_program::{Input, ProofVkAndPubInputs};
77

88
fn combine_hashes(hash_a: &[u8; 32], hash_b: &[u8; 32]) -> [u8; 32] {
99
let mut hasher = Keccak256::new();
@@ -13,7 +13,7 @@ fn combine_hashes(hash_a: &[u8; 32], hash_b: &[u8; 32]) -> [u8; 32] {
1313
}
1414

1515
/// Computes the merkle root for the given proofs using the vk
16-
fn compute_merkle_root(proofs: &[ProofInput]) -> [u8; 32] {
16+
fn compute_merkle_root(proofs: &[ProofVkAndPubInputs]) -> [u8; 32] {
1717
let mut leaves: Vec<[u8; 32]> = proofs
1818
.chunks(2)
1919
.map(|chunk| match chunk {
@@ -41,9 +41,9 @@ pub fn main() {
4141
let input = sp1_zkvm::io::read::<Input>();
4242

4343
// Verify the proofs.
44-
for proof in input.proofs.iter() {
44+
for proof in input.proofs_vk_and_pub_inputs.iter() {
4545
match proof {
46-
ProofInput::SP1Compressed(proof) => {
46+
ProofVkAndPubInputs::SP1Compressed(proof) => {
4747
let vkey = proof.vk;
4848
let public_values = &proof.public_inputs;
4949
let public_values_digest = Sha256::digest(public_values);
@@ -52,7 +52,7 @@ pub fn main() {
5252
}
5353
}
5454

55-
let merkle_root = compute_merkle_root(&input.proofs);
55+
let merkle_root = compute_merkle_root(&input.proofs_vk_and_pub_inputs);
5656

5757
assert_eq!(merkle_root, input.merkle_root);
5858

aggregation_mode/src/aggregators/sp1_aggregator.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
1+
use std::sync::LazyLock;
2+
13
use alloy::primitives::Keccak256;
2-
use sp1_aggregation_program::{ProofInput, SP1ProofInput};
4+
use sp1_aggregation_program::{ProofVkAndPubInputs, SP1VkAndPubInputs};
35
use sp1_sdk::{
4-
HashableKey, Prover, ProverClient, SP1ProofWithPublicValues, SP1Stdin, SP1VerifyingKey,
6+
EnvProver, HashableKey, Prover, ProverClient, SP1ProofWithPublicValues, SP1Stdin,
7+
SP1VerifyingKey,
58
};
69

710
use super::lib::{AggregatedProof, ProgramOutput, ProofAggregationError};
811

912
const PROGRAM_ELF: &[u8] =
1013
include_bytes!("../../aggregation_programs/sp1/elf/sp1_aggregator_program");
1114

15+
static SP1_PROVER_CLIENT: LazyLock<EnvProver> = LazyLock::new(ProverClient::from_env);
16+
1217
pub struct SP1ProofWithPubValuesAndElf {
1318
pub proof_with_pub_values: SP1ProofWithPublicValues,
1419
pub elf: Vec<u8>,
@@ -17,9 +22,8 @@ pub struct SP1ProofWithPubValuesAndElf {
1722
impl SP1ProofWithPubValuesAndElf {
1823
pub fn hash_vk_and_pub_inputs(&self) -> [u8; 32] {
1924
let mut hasher = Keccak256::new();
20-
for &word in &self.vk().hash_u32() {
21-
hasher.update(word.to_le_bytes());
22-
}
25+
let vk_bytes = &self.vk().hash_bytes();
26+
hasher.update(vk_bytes);
2327
hasher.update(self.proof_with_pub_values.public_values.as_slice());
2428
hasher.finalize().into()
2529
}
@@ -40,15 +44,15 @@ pub(crate) fn aggregate_proofs(
4044
let mut stdin = SP1Stdin::new();
4145

4246
let mut program_input = sp1_aggregation_program::Input {
43-
proofs: vec![],
47+
proofs_vk_and_pub_inputs: vec![],
4448
merkle_root: input.merkle_root,
4549
};
4650

4751
// write vk + public inputs
4852
for proof in input.proofs.iter() {
4953
program_input
50-
.proofs
51-
.push(ProofInput::SP1Compressed(SP1ProofInput {
54+
.proofs_vk_and_pub_inputs
55+
.push(ProofVkAndPubInputs::SP1Compressed(SP1VkAndPubInputs {
5256
public_inputs: proof.proof_with_pub_values.public_values.to_vec(),
5357
vk: proof.vk().hash_u32(),
5458
}));
@@ -66,7 +70,7 @@ pub(crate) fn aggregate_proofs(
6670
}
6771

6872
#[cfg(feature = "prove")]
69-
let client = ProverClient::from_env();
73+
let client = &*SP1_PROVER_CLIENT;
7074
// If not in prove mode, create a mock proof via mock client
7175
#[cfg(not(feature = "prove"))]
7276
let client = ProverClient::builder().mock().build();
@@ -102,7 +106,7 @@ pub enum AlignedSP1VerificationError {
102106
pub(crate) fn verify(
103107
sp1_proof_with_pub_values_and_elf: &SP1ProofWithPubValuesAndElf,
104108
) -> Result<(), AlignedSP1VerificationError> {
105-
let client = ProverClient::from_env();
109+
let client = &*SP1_PROVER_CLIENT;
106110

107111
let (_pk, vk) = client.setup(&sp1_proof_with_pub_values_and_elf.elf);
108112

@@ -122,7 +126,7 @@ pub(crate) fn verify(
122126
}
123127

124128
pub fn vk_from_elf(elf: &[u8]) -> SP1VerifyingKey {
125-
let prover = ProverClient::builder().cpu().build();
129+
let prover = &*SP1_PROVER_CLIENT;
126130
let (_, vk) = prover.setup(elf);
127131
vk
128132
}
Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
1-
use serde::Deserialize;
2-
use std::{fs::File, io::Read};
1+
use serde::{Deserialize, Serialize};
2+
use std::{fs::File, fs::OpenOptions, io::Read, io::Write};
33

4-
#[derive(Debug, Deserialize)]
4+
#[derive(Debug, Deserialize, Serialize)]
55
pub struct ECDSAConfig {
66
pub private_key_store_path: String,
77
pub private_key_store_password: String,
88
}
99

10-
#[derive(Debug, Deserialize)]
10+
#[derive(Debug, Deserialize, Serialize)]
11+
pub struct LastAggregatedBlock {
12+
pub last_aggregated_block: u64,
13+
}
14+
15+
#[derive(Debug, Deserialize, Serialize)]
1116
pub struct Config {
1217
pub eth_rpc_url: String,
1318
pub eth_ws_url: String,
1419
pub max_proofs_in_queue: u16,
1520
pub proof_aggregation_service_address: String,
1621
pub aligned_service_manager_address: String,
22+
pub last_aggregated_block_filepath: String,
1723
pub ecdsa: ECDSAConfig,
18-
pub fetch_logs_from_secs_ago: u64,
19-
pub block_time_secs: u64,
2024
}
2125

2226
impl Config {
@@ -27,4 +31,32 @@ impl Config {
2731
let config: Config = serde_yaml::from_str(&contents)?;
2832
Ok(config)
2933
}
34+
35+
pub fn get_last_aggregated_block(&self) -> Result<u64, Box<dyn std::error::Error>> {
36+
let mut file = File::open(&self.last_aggregated_block_filepath)?;
37+
let mut contents = String::new();
38+
file.read_to_string(&mut contents)?;
39+
let lab: LastAggregatedBlock = serde_json::from_str(&contents)?;
40+
Ok(lab.last_aggregated_block)
41+
}
42+
43+
pub fn update_last_aggregated_block(
44+
&self,
45+
last_aggregated_block: u64,
46+
) -> Result<(), Box<dyn std::error::Error>> {
47+
let last_aggregated_block_struct = LastAggregatedBlock {
48+
last_aggregated_block,
49+
};
50+
51+
let mut file = OpenOptions::new()
52+
.write(true)
53+
.truncate(true)
54+
.create(true)
55+
.open(&self.last_aggregated_block_filepath)?;
56+
57+
let content = serde_json::to_string(&last_aggregated_block_struct)?;
58+
file.write_all(content.as_bytes())?;
59+
60+
Ok(())
61+
}
3062
}

aggregation_mode/src/backend/fetcher.rs

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@ use tracing::{error, info};
1717

1818
#[derive(Debug)]
1919
pub enum ProofsFetcherError {
20-
QueryingLogs,
21-
BlockNumber,
20+
GetLogs(String),
21+
GetBlockNumber(String),
2222
}
2323

2424
pub struct ProofsFetcher {
2525
rpc_provider: RPCProvider,
2626
aligned_service_manager: AlignedLayerServiceManagerContract,
27-
fetch_from_secs_ago: u64,
28-
block_time_secs: u64,
27+
last_aggregated_block: u64,
2928
}
3029

3130
impl ProofsFetcher {
@@ -38,31 +37,49 @@ impl ProofsFetcher {
3837
rpc_provider.clone(),
3938
);
4039

40+
let last_aggregated_block = config.get_last_aggregated_block().unwrap();
41+
4142
Self {
4243
rpc_provider,
4344
aligned_service_manager,
44-
fetch_from_secs_ago: config.fetch_logs_from_secs_ago,
45-
block_time_secs: config.block_time_secs,
45+
last_aggregated_block,
4646
}
4747
}
4848

49-
pub async fn fetch(&self) -> Result<Vec<AlignedProof>, ProofsFetcherError> {
50-
let from_block = self.get_block_number_to_fetch_from().await?;
49+
pub async fn fetch(&mut self) -> Result<Vec<AlignedProof>, ProofsFetcherError> {
50+
// Get current block
51+
let current_block = self
52+
.rpc_provider
53+
.get_block_number()
54+
.await
55+
.map_err(|e| ProofsFetcherError::GetBlockNumber(e.to_string()))?;
56+
57+
if current_block < self.last_aggregated_block {
58+
return Err(ProofsFetcherError::GetBlockNumber(
59+
"Invalid last processed block".to_string(),
60+
));
61+
}
62+
5163
info!(
52-
"Fetching proofs from batch logs starting from block number {}",
53-
from_block
64+
"Fetching proofs from batch logs starting from block number {} upto {}",
65+
self.last_aggregated_block, current_block
5466
);
67+
5568
// Subscribe to NewBatch event from AlignedServiceManager
5669
let logs = self
5770
.aligned_service_manager
5871
.NewBatchV3_filter()
59-
.from_block(from_block)
72+
.from_block(self.last_aggregated_block)
73+
.to_block(current_block)
6074
.query()
6175
.await
62-
.map_err(|_| ProofsFetcherError::QueryingLogs)?;
76+
.map_err(|e| ProofsFetcherError::GetLogs(e.to_string()))?;
6377

6478
info!("Logs collected {}", logs.len());
6579

80+
// Update last processed block after collecting logs
81+
self.last_aggregated_block = current_block;
82+
6683
let mut proofs = vec![];
6784

6885
for (batch, _) in logs {
@@ -119,15 +136,7 @@ impl ProofsFetcher {
119136
Ok(proofs)
120137
}
121138

122-
async fn get_block_number_to_fetch_from(&self) -> Result<u64, ProofsFetcherError> {
123-
let block_number = self
124-
.rpc_provider
125-
.get_block_number()
126-
.await
127-
.map_err(|_| ProofsFetcherError::BlockNumber)?;
128-
129-
let number_of_blocks_in_the_past = self.fetch_from_secs_ago / self.block_time_secs;
130-
131-
Ok(block_number.saturating_sub(number_of_blocks_in_the_past))
139+
pub fn get_last_aggregated_block(&self) -> u64 {
140+
self.last_aggregated_block
132141
}
133142
}

aggregation_mode/src/backend/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,17 @@ impl ProofAggregator {
6969
}
7070
}
7171

72-
pub async fn start(&mut self) {
72+
pub async fn start(&mut self, config: &Config) {
7373
info!("Starting proof aggregator service",);
7474

7575
info!("About to aggregate and submit proof to be verified on chain");
7676
let res = self.aggregate_and_submit_proofs_on_chain().await;
7777

7878
match res {
7979
Ok(()) => {
80+
config
81+
.update_last_aggregated_block(self.fetcher.get_last_aggregated_block())
82+
.unwrap();
8083
info!("Process finished successfully");
8184
}
8285
Err(err) => {

0 commit comments

Comments
 (0)