Skip to content

Commit 11f1d40

Browse files
committed
Merge remote-tracking branch 'origin/staging' into feat/support-multiple-dbs
2 parents 66d9837 + 6f2be89 commit 11f1d40

29 files changed

Lines changed: 575 additions & 87 deletions

Makefile

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ reset_last_aggregated_block:
240240
@echo "Resetting last aggregated block..."
241241
@echo '{"last_aggregated_block":0}' > config-files/proof-aggregator.last_aggregated_block.json
242242

243-
AGGREGATION_MODE_SOURCES = $(wildcard ./aggregation_mode/Cargo.toml) $(wildcard ./aggregation_mode/src/**) $(wildcard ./aggregation_mode/aggregation_programs/risc0/Cargo.toml) $(wildcard ./aggregation_mode/aggregation_programs/risc0/src/**) $(wildcard ./aggregation_mode/aggregation_programs/sp1/Cargo.toml) $(wildcard ./aggregation_mode/aggregation_programs/sp1/src/**)
243+
AGGREGATION_MODE_SOURCES = $(wildcard ./aggregation_mode/Cargo.toml) $(wildcard ./aggregation_mode/proof_aggregator/Cargo.toml) $(wildcard ./aggregation_mode/proof_aggregator/src/**) $(wildcard ./aggregation_mode/proof_aggregator/aggregation_programs/risc0/Cargo.toml) $(wildcard ./aggregation_mode/proof_aggregator/aggregation_programs/risc0/src/**) $(wildcard ./aggregation_mode/proof_aggregator/aggregation_programs/sp1/Cargo.toml) $(wildcard ./aggregation_mode/proof_aggregator/aggregation_programs/sp1/src/**)
244244

245245
### All Dev proof aggregator receipts with no real proving
246246
./aggregation_mode/target/release/proof_aggregator_dev: $(AGGREGATION_MODE_SOURCES)
@@ -341,6 +341,15 @@ agg_mode_gateway_send_sp1_proof:
341341
--vk scripts/test_files/sp1/sp1_fibonacci_5_0_0_vk.bin \
342342
--private-key "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"
343343

344+
agg_mode_install_cli: ## Install the aggregation mode CLI
345+
@cargo install --path aggregation_mode/cli
346+
347+
agg_mode_task_sender_start: agg_mode_install_cli ## Send proofs to agg mode gateway
348+
@. scripts/.agg_mode.task_sender.env && . ./scripts/agg_mode_send_sp1_proof_interval.sh
349+
350+
agg_mode_get_quotas:
351+
curl -X GET http://127.0.0.1:8089/quotas/0x70997970C51812dc3A010C7d01b50e0d17dc79C8
352+
344353
__AGGREGATOR__: ## ____
345354

346355
aggregator_start: ## Start the Aggregator. Parameters: ENVIRONMENT=<devnet|testnet|mainnet>, AGG_CONFIG_FILE
@@ -994,6 +1003,10 @@ upgrade_proof_aggregator: ## Upgrade ProofAggregator contract. Parameters: NETWO
9941003
@echo "Upgrading ProofAggregator Contract on $(NETWORK) network..."
9951004
@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/upgrade_proof_aggregator.sh
9961005

1006+
deploy_agg_mode_payment_service:
1007+
@echo "Deploying Agg Mode Payment Service contract on $(NETWORK) network..."
1008+
@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/deploy_agg_mode_payment_service.sh
1009+
9971010
__SP1_FFI__: ##
9981011
build_sp1_macos:
9991012
@cd operator/sp1/lib && cargo build $(RELEASE_FLAG)

aggregation_mode/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aggregation_mode/db/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ edition = "2021"
66
[dependencies]
77
serde = { workspace = true }
88
tokio = { version = "1"}
9-
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" , "uuid", "bigdecimal"] }
9+
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate", "chrono", "uuid", "bigdecimal"] }
1010
tracing = { version = "0.1", features = ["log"] }
1111

1212
[[bin]]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE tasks add COLUMN status_updated_at TIMESTAMPTZ DEFAULT now();

aggregation_mode/db/src/types.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use sqlx::{
22
prelude::FromRow,
3-
types::{BigDecimal, Uuid},
3+
types::{
4+
chrono::{DateTime, Utc},
5+
BigDecimal, Uuid,
6+
},
47
Type,
58
};
69

@@ -21,6 +24,7 @@ pub struct Task {
2124
pub program_commitment: Vec<u8>,
2225
pub merkle_path: Option<Vec<u8>>,
2326
pub status: TaskStatus,
27+
pub status_updated_at: DateTime<Utc>,
2428
}
2529

2630
#[derive(Debug, Clone, FromRow)]

aggregation_mode/gateway/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
44

55
#[derive(Clone, Debug, Deserialize, Serialize)]
66
pub struct Config {
7+
pub ip: String,
78
pub port: u16,
89
pub db_connection_urls: Vec<String>,
910
pub network: String,

aggregation_mode/gateway/src/helpers.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::{SystemTime, UNIX_EPOCH};
2+
13
pub(super) fn format_merkle_path(bytes: &[u8]) -> Result<Vec<String>, String> {
24
if bytes.is_empty() {
35
return Ok(vec![]);
@@ -12,3 +14,17 @@ pub(super) fn format_merkle_path(bytes: &[u8]) -> Result<Vec<String>, String> {
1214
.map(|chunk| format!("0x{}", hex::encode(chunk)))
1315
.collect())
1416
}
17+
18+
pub(crate) fn get_time_left_day_formatted() -> String {
19+
let now = SystemTime::now()
20+
.duration_since(UNIX_EPOCH)
21+
.expect("Error al obtener el tiempo");
22+
23+
let seconds_remaining = 86400 - (now.as_secs() % 86400);
24+
25+
let hours = seconds_remaining / 3600;
26+
let minutes = (seconds_remaining % 3600) / 60;
27+
let seconds = seconds_remaining % 60;
28+
29+
format!("{hours}:{minutes}:{seconds} UTC")
30+
}

aggregation_mode/gateway/src/http.rs

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use super::{
2222
use crate::{
2323
config::Config,
2424
db::Db,
25+
helpers::get_time_left_day_formatted,
2526
types::{GetReceiptsResponse, SubmitProofRequestRisc0, SubmitProofRequestSP1},
2627
verifiers::{verify_sp1_proof, VerificationError},
2728
};
@@ -52,18 +53,25 @@ impl GatewayServer {
5253
HttpServer::new(move || {
5354
App::new()
5455
.app_data(Data::new(state.clone()))
56+
.route("/", web::get().to(Self::get_root))
5557
.route("/nonce/{address}", web::get().to(Self::get_nonce))
5658
.route("/receipts", web::get().to(Self::get_receipts))
5759
.route("/proof/sp1", web::post().to(Self::post_proof_sp1))
5860
.route("/proof/risc0", web::post().to(Self::post_proof_risc0))
61+
.route("/quotas/{address}", web::get().to(Self::get_quotas))
5962
})
60-
.bind(("127.0.0.1", port))
63+
.bind((self.config.ip.as_str(), port))
6164
.expect("To bind socket correctly")
6265
.run()
6366
.await
6467
.expect("Server to never end");
6568
}
6669

70+
// Returns an OK response (code 200), no matters what receives in the request
71+
async fn get_root(_req: HttpRequest) -> impl Responder {
72+
HttpResponse::Ok().json(AppResponse::new_sucessfull(serde_json::json!({})))
73+
}
74+
6775
// Returns the nonce (number of submitted tasks) for a given address
6876
async fn get_nonce(req: HttpRequest) -> impl Responder {
6977
let Some(address_raw) = req.match_info().get("address") else {
@@ -147,8 +155,13 @@ impl GatewayServer {
147155
};
148156

149157
if daily_tasks_by_address >= state.config.max_daily_proofs_per_user {
158+
let formatted_time_left = get_time_left_day_formatted();
159+
150160
return HttpResponse::InternalServerError().json(AppResponse::new_unsucessfull(
151-
"Request denied: Query limit exceeded.",
161+
format!(
162+
"Request denied: Query limit exceeded. Quotas renew in {formatted_time_left}"
163+
)
164+
.as_str(),
152165
400,
153166
));
154167
}
@@ -322,4 +335,73 @@ impl GatewayServer {
322335
.json(AppResponse::new_unsucessfull("Internal server error", 500)),
323336
}
324337
}
338+
339+
async fn get_quotas(req: HttpRequest) -> impl Responder {
340+
let Some(state) = req.app_data::<Data<GatewayServer>>() else {
341+
return HttpResponse::InternalServerError().json(AppResponse::new_unsucessfull(
342+
"Internal server error: Failed to get app data",
343+
500,
344+
));
345+
};
346+
347+
let state = state.get_ref();
348+
349+
let Some(address_raw) = req.match_info().get("address") else {
350+
return HttpResponse::BadRequest()
351+
.json(AppResponse::new_unsucessfull("Missing address", 400));
352+
};
353+
354+
// Check that the address is a valid ethereum address
355+
if alloy::primitives::Address::from_str(address_raw.trim()).is_err() {
356+
return HttpResponse::BadRequest()
357+
.json(AppResponse::new_unsucessfull("Invalid address", 400));
358+
}
359+
360+
let address = address_raw.trim().to_lowercase();
361+
362+
let Ok(daily_tasks_by_address) = state.db.get_daily_tasks_by_address(&address).await else {
363+
return HttpResponse::InternalServerError()
364+
.json(AppResponse::new_unsucessfull("Internal server error", 500));
365+
};
366+
367+
let formatted_time_left = get_time_left_day_formatted();
368+
369+
let now_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
370+
Ok(duration) => duration.as_secs(),
371+
Err(_) => {
372+
return HttpResponse::InternalServerError()
373+
.json(AppResponse::new_unsucessfull("Internal server error", 500));
374+
}
375+
};
376+
377+
let has_payment = match state
378+
.db
379+
.has_active_payment_event(
380+
&address,
381+
// safe unwrap the number comes from a valid u64 primitive
382+
BigDecimal::from_str(&now_epoch.to_string()).unwrap(),
383+
)
384+
.await
385+
{
386+
Ok(result) => result,
387+
Err(_) => {
388+
return HttpResponse::InternalServerError()
389+
.json(AppResponse::new_unsucessfull("Internal server error", 500));
390+
}
391+
};
392+
393+
if has_payment {
394+
HttpResponse::Ok().json(AppResponse::new_sucessfull(serde_json::json!({
395+
"proofs_submitted": daily_tasks_by_address,
396+
"quota_limit": state.config.max_daily_proofs_per_user,
397+
"quota_remaining": (state.config.max_daily_proofs_per_user - daily_tasks_by_address),
398+
"quota_resets_in": formatted_time_left.as_str()
399+
})))
400+
} else {
401+
HttpResponse::Ok().json(AppResponse::new_unsucessfull(
402+
"The address doesn't have an active subscription",
403+
404,
404+
))
405+
}
406+
}
325407
}

aggregation_mode/payments_poller/src/config.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{fs::File, io::Read};
1+
use std::{fs::File, fs::OpenOptions, io::Read, io::Write};
22

33
use serde::{Deserialize, Serialize};
44

@@ -7,6 +7,12 @@ pub struct Config {
77
pub db_connection_urls: Vec<String>,
88
pub eth_rpc_url: String,
99
pub payment_service_address: String,
10+
pub last_block_fetched_filepath: String,
11+
}
12+
13+
#[derive(Debug, Deserialize, Serialize)]
14+
pub struct LastBlockFetched {
15+
pub last_block_fetched: u64,
1016
}
1117

1218
impl Config {
@@ -17,4 +23,30 @@ impl Config {
1723
let config: Config = serde_yaml::from_str(&contents)?;
1824
Ok(config)
1925
}
26+
27+
pub fn get_last_block_fetched(&self) -> Result<u64, Box<dyn std::error::Error>> {
28+
let mut file = File::open(&self.last_block_fetched_filepath)?;
29+
let mut contents = String::new();
30+
file.read_to_string(&mut contents)?;
31+
let lbf_struct: LastBlockFetched = serde_json::from_str(&contents)?;
32+
Ok(lbf_struct.last_block_fetched)
33+
}
34+
35+
pub fn update_last_block_fetched(
36+
&self,
37+
last_block_fetched: u64,
38+
) -> Result<(), Box<dyn std::error::Error>> {
39+
let last_block_fetched_struct = LastBlockFetched { last_block_fetched };
40+
41+
let mut file = OpenOptions::new()
42+
.write(true)
43+
.truncate(true)
44+
.create(true)
45+
.open(&self.last_block_fetched_filepath)?;
46+
47+
let content = serde_json::to_string(&last_block_fetched_struct)?;
48+
file.write_all(content.as_bytes())?;
49+
50+
Ok(())
51+
}
2052
}

aggregation_mode/payments_poller/src/main.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ async fn main() {
3030
.await
3131
.expect("db to start");
3232

33-
let payment_poller = PaymentsPoller::new(db, config);
34-
payment_poller.start().await;
33+
let payments_poller = match PaymentsPoller::new(db, config) {
34+
Ok(poller) => poller,
35+
Err(err) => {
36+
tracing::error!("Failed to create Payments Poller: {err:?}");
37+
return;
38+
}
39+
};
40+
41+
payments_poller.start().await;
3542
}

0 commit comments

Comments
 (0)