Skip to content

Commit 1044de8

Browse files
authored
fix: avs subscriber error not being handled correctly (#626)
1 parent 83993bb commit 1044de8

3 files changed

Lines changed: 49 additions & 46 deletions

File tree

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,35 @@
11
package pkg
22

3-
import (
4-
"errors"
5-
"fmt"
6-
"time"
7-
8-
"github.com/ethereum/go-ethereum/accounts/abi/bind"
9-
)
10-
11-
const (
12-
MaxRetries = 100
13-
RetryInterval = 1 * time.Second
14-
)
15-
163
func (agg *Aggregator) SubscribeToNewTasks() error {
17-
for retries := 0; retries < MaxRetries; retries++ {
18-
err := agg.tryCreateTaskSubscriber()
19-
if err == nil {
20-
_ = agg.subscribeToNewTasks() // This will block until an error occurs
21-
}
22-
23-
message := fmt.Sprintf("Failed to subscribe to new tasks. Retrying in %v", RetryInterval)
24-
agg.AggregatorConfig.BaseConfig.Logger.Info(message)
25-
time.Sleep(RetryInterval)
4+
err := agg.subscribeToNewTasks()
5+
if err != nil {
6+
return err
267
}
278

28-
return errors.New("failed to subscribe to new tasks after max retries")
29-
}
30-
31-
func (agg *Aggregator) subscribeToNewTasks() error {
329
for {
3310
select {
3411
case err := <-agg.taskSubscriber.Err():
35-
return err
12+
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err)
13+
agg.taskSubscriber.Unsubscribe()
14+
err = agg.subscribeToNewTasks()
15+
if err != nil {
16+
return err
17+
}
3618
case newBatch := <-agg.NewBatchChan:
3719
agg.AddNewTask(newBatch.BatchMerkleRoot, newBatch.TaskCreatedBlock)
3820
}
3921
}
22+
4023
}
4124

42-
func (agg *Aggregator) tryCreateTaskSubscriber() error {
25+
func (agg *Aggregator) subscribeToNewTasks() error {
4326
var err error
4427

45-
agg.AggregatorConfig.BaseConfig.Logger.Info("Subscribing to Ethereum serviceManager task events")
46-
agg.taskSubscriber, err = agg.avsSubscriber.AvsContractBindings.ServiceManager.WatchNewBatch(&bind.WatchOpts{},
47-
agg.NewBatchChan, nil)
28+
agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasks(agg.NewBatchChan)
4829

4930
if err != nil {
5031
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err)
5132
}
33+
5234
return err
5335
}

core/chainio/avs_subscriber.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package chainio
22

33
import (
4+
"fmt"
5+
"time"
6+
47
"github.com/ethereum/go-ethereum/accounts/abi/bind"
58
"github.com/ethereum/go-ethereum/event"
69
servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
@@ -9,6 +12,11 @@ import (
912
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
1013
)
1114

15+
const (
16+
MaxRetries = 100
17+
RetryInterval = 1 * time.Second
18+
)
19+
1220
// NOTE(marian): Leaving this commented code here as it may be useful in the short term.
1321
// type AvsSubscriberer interface {
1422
// SubscribeToNewTasks(newTaskCreatedChan chan *cstaskmanager.ContractAlignedLayerTaskManagerNewTaskCreated) event.Subscription
@@ -42,15 +50,22 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber,
4250
}, nil
4351
}
4452

45-
func (s *AvsSubscriber) SubscribeToNewTasks(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatch) event.Subscription {
46-
sub, err := s.AvsContractBindings.ServiceManager.WatchNewBatch(
47-
&bind.WatchOpts{}, newTaskCreatedChan, nil,
48-
)
49-
if err != nil {
50-
s.logger.Error("Failed to subscribe to new AlignedLayer tasks", "err", err)
53+
func (s *AvsSubscriber) SubscribeToNewTasks(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatch) (event.Subscription, error) {
54+
for i := 0; i < MaxRetries; i++ {
55+
sub, err := s.AvsContractBindings.ServiceManager.WatchNewBatch(
56+
&bind.WatchOpts{}, newTaskCreatedChan, nil,
57+
)
58+
if err != nil {
59+
s.logger.Warn("Failed to subscribe to new AlignedLayer tasks", "err", err)
60+
time.Sleep(RetryInterval)
61+
continue
62+
}
63+
64+
s.logger.Info("Subscribed to new AlignedLayer tasks")
65+
return sub, nil
5166
}
52-
s.logger.Infof("Subscribed to new AlignedLayer tasks")
53-
return sub
67+
68+
return nil, fmt.Errorf("Failed to subscribe to new AlignedLayer tasks after %d retries", MaxRetries)
5469
}
5570

5671
// func (s *AvsSubscriber) SubscribeToTaskResponses(taskResponseChan chan *cstaskmanager.ContractAlignedLayerTaskManagerTaskResponded) event.Subscription {

operator/pkg/operator.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ import (
66
"crypto/ecdsa"
77
"encoding/binary"
88
"fmt"
9-
"github.com/ethereum/go-ethereum/crypto"
10-
"github.com/yetanotherco/aligned_layer/operator/risc_zero"
119
"log"
1210
"sync"
1311
"time"
1412

13+
"github.com/ethereum/go-ethereum/crypto"
14+
"github.com/yetanotherco/aligned_layer/operator/risc_zero"
15+
1516
"github.com/prometheus/client_golang/prometheus"
1617
"github.com/yetanotherco/aligned_layer/metrics"
1718

@@ -118,13 +119,15 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro
118119
return operator, nil
119120
}
120121

121-
func (o *Operator) SubscribeToNewTasks() event.Subscription {
122-
sub := o.avsSubscriber.SubscribeToNewTasks(o.NewTaskCreatedChan)
123-
return sub
122+
func (o *Operator) SubscribeToNewTasks() (event.Subscription, error) {
123+
return o.avsSubscriber.SubscribeToNewTasks(o.NewTaskCreatedChan)
124124
}
125125

126126
func (o *Operator) Start(ctx context.Context) error {
127-
sub := o.SubscribeToNewTasks()
127+
sub, err := o.SubscribeToNewTasks()
128+
if err != nil {
129+
log.Fatal("Could not subscribe to new tasks")
130+
}
128131

129132
var metricsErrChan <-chan error
130133
if o.Config.Operator.EnableMetrics {
@@ -143,7 +146,10 @@ func (o *Operator) Start(ctx context.Context) error {
143146
case err := <-sub.Err():
144147
o.Logger.Infof("Error in websocket subscription", "err", err)
145148
sub.Unsubscribe()
146-
sub = o.SubscribeToNewTasks()
149+
sub, err = o.SubscribeToNewTasks()
150+
if err != nil {
151+
o.Logger.Fatal("Could not subscribe to new tasks")
152+
}
147153
case newBatchLog := <-o.NewTaskCreatedChan:
148154
err := o.ProcessNewBatchLog(newBatchLog)
149155
if err != nil {

0 commit comments

Comments
 (0)