Skip to content
This repository was archived by the owner on Oct 3, 2023. It is now read-only.

Commit 2798eee

Browse files
authored
Apply timeout to each worker in metrics_proto export (#222)
* Apply timeout to each worker in metrics_proto export * Apply timeout to createMetricDescriptor
1 parent 633d9ea commit 2798eee

4 files changed

Lines changed: 29 additions & 15 deletions

File tree

metrics_batcher.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"strings"
2121
"sync"
22+
"time"
2223

2324
monitoring "cloud.google.com/go/monitoring/apiv3"
2425
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
@@ -44,7 +45,7 @@ type metricsBatcher struct {
4445
wg *sync.WaitGroup
4546
}
4647

47-
func newMetricsBatcher(ctx context.Context, projectID string, numWorkers int, mc *monitoring.MetricClient) *metricsBatcher {
48+
func newMetricsBatcher(ctx context.Context, projectID string, numWorkers int, mc *monitoring.MetricClient, timeout time.Duration) *metricsBatcher {
4849
if numWorkers < minNumWorkers {
4950
numWorkers = minNumWorkers
5051
}
@@ -58,7 +59,7 @@ func newMetricsBatcher(ctx context.Context, projectID string, numWorkers int, mc
5859
var wg sync.WaitGroup
5960
wg.Add(numWorkers)
6061
for i := 0; i < numWorkers; i++ {
61-
w := newWorker(ctx, mc, reqsChan, respsChan, &wg)
62+
w := newWorker(ctx, mc, reqsChan, respsChan, &wg, timeout)
6263
workers = append(workers, w)
6364
go w.start()
6465
}
@@ -143,8 +144,9 @@ func sendReq(ctx context.Context, c *monitoring.MetricClient, req *monitoringpb.
143144
}
144145

145146
type worker struct {
146-
ctx context.Context
147-
mc *monitoring.MetricClient
147+
ctx context.Context
148+
timeout time.Duration
149+
mc *monitoring.MetricClient
148150

149151
resp *response
150152

@@ -159,7 +161,8 @@ func newWorker(
159161
mc *monitoring.MetricClient,
160162
reqsChan chan *monitoringpb.CreateTimeSeriesRequest,
161163
respsChan chan *response,
162-
wg *sync.WaitGroup) *worker {
164+
wg *sync.WaitGroup,
165+
timeout time.Duration) *worker {
163166
return &worker{
164167
ctx: ctx,
165168
mc: mc,
@@ -172,12 +175,19 @@ func newWorker(
172175

173176
func (w *worker) start() {
174177
for req := range w.reqsChan {
175-
w.recordDroppedTimeseries(sendReq(w.ctx, w.mc, req))
178+
w.sendReqWithTimeout(req)
176179
}
177180
w.respsChan <- w.resp
178181
w.wg.Done()
179182
}
180183

184+
func (w *worker) sendReqWithTimeout(req *monitoringpb.CreateTimeSeriesRequest) {
185+
ctx, cancel := newContextWithTimeout(w.ctx, w.timeout)
186+
defer cancel()
187+
188+
w.recordDroppedTimeseries(sendReq(ctx, w.mc, req))
189+
}
190+
181191
func (w *worker) recordDroppedTimeseries(numTimeSeries int, err error) {
182192
w.resp.droppedTimeSeries += numTimeSeries
183193
if err != nil {

metrics_batcher_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ func TestWorkers(t *testing.T) {
3535
if err != nil {
3636
t.Fatalf("Failed to create metric client %v", err)
3737
}
38-
m1 := newMetricsBatcher(ctx, "test", 1, c1) // batcher with 1 worker
38+
m1 := newMetricsBatcher(ctx, "test", 1, c1, defaultTimeout) // batcher with 1 worker
3939

4040
c2, err := makeClient(addr)
4141
if err != nil {
4242
t.Fatalf("Failed to create metric client %v", err)
4343
}
44-
m2 := newMetricsBatcher(ctx, "test", 2, c2) // batcher with 2 workers
44+
m2 := newMetricsBatcher(ctx, "test", 2, c2, defaultTimeout) // batcher with 2 workers
4545

4646
tss := make([]*monitoringpb.TimeSeries, 0, 500) // make 500 time series, should be split to 3 reqs
4747
for i := 0; i < 500; i++ {

metrics_proto.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,10 @@ func (se *statsExporter) PushMetricsProto(ctx context.Context, node *commonpb.No
5454
return 0, errNilMetricOrMetricDescriptor
5555
}
5656

57-
ctx, cancel := newContextWithTimeout(ctx, se.o.Timeout)
58-
defer cancel()
59-
6057
// Caches the resources seen so far
6158
seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource)
6259

63-
mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c)
60+
mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c, se.o.Timeout)
6461
for _, metric := range metrics {
6562
if len(metric.GetTimeseries()) == 0 {
6663
// No TimeSeries to export, skip this metric.
@@ -70,14 +67,14 @@ func (se *statsExporter) PushMetricsProto(ctx context.Context, node *commonpb.No
7067
if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY {
7168
summaryMtcs := se.convertSummaryMetrics(metric)
7269
for _, summaryMtc := range summaryMtcs {
73-
if err := se.protoCreateMetricDescriptor(ctx, summaryMtc); err != nil {
70+
if err := se.createMetricDescriptorWithTimeout(ctx, summaryMtc); err != nil {
7471
mb.recordDroppedTimeseries(len(summaryMtc.GetTimeseries()), err)
7572
continue
7673
}
7774
se.protoMetricToTimeSeries(ctx, mappedRsc, summaryMtc, mb)
7875
}
7976
} else {
80-
if err := se.protoCreateMetricDescriptor(ctx, metric); err != nil {
77+
if err := se.createMetricDescriptorWithTimeout(ctx, metric); err != nil {
8178
mb.recordDroppedTimeseries(len(metric.GetTimeseries()), err)
8279
continue
8380
}
@@ -298,6 +295,13 @@ func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []string, lab
298295
return labels, nil
299296
}
300297

298+
func (se *statsExporter) createMetricDescriptorWithTimeout(ctx context.Context, metric *metricspb.Metric) error {
299+
ctx, cancel := newContextWithTimeout(ctx, se.o.Timeout)
300+
defer cancel()
301+
302+
return se.protoCreateMetricDescriptor(ctx, metric)
303+
}
304+
301305
// createMetricDescriptor creates a metric descriptor from the OpenCensus proto metric
302306
// and then creates it remotely using Stackdriver's API.
303307
func (se *statsExporter) protoCreateMetricDescriptor(ctx context.Context, metric *metricspb.Metric) error {

metrics_proto_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1083,7 +1083,7 @@ func makePercentileValue(val, percentile float64) *metricspb.SummaryValue_Snapsh
10831083
}
10841084

10851085
func protoMetricToTimeSeries(ctx context.Context, se *statsExporter, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric) ([]*monitoringpb.TimeSeries, error) {
1086-
mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c)
1086+
mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c, defaultTimeout)
10871087
se.protoMetricToTimeSeries(ctx, mappedRsc, metric, mb)
10881088
return mb.allTss, mb.close(ctx)
10891089
}

0 commit comments

Comments
 (0)