@@ -10,15 +10,17 @@ use alloy::{
1010 providers:: { Provider , ProviderBuilder } ,
1111} ;
1212use sqlx:: types:: BigDecimal ;
13+ use tracing:: info;
1314
1415pub struct PaymentsPoller {
1516 db : Db ,
1617 proof_aggregation_service : AggregationModePaymentServiceContract ,
1718 rpc_provider : RpcProvider ,
19+ config : Config ,
1820}
1921
2022impl PaymentsPoller {
21- pub fn new ( db : Db , config : Config ) -> Self {
23+ pub fn new ( db : Db , config : Config ) -> Result < Self , Box < dyn std :: error :: Error > > {
2224 let rpc_url = config. eth_rpc_url . parse ( ) . expect ( "RPC URL should be valid" ) ;
2325 let rpc_provider = ProviderBuilder :: new ( ) . connect_http ( rpc_url) ;
2426 let proof_aggregation_service = AggregationModePaymentService :: new (
@@ -27,16 +29,30 @@ impl PaymentsPoller {
2729 rpc_provider. clone ( ) ,
2830 ) ;
2931
30- Self {
32+ // This check is here to catch early failures on last block fetching
33+ let _ = config. get_last_block_fetched ( ) ?;
34+
35+ Ok ( Self {
3136 db,
3237 proof_aggregation_service,
3338 rpc_provider,
34- }
39+ config,
40+ } )
3541 }
3642
3743 pub async fn start ( & self ) {
3844 let seconds_to_wait_between_polls = 12 ;
45+
3946 loop {
47+ let Ok ( last_block_fetched) = self . config . get_last_block_fetched ( ) else {
48+ tracing:: warn!( "Could not get last block fetched, skipping polling iteration..." ) ;
49+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs (
50+ seconds_to_wait_between_polls,
51+ ) )
52+ . await ;
53+ continue ;
54+ } ;
55+
4056 let Ok ( current_block) = self . rpc_provider . get_block_number ( ) . await else {
4157 tracing:: warn!( "Could not get current block skipping polling iteration..." ) ;
4258 tokio:: time:: sleep ( std:: time:: Duration :: from_secs (
@@ -46,10 +62,13 @@ impl PaymentsPoller {
4662 continue ;
4763 } ;
4864
65+ let from = last_block_fetched. saturating_sub ( 5 ) ;
66+ info ! ( "Fetching logs from block {from} to {current_block}" ) ;
67+
4968 let Ok ( logs) = self
5069 . proof_aggregation_service
5170 . UserPayment_filter ( )
52- . from_block ( current_block - 5 )
71+ . from_block ( last_block_fetched - 5 )
5372 . to_block ( current_block)
5473 . query ( )
5574 . await
@@ -91,6 +110,11 @@ impl PaymentsPoller {
91110 }
92111 }
93112
113+ if let Err ( err) = self . config . update_last_block_fetched ( current_block) {
114+ tracing:: error!( "Failed to update the last aggregated block: {err}" ) ;
115+ continue ;
116+ } ;
117+
94118 tokio:: time:: sleep ( std:: time:: Duration :: from_secs (
95119 seconds_to_wait_between_polls,
96120 ) )
0 commit comments