Skip to content

Commit 09def0a

Browse files
authored
fix (aggregator): dont allow operator to respond twice (#702)
1 parent e7a7c41 commit 09def0a

2 files changed

Lines changed: 48 additions & 7 deletions

File tree

aggregator/internal/pkg/aggregator.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ package pkg
33
import (
44
"context"
55
"encoding/hex"
6-
gethtypes "github.com/ethereum/go-ethereum/core/types"
76
"sync"
87
"time"
98

9+
gethtypes "github.com/ethereum/go-ethereum/core/types"
10+
1011
"github.com/prometheus/client_golang/prometheus"
1112
"github.com/yetanotherco/aligned_layer/metrics"
1213

@@ -56,6 +57,12 @@ type Aggregator struct {
5657
// Stores the taskCreatedBlock for each batch bt batch index
5758
batchCreatedBlockByIdx map[uint32]uint64
5859

60+
// Stores if an operator already submitted a response for a batch
61+
// This is to avoid double submissions
62+
// struct{} is used as a placeholder because it is the smallest type
63+
// go does not have a set type
64+
operatorRespondedBatch map[uint32]map[eigentypes.Bytes32]struct{}
65+
5966
// This task index is to communicate with the local BLS
6067
// Service.
6168
// Note: In case of a reboot it can start from 0 again
@@ -133,6 +140,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
133140
batchesRootByIdx: batchesRootByIdx,
134141
batchesIdxByRoot: batchesIdxByRoot,
135142
batchCreatedBlockByIdx: batchCreatedBlockByIdx,
143+
operatorRespondedBatch: make(map[uint32]map[eigentypes.Bytes32]struct{}),
136144
nextBatchIndex: nextBatchIndex,
137145
taskMutex: &sync.Mutex{},
138146
walletMutex: &sync.Mutex{},
@@ -183,6 +191,14 @@ const MaxSentTxRetries = 5
183191
func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
184192
if blsAggServiceResp.Err != nil {
185193
agg.logger.Warn("BlsAggregationServiceResponse contains an error", "err", blsAggServiceResp.Err)
194+
agg.logger.Info("- Locking task mutex: Delete task from operator map", "taskIndex", blsAggServiceResp.TaskIndex)
195+
agg.taskMutex.Lock()
196+
197+
// Remove task from the list of tasks
198+
delete(agg.operatorRespondedBatch, blsAggServiceResp.TaskIndex)
199+
200+
agg.logger.Info("- Unlocking task mutex: Delete task from operator map", "taskIndex", blsAggServiceResp.TaskIndex)
201+
agg.taskMutex.Unlock()
186202
return
187203
}
188204
nonSignerPubkeys := []servicemanager.BN254G1Point{}
@@ -209,13 +225,16 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
209225
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching merkle root")
210226
batchMerkleRoot := agg.batchesRootByIdx[blsAggServiceResp.TaskIndex]
211227
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
228+
229+
// Delete the task from the map
230+
delete(agg.operatorRespondedBatch, blsAggServiceResp.TaskIndex)
231+
212232
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching merkle root")
213233
agg.taskMutex.Unlock()
214234

215235
agg.logger.Info("Threshold reached", "taskIndex", blsAggServiceResp.TaskIndex,
216236
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]))
217237

218-
219238
currentBlock, err := agg.AggregatorConfig.BaseConfig.EthRpcClient.BlockNumber(context.Background())
220239
if err != nil {
221240
agg.logger.Error("Error getting current block number", "err", err)
@@ -270,10 +289,8 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
270289
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]))
271290
}
272291

273-
274-
275-
/// Sends response to contract and waits for transaction receipt
276-
/// Returns error if it fails to send tx or receipt is not found
292+
// / Sends response to contract and waits for transaction receipt
293+
// / Returns error if it fails to send tx or receipt is not found
277294
func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*gethtypes.Receipt, error) {
278295
agg.walletMutex.Lock()
279296
agg.logger.Infof("- Locked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchMerkleRoot[:]))
@@ -299,7 +316,6 @@ func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, nonSigne
299316
return receipt, nil
300317
}
301318

302-
303319
func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, taskCreatedBlock uint32) {
304320
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task",
305321
"Batch merkle root", hex.EncodeToString(batchMerkleRoot[:]))

aggregator/internal/pkg/server.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"net/rpc"
88
"time"
99

10+
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
11+
1012
"github.com/yetanotherco/aligned_layer/core/types"
1113
)
1214

@@ -73,6 +75,26 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponse(signedTaskResponse *typ
7375
return nil
7476
}
7577

78+
// Note: we already have lock here
79+
agg.logger.Debug("- Checking if operator already responded")
80+
batchResponses, ok := agg.operatorRespondedBatch[taskIndex]
81+
if !ok {
82+
batchResponses = make(map[eigentypes.Bytes32]struct{})
83+
agg.operatorRespondedBatch[taskIndex] = batchResponses
84+
}
85+
86+
if _, ok := batchResponses[signedTaskResponse.OperatorId]; ok {
87+
*reply = 0
88+
agg.logger.Warn("Operator already responded, ignoring",
89+
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]),
90+
"taskIndex", taskIndex, "batchMerkleRoot", hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]))
91+
92+
agg.taskMutex.Unlock()
93+
return nil
94+
}
95+
96+
batchResponses[signedTaskResponse.OperatorId] = struct{}{}
97+
7698
// Don't wait infinitely if it can't answer
7799
// Create a context with a timeout of 5 seconds
78100
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -90,6 +112,9 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponse(signedTaskResponse *typ
90112

91113
if err != nil {
92114
agg.logger.Warnf("BLS aggregation service error: %s", err)
115+
// remove operator from the list of operators that responded
116+
// so that it can try again
117+
delete(batchResponses, signedTaskResponse.OperatorId)
93118
} else {
94119
agg.logger.Info("BLS process succeeded")
95120
}

0 commit comments

Comments
 (0)