@@ -41,7 +41,7 @@ use aws_sdk_s3::client::Client as S3Client;
4141use eth:: payment_service:: { BatcherPaymentService , CreateNewTaskFeeParams , SignerMiddlewareT } ;
4242use ethers:: prelude:: { Middleware , Provider } ;
4343use ethers:: types:: { Address , Signature , TransactionReceipt , U256 } ;
44- use futures_util:: { future, SinkExt , StreamExt , TryStreamExt } ;
44+ use futures_util:: { future, join , SinkExt , StreamExt , TryStreamExt } ;
4545use lambdaworks_crypto:: merkle_tree:: merkle:: MerkleTree ;
4646use lambdaworks_crypto:: merkle_tree:: traits:: IsMerkleTreeBackend ;
4747use log:: { debug, error, info, warn} ;
@@ -351,10 +351,19 @@ impl Batcher {
351351
352352 let last_seen_block = Mutex :: < u64 > :: new ( 0 ) ;
353353
354- while let Some ( block) = tokio:: select! {
355- block = stream. next( ) => block,
356- block = stream_fallback. next( ) => block,
357- } {
354+ loop {
355+ // Wait for both responses
356+ let ( block_main, block_fallback) = join ! ( stream. next( ) , stream_fallback. next( ) ) ;
357+
358+ let block = if let Some ( block) = block_main {
359+ block
360+ } else if let Some ( block) = block_fallback {
361+ block
362+ } else {
363+ // Both rpc failed to respond, break and try to reconnect
364+ break ;
365+ } ;
366+
358367 let batcher = self . clone ( ) ;
359368 let block_number = block. number . unwrap_or_default ( ) ;
360369 let block_number = u64:: try_from ( block_number) . unwrap_or_default ( ) ;
@@ -371,7 +380,7 @@ impl Batcher {
371380 tokio:: spawn ( async move {
372381 if let Err ( e) = batcher. handle_new_block ( block_number) . await {
373382 error ! ( "Error when handling new block: {:?}" , e) ;
374- } ;
383+ }
375384 } ) ;
376385 }
377386 error ! ( "Failed to fetch blocks" ) ;
0 commit comments