@@ -42,23 +42,33 @@ type Aggregator struct {
4242 taskSubscriber event.Subscription
4343 blsAggregationService blsagg.BlsAggregationService
4444
45- // Using map here instead of slice to allow for easy lookup of tasks, when aggregator is restarting,
46- // its easier to get the task from the map instead of filling the slice again
47- tasks map [uint32 ][32 ]byte
48- // Mutex to protect the tasks map
49- tasksMutex * sync.Mutex
45+ // BLS Signature Service returns an Index
46+ // Since our ID is not an idx, we build this cache
47+ // Note: In case of a reboot, this doesn't need to be loaded,
48+ // and can start from zero
49+ batchesRootByIdx map [uint32 ][32 ]byte
50+ batchesRootByIdxMutex * sync.Mutex
51+
52+ // This is the counterpart,
53+ // to use when we have the batch but not the index
54+ // Note: In case of a reboot, this doesn't need to be loaded,
55+ // and can start from zero
56+ batchesIdxByRoot map [[32 ]byte ]uint32
57+ batchesIdxByRootMutex * sync.Mutex
58+
59+ // This task index is to communicate with the local BLS
60+ // Service.
61+ // Note: In case of a reboot it can start from 0 again
62+ nextBatchIndex uint32
63+ nextBatchIndexMutex * sync.Mutex
5064
5165 OperatorTaskResponses map [[32 ]byte ]* TaskResponsesWithStatus
5266 // Mutex to protect the taskResponses map
53- taskResponsesMutex * sync.Mutex
54- logger logging.Logger
55-
56- // FIXME(marian): This is a hacky workaround to send some sensible index to the BLS aggregation service,
57- // which needs a task index.
58- taskCounter uint32
59- taskCounterMutex * sync.Mutex
60- metricsReg * prometheus.Registry
61- metrics * metrics.Metrics
67+ batchesResponseMutex * sync.Mutex
68+ logger logging.Logger
69+
70+ metricsReg * prometheus.Registry
71+ metrics * metrics.Metrics
6272}
6373
6474func NewAggregator (aggregatorConfig config.AggregatorConfig ) (* Aggregator , error ) {
@@ -79,7 +89,9 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
7989 return nil , err
8090 }
8191
82- tasks := make (map [uint32 ][32 ]byte )
92+ batchesRootByIdx := make (map [uint32 ][32 ]byte )
93+ batchesIdxByRoot := make (map [[32 ]byte ]uint32 )
94+
8395 operatorTaskResponses := make (map [[32 ]byte ]* TaskResponsesWithStatus , 0 )
8496
8597 chainioConfig := sdkclients.BuildAllConfig {
@@ -104,27 +116,32 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
104116 avsRegistryService := avsregistry .NewAvsRegistryServiceChainCaller (avsReader .AvsRegistryReader , operatorPubkeysService , logger )
105117 blsAggregationService := blsagg .NewBlsAggregatorService (avsRegistryService , logger )
106118
107- // Explicitly initializing this value just in case.
108- taskCounter := uint32 (0 )
109-
110119 // Metrics
111120 reg := prometheus .NewRegistry ()
112121 aggregatorMetrics := metrics .NewMetrics (aggregatorConfig .Aggregator .MetricsIpPortAddress , reg , logger )
113122
123+ nextBatchIndex := uint32 (0 )
124+
114125 aggregator := Aggregator {
115- AggregatorConfig : & aggregatorConfig ,
116- avsReader : avsReader ,
117- avsSubscriber : avsSubscriber ,
118- avsWriter : avsWriter ,
119- NewBatchChan : newBatchChan ,
120- tasks : tasks ,
121- tasksMutex : & sync.Mutex {},
126+ AggregatorConfig : & aggregatorConfig ,
127+ avsReader : avsReader ,
128+ avsSubscriber : avsSubscriber ,
129+ avsWriter : avsWriter ,
130+ NewBatchChan : newBatchChan ,
131+
132+ batchesRootByIdx : batchesRootByIdx ,
133+ batchesRootByIdxMutex : & sync.Mutex {},
134+
135+ batchesIdxByRoot : batchesIdxByRoot ,
136+ batchesIdxByRootMutex : & sync.Mutex {},
137+
138+ nextBatchIndex : nextBatchIndex ,
139+ nextBatchIndexMutex : & sync.Mutex {},
140+
122141 OperatorTaskResponses : operatorTaskResponses ,
123- taskResponsesMutex : & sync.Mutex {},
142+ batchesResponseMutex : & sync.Mutex {},
124143 blsAggregationService : blsAggregationService ,
125144 logger : logger ,
126- taskCounter : taskCounter ,
127- taskCounterMutex : & sync.Mutex {},
128145 metricsReg : reg ,
129146 metrics : aggregatorMetrics ,
130147 }
@@ -193,9 +210,9 @@ func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg
193210 "taskIndex" , blsAggServiceResp .TaskIndex ,
194211 )
195212
196- agg .tasksMutex .Lock ()
197- batchMerkleRoot := agg .tasks [blsAggServiceResp .TaskIndex ]
198- agg .tasksMutex .Unlock ()
213+ agg .batchesRootByIdxMutex .Lock ()
214+ batchMerkleRoot := agg .batchesRootByIdx [blsAggServiceResp .TaskIndex ]
215+ agg .batchesRootByIdxMutex .Unlock ()
199216
200217 _ , err := agg .avsWriter .SendAggregatedResponse (context .Background (), batchMerkleRoot , nonSignerStakesAndSignature )
201218 if err != nil {
@@ -206,31 +223,51 @@ func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg
206223func (agg * Aggregator ) AddNewTask (batchMerkleRoot [32 ]byte , taskCreatedBlock uint32 ) {
207224 agg .AggregatorConfig .BaseConfig .Logger .Info ("Adding new task" , "Batch merkle root" , batchMerkleRoot )
208225
209- agg .taskCounterMutex .Lock ()
210- agg .taskCounter ++
211- agg .taskCounterMutex .Unlock ()
226+ agg .nextBatchIndexMutex .Lock ()
227+ batchIndex := agg .nextBatchIndex
228+ agg .nextBatchIndexMutex .Unlock ()
212229
213- agg .tasksMutex .Lock ()
214- if _ , ok := agg .tasks [agg .taskCounter ]; ok {
215- agg .logger .Warn ("Task already exists" , "taskIndex" , agg .taskCounter )
216- agg .tasksMutex .Unlock ()
230+ // --- UPDATE BATCH - INDEX CACHES ---
231+
232+ agg .batchesRootByIdxMutex .Lock ()
233+ if _ , ok := agg .batchesRootByIdx [batchIndex ]; ok {
234+ agg .logger .Warn ("Batch already exists" , "batchIndex" , batchIndex , "batchRoot" , batchMerkleRoot )
235+ agg .batchesRootByIdxMutex .Unlock ()
236+ return
237+ }
238+ agg .batchesRootByIdx [batchIndex ] = batchMerkleRoot
239+ agg .batchesRootByIdxMutex .Unlock ()
240+
241+ agg .batchesIdxByRootMutex .Lock ()
242+ // This shouldn't happen, since both maps are updated together
243+ if _ , ok := agg .batchesIdxByRoot [batchMerkleRoot ]; ok {
244+ agg .logger .Warn ("Batch already exists" , "batchIndex" , batchIndex , "batchRoot" , batchMerkleRoot )
245+ agg .batchesRootByIdxMutex .Unlock ()
217246 return
218247 }
219- agg .tasks [agg .taskCounter ] = batchMerkleRoot
220- agg .tasksMutex .Unlock ()
248+ agg .batchesIdxByRoot [batchMerkleRoot ] = batchIndex
249+ agg .batchesIdxByRootMutex .Unlock ()
250+
251+ // --- UPDATE TASK RESPONSES ---
221252
222- agg .taskResponsesMutex .Lock ()
253+ agg .batchesResponseMutex .Lock ()
223254 agg .OperatorTaskResponses [batchMerkleRoot ] = & TaskResponsesWithStatus {
224255 taskResponses : make ([]types.SignedTaskResponse , 0 ),
225256 submittedToEthereum : false ,
226257 }
227- agg .taskResponsesMutex .Unlock ()
258+ agg .batchesResponseMutex .Unlock ()
228259
229260 quorumNums := eigentypes.QuorumNums {eigentypes .QuorumNum (QUORUM_NUMBER )}
230261 quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages {eigentypes .QuorumThresholdPercentage (QUORUM_THRESHOLD )}
231262
232- // FIXME(marian): Hardcoded value of timeToExpiry to 100s. How should be get this value?
233- err := agg .blsAggregationService .InitializeNewTask (agg .taskCounter , taskCreatedBlock , quorumNums , quorumThresholdPercentages , 100 * time .Second )
263+ err := agg .blsAggregationService .InitializeNewTask (batchIndex , taskCreatedBlock , quorumNums , quorumThresholdPercentages , 100 * time .Second )
264+
265+ // --- INCREASE BATCH INDEX ---
266+
267+ agg .nextBatchIndexMutex .Lock ()
268+ agg .nextBatchIndex = agg .nextBatchIndex + 1
269+ agg .nextBatchIndexMutex .Unlock ()
270+
234271 // FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
235272 if err != nil {
236273 agg .logger .Fatalf ("BLS aggregation service error when initializing new task: %s" , err )
0 commit comments