Skip to content

Commit 549a2ba

Browse files
committed
feat: agg subscribes to both success
1 parent 6fa127a commit 549a2ba

3 files changed

Lines changed: 86 additions & 65 deletions

File tree

aggregator/internal/pkg/aggregator.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ func (agg *Aggregator) Start(ctx context.Context) error {
218218

219219
const MaxSentTxRetries = 5
220220

221+
const V2 = false
222+
221223
func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
222224
if blsAggServiceResp.Err != nil {
223225
agg.taskMutex.Lock()
@@ -281,7 +283,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
281283
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
282284

283285
for i := 0; i < MaxSentTxRetries; i++ {
284-
if false { //V1
286+
if !V2 { //V1
285287
agg.logger.Info("agg if V1")
286288
_, err = agg.sendAggregatedResponse(batchData.BatchMerkleRoot, nonSignerStakesAndSignature)
287289
if err == nil {

aggregator/internal/pkg/subscriber.go

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,39 @@ func (agg *Aggregator) SubscribeToNewTasks() error {
55
if err != nil {
66
return err
77
}
8-
9-
for {
10-
select {
11-
case err := <-agg.taskSubscriber:
12-
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err)
13-
14-
err = agg.subscribeToNewTasks()
15-
if err != nil {
16-
return err
17-
}
18-
case newBatch := <-agg.NewBatchChan:
19-
agg.AddNewTask(newBatch.BatchMerkleRoot, newBatch.TaskCreatedBlock)
20-
}
21-
}
22-
}
23-
func (agg *Aggregator) SubscribeToNewTasksV2() error {
24-
err := agg.subscribeToNewTasks()
8+
err = agg.subscribeToNewTasksV2()
259
if err != nil {
2610
return err
2711
}
2812

13+
V2 := false
14+
2915
for {
3016
select {
3117
case err := <-agg.taskSubscriber:
3218
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err)
3319

34-
err = agg.subscribeToNewTasks()
35-
if err != nil {
36-
return err
20+
if !V2 {
21+
err = agg.subscribeToNewTasks()
22+
if err != nil {
23+
return err
24+
}
25+
} else {
26+
err = agg.subscribeToNewTasksV2()
27+
if err != nil {
28+
return err
29+
}
30+
}
31+
case newBatch := <-agg.NewBatchChan:
32+
if !V2 {
33+
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task, V1")
34+
agg.AddNewTask(newBatch.BatchMerkleRoot, newBatch.TaskCreatedBlock)
35+
}
36+
case newBatchV2 := <-agg.NewBatchChanV2:
37+
if V2 {
38+
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task, V2")
39+
agg.AddNewTaskV2(newBatchV2.BatchMerkleRoot, newBatchV2.SenderAddress, newBatchV2.TaskCreatedBlock)
3740
}
38-
case newBatch := <-agg.NewBatchChanV2:
39-
agg.AddNewTaskV2(newBatch.BatchMerkleRoot, newBatch.SenderAddress, newBatch.TaskCreatedBlock)
4041
}
4142
}
4243
}
@@ -52,3 +53,14 @@ func (agg *Aggregator) subscribeToNewTasks() error {
5253

5354
return err
5455
}
56+
func (agg *Aggregator) subscribeToNewTasksV2() error {
57+
var err error
58+
59+
agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV2(agg.NewBatchChanV2)
60+
61+
if err != nil {
62+
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err)
63+
}
64+
65+
return err
66+
}

operator/pkg/operator.go

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ func (o *Operator) Start(ctx context.Context) error {
149149
metricsErrChan = make(chan error, 1)
150150
}
151151

152+
var V2 = false
153+
152154
for {
153155
select {
154156
case <-context.Background().Done():
@@ -169,51 +171,56 @@ func (o *Operator) Start(ctx context.Context) error {
169171
o.Logger.Fatal("Could not subscribe to new tasks")
170172
}
171173
case newBatchLog := <-o.NewTaskCreatedChan:
172-
o.Logger.Infof("Received new batch log: V1")
173-
err := o.ProcessNewBatchLog(newBatchLog)
174-
if err != nil {
175-
o.Logger.Infof("batch %x did not verify. Err: %v", newBatchLog.BatchMerkleRoot, err)
176-
continue
177-
}
178-
179-
responseSignature := o.SignTaskResponse(newBatchLog.BatchMerkleRoot)
180-
o.Logger.Debugf("responseSignature about to send: %x", responseSignature)
174+
if !V2 {
175+
o.Logger.Infof("Received new batch log: V1")
176+
err := o.ProcessNewBatchLog(newBatchLog)
177+
if err != nil {
178+
o.Logger.Infof("batch %x did not verify. Err: %v", newBatchLog.BatchMerkleRoot, err)
179+
continue
180+
}
181+
182+
responseSignature := o.SignTaskResponse(newBatchLog.BatchMerkleRoot)
183+
o.Logger.Debugf("responseSignature about to send: %x", responseSignature)
184+
185+
signedTaskResponse := types.SignedTaskResponse{
186+
BatchMerkleRoot: newBatchLog.BatchMerkleRoot,
187+
BlsSignature: *responseSignature,
188+
OperatorId: o.OperatorId,
189+
}
190+
o.Logger.Infof("Signed Task Response to send: BatchIdentifierHash=%s, BatchMerkleRoot=%s, SenderAddress=%s",
191+
hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
192+
)
193+
go o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse)
194+
}
181195

182-
signedTaskResponse := types.SignedTaskResponse{
183-
BatchMerkleRoot: newBatchLog.BatchMerkleRoot,
184-
BlsSignature: *responseSignature,
185-
OperatorId: o.OperatorId,
186-
}
187-
o.Logger.Infof("Signed Task Response to send: BatchIdentifierHash=%s, BatchMerkleRoot=%s, SenderAddress=%s",
188-
hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
189-
)
190-
go o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse)
191196
case newBatchLogV2 := <-o.NewTaskCreatedChanV2:
192-
o.Logger.Infof("Received new batch log: V2")
193-
err := o.ProcessNewBatchLogV2(newBatchLogV2)
194-
if err != nil {
195-
o.Logger.Infof("batch %x did not verify. Err: %v", newBatchLogV2.BatchMerkleRoot, err)
196-
continue
197-
}
198-
199-
batchIdentifier := append(newBatchLogV2.BatchMerkleRoot[:], newBatchLogV2.SenderAddress[:]...)
200-
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))
201-
responseSignature := o.SignTaskResponse(batchIdentifierHash)
202-
o.Logger.Debugf("responseSignature about to send: %x", responseSignature)
203-
204-
signedTaskResponse := types.SignedTaskResponseV2{
205-
BatchIdentifierHash: batchIdentifierHash,
206-
BatchMerkleRoot: newBatchLogV2.BatchMerkleRoot,
207-
SenderAddress: newBatchLogV2.SenderAddress,
208-
BlsSignature: *responseSignature,
209-
OperatorId: o.OperatorId,
197+
if V2 {
198+
o.Logger.Infof("Received new batch log: V2")
199+
err := o.ProcessNewBatchLogV2(newBatchLogV2)
200+
if err != nil {
201+
o.Logger.Infof("batch %x did not verify. Err: %v", newBatchLogV2.BatchMerkleRoot, err)
202+
continue
203+
}
204+
205+
batchIdentifier := append(newBatchLogV2.BatchMerkleRoot[:], newBatchLogV2.SenderAddress[:]...)
206+
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))
207+
responseSignature := o.SignTaskResponse(batchIdentifierHash)
208+
o.Logger.Debugf("responseSignature about to send: %x", responseSignature)
209+
210+
signedTaskResponse := types.SignedTaskResponseV2{
211+
BatchIdentifierHash: batchIdentifierHash,
212+
BatchMerkleRoot: newBatchLogV2.BatchMerkleRoot,
213+
SenderAddress: newBatchLogV2.SenderAddress,
214+
BlsSignature: *responseSignature,
215+
OperatorId: o.OperatorId,
216+
}
217+
o.Logger.Infof("Signed Task Response to send: BatchIdentifierHash=%s, BatchMerkleRoot=%s, SenderAddress=%s",
218+
hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]),
219+
hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
220+
hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
221+
)
222+
go o.aggRpcClient.SendSignedTaskResponseToAggregatorV2(&signedTaskResponse)
210223
}
211-
o.Logger.Infof("Signed Task Response to send: BatchIdentifierHash=%s, BatchMerkleRoot=%s, SenderAddress=%s",
212-
hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]),
213-
hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
214-
hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
215-
)
216-
go o.aggRpcClient.SendSignedTaskResponseToAggregatorV2(&signedTaskResponse)
217224
}
218225
}
219226
}

0 commit comments

Comments
 (0)