Skip to content
This repository was archived by the owner on Oct 3, 2023. It is now read-only.

Commit 0e2df90

Browse files
committed
stats, metrics: deduplicate TimeSeries with non-unique Metric.Type
Since its inception, the stats exporter sent Stackdriver Metrics split up only into chunks of maxUploadSize (200), but Metrics with the exact same Type were still uploaded in the same CreateTimeSeriesRequest, which would cause: err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written: Field timeSeries[?] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.: timeSeries[?]. The previous remedy relied on synchronizing SetReportingPeriod to 60+s, which would aggregate stats/view.Data. This change now splits up such Metrics so that even if uploads are made in less than 60s, CreateTimeSeriesRequest-s will be uploaded with unique Metrics and won't cause Stackdriver's backend to trip up. Fixes #73
1 parent 7f88e3b commit 0e2df90

5 files changed

Lines changed: 703 additions & 484 deletions

File tree

equivalence_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func TestStatsAndMetricsEquivalence(t *testing.T) {
111111
vdl := []*view.Data{vd}
112112
sctreql := se.makeReq(vdl, maxTimeSeriesPerUpload)
113113
tsl, _ := se.protoMetricToTimeSeries(ctx, last.Node, last.Resource, last.Metrics[0])
114-
pctreql := []*monitoringpb.CreateTimeSeriesRequest{se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)}
114+
pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)
115115
if !reflect.DeepEqual(sctreql, pctreql) {
116116
t.Errorf("#%d: TimeSeries Mismatch\nStats CreateTimeSeriesRequest:\n\t%v\nProto CreateTimeSeriesRequest:\n\t%v\n",
117117
i, sctreql, pctreql)

metrics.go

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,25 +101,78 @@ func (se *statsExporter) handleMetricsUpload(payloads []*metricPayload) error {
101101
end = len(allTimeSeries)
102102
}
103103
batch := allTimeSeries[start:end]
104-
ctsreq := se.combineTimeSeriesToCreateTimeSeriesRequest(batch)
105-
if err := createTimeSeries(ctx, se.c, ctsreq); err != nil {
106-
// span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
107-
// TODO(@odeke-em, @jbd): Don't fail fast here, perhaps batch errors?
108-
// return err
104+
ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(batch)
105+
for _, ctsreq := range ctsreql {
106+
if err := createTimeSeries(ctx, se.c, ctsreq); err != nil {
107+
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
108+
// TODO(@odeke-em): Don't fail fast here, perhaps batch errors?
109+
// return err
110+
}
109111
}
110112
}
111113

112114
return nil
113115
}
114116

115-
func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) *monitoringpb.CreateTimeSeriesRequest {
117+
func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) (ctsreql []*monitoringpb.CreateTimeSeriesRequest) {
116118
if len(ts) == 0 {
117119
return nil
118120
}
119-
return &monitoringpb.CreateTimeSeriesRequest{
120-
Name: monitoring.MetricProjectPath(se.o.ProjectID),
121-
TimeSeries: ts,
121+
122+
// Since there are scenarios in which Metrics with the same Type
123+
// can be bunched in the same TimeSeries, we have to ensure that
124+
// we create a unique CreateTimeSeriesRequest with entirely unique Metrics
125+
// per TimeSeries, lest we'll encounter:
126+
//
127+
// err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written:
128+
// Field timeSeries[2] had an invalid value: Duplicate TimeSeries encountered.
129+
// Only one point can be written per TimeSeries per request.: timeSeries[2]
130+
//
131+
// This scenario happens when we are using the OpenCensus Agent in which multiple metrics
132+
// are streamed by various client applications.
133+
// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/73
134+
uniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
135+
nonUniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts))
136+
seenMetrics := make(map[string]struct{})
137+
138+
for _, tti := range ts {
139+
signature := tti.Metric.GetType()
140+
if _, alreadySeen := seenMetrics[signature]; !alreadySeen {
141+
uniqueTimeSeries = append(uniqueTimeSeries, tti)
142+
seenMetrics[signature] = struct{}{}
143+
} else {
144+
nonUniqueTimeSeries = append(nonUniqueTimeSeries, tti)
145+
}
122146
}
147+
148+
// UniqueTimeSeries can be bunched up together
149+
// While for each nonUniqueTimeSeries, we have
150+
// to make a unique CreateTimeSeriesRequest.
151+
ctsreql = append(ctsreql, &monitoringpb.CreateTimeSeriesRequest{
152+
Name: monitoring.MetricProjectPath(se.o.ProjectID),
153+
TimeSeries: uniqueTimeSeries,
154+
})
155+
156+
// Now recursively also combine the non-unique TimeSeries
157+
// that were singly added to nonUniqueTimeSeries.
158+
// The reason is that we need optimal combinations,
159+
// because:
160+
// * "a/b/c"
161+
// * "a/b/c"
162+
// * "x/y/z"
163+
// * "a/b/c"
164+
// * "x/y/z"
165+
// * "p/y/z"
166+
// * "d/y/z"
167+
//
168+
// should produce:
169+
// CreateTimeSeries(uniqueTimeSeries) :: ["a/b/c", "x/y/z", "p/y/z", "d/y/z"]
170+
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c"]
171+
// CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c", "x/y/z"]
172+
nonUniqueRequests := se.combineTimeSeriesToCreateTimeSeriesRequest(nonUniqueTimeSeries)
173+
ctsreql = append(ctsreql, nonUniqueRequests...)
174+
175+
return ctsreql
123176
}
124177

125178
// protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest
@@ -468,8 +521,12 @@ func protoResourceToMonitoredResource(rsp *resourcepb.Resource) *monitoredrespb.
468521
Type: "global",
469522
}
470523
}
524+
typ := rsp.Type
525+
if typ == "" {
526+
typ = "global"
527+
}
471528
mrsp := &monitoredrespb.MonitoredResource{
472-
Type: rsp.Type,
529+
Type: typ,
473530
}
474531
if rsp.Labels != nil {
475532
mrsp.Labels = make(map[string]string, len(rsp.Labels))

metrics_test.go

Lines changed: 124 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"strings"
2222
"testing"
2323

24+
monitoring "cloud.google.com/go/monitoring/apiv3"
2425
"github.com/golang/protobuf/ptypes/timestamp"
2526
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
2627
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
@@ -37,7 +38,7 @@ func TestProtoResourceToMonitoringResource(t *testing.T) {
3738
want *monitoredrespb.MonitoredResource
3839
}{
3940
{in: nil, want: &monitoredrespb.MonitoredResource{Type: "global"}},
40-
{in: &resourcepb.Resource{}, want: &monitoredrespb.MonitoredResource{}},
41+
{in: &resourcepb.Resource{}, want: &monitoredrespb.MonitoredResource{Type: "global"}},
4142
{
4243
in: &resourcepb.Resource{
4344
Type: "foo",
@@ -91,7 +92,7 @@ func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) {
9192

9293
tests := []struct {
9394
in *metricspb.Metric
94-
want *monitoringpb.CreateTimeSeriesRequest
95+
want []*monitoringpb.CreateTimeSeriesRequest
9596
wantErr string
9697
statsExporter *statsExporter
9798
}{
@@ -135,33 +136,35 @@ func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) {
135136
statsExporter: &statsExporter{
136137
o: Options{ProjectID: "foo"},
137138
},
138-
want: &monitoringpb.CreateTimeSeriesRequest{
139-
Name: "projects/foo",
140-
TimeSeries: []*monitoringpb.TimeSeries{
141-
{
142-
Metric: &googlemetricpb.Metric{
143-
Type: "custom.googleapis.com/opencensus/with_metric_descriptor",
144-
},
145-
Resource: &monitoredrespb.MonitoredResource{
146-
Type: "global",
147-
},
148-
Points: []*monitoringpb.Point{
149-
{
150-
Interval: &monitoringpb.TimeInterval{
151-
StartTime: startTimestamp,
152-
EndTime: endTimestamp,
153-
},
154-
Value: &monitoringpb.TypedValue{
155-
Value: &monitoringpb.TypedValue_DistributionValue{
156-
DistributionValue: &distributionpb.Distribution{
157-
Count: 1,
158-
Mean: 11.9,
159-
SumOfSquaredDeviation: 0,
160-
BucketCounts: []int64{0, 1, 0, 0, 0},
161-
BucketOptions: &distributionpb.Distribution_BucketOptions{
162-
Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
163-
ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{
164-
Bounds: []float64{0, 10, 20, 30, 40},
139+
want: []*monitoringpb.CreateTimeSeriesRequest{
140+
{
141+
Name: "projects/foo",
142+
TimeSeries: []*monitoringpb.TimeSeries{
143+
{
144+
Metric: &googlemetricpb.Metric{
145+
Type: "custom.googleapis.com/opencensus/with_metric_descriptor",
146+
},
147+
Resource: &monitoredrespb.MonitoredResource{
148+
Type: "global",
149+
},
150+
Points: []*monitoringpb.Point{
151+
{
152+
Interval: &monitoringpb.TimeInterval{
153+
StartTime: startTimestamp,
154+
EndTime: endTimestamp,
155+
},
156+
Value: &monitoringpb.TypedValue{
157+
Value: &monitoringpb.TypedValue_DistributionValue{
158+
DistributionValue: &distributionpb.Distribution{
159+
Count: 1,
160+
Mean: 11.9,
161+
SumOfSquaredDeviation: 0,
162+
BucketCounts: []int64{0, 1, 0, 0, 0},
163+
BucketOptions: &distributionpb.Distribution_BucketOptions{
164+
Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
165+
ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{
166+
Bounds: []float64{0, 10, 20, 30, 40},
167+
},
165168
},
166169
},
167170
},
@@ -404,6 +407,98 @@ func TestProtoMetricsToMonitoringMetrics_fromProtoPoint(t *testing.T) {
404407
}
405408
}
406409

410+
func TestCombineTimeSeriesAndDeduplication(t *testing.T) {
411+
se := new(statsExporter)
412+
413+
tests := []struct {
414+
in []*monitoringpb.TimeSeries
415+
want []*monitoringpb.CreateTimeSeriesRequest
416+
}{
417+
{
418+
in: []*monitoringpb.TimeSeries{
419+
{
420+
Metric: &googlemetricpb.Metric{
421+
Type: "a/b/c",
422+
},
423+
},
424+
{
425+
Metric: &googlemetricpb.Metric{
426+
Type: "a/b/c",
427+
},
428+
},
429+
{
430+
Metric: &googlemetricpb.Metric{
431+
Type: "A/b/c",
432+
},
433+
},
434+
{
435+
Metric: &googlemetricpb.Metric{
436+
Type: "a/b/c",
437+
},
438+
},
439+
{
440+
Metric: &googlemetricpb.Metric{
441+
Type: "X/Y/Z",
442+
},
443+
},
444+
},
445+
want: []*monitoringpb.CreateTimeSeriesRequest{
446+
{
447+
Name: monitoring.MetricProjectPath(se.o.ProjectID),
448+
TimeSeries: []*monitoringpb.TimeSeries{
449+
{
450+
Metric: &googlemetricpb.Metric{
451+
Type: "a/b/c",
452+
},
453+
},
454+
{
455+
Metric: &googlemetricpb.Metric{
456+
Type: "A/b/c",
457+
},
458+
},
459+
{
460+
Metric: &googlemetricpb.Metric{
461+
Type: "X/Y/Z",
462+
},
463+
},
464+
},
465+
},
466+
{
467+
Name: monitoring.MetricProjectPath(se.o.ProjectID),
468+
TimeSeries: []*monitoringpb.TimeSeries{
469+
{
470+
Metric: &googlemetricpb.Metric{
471+
Type: "a/b/c",
472+
},
473+
},
474+
},
475+
},
476+
{
477+
Name: monitoring.MetricProjectPath(se.o.ProjectID),
478+
TimeSeries: []*monitoringpb.TimeSeries{
479+
{
480+
Metric: &googlemetricpb.Metric{
481+
Type: "a/b/c",
482+
},
483+
},
484+
},
485+
},
486+
},
487+
},
488+
}
489+
490+
for i, tt := range tests {
491+
got := se.combineTimeSeriesToCreateTimeSeriesRequest(tt.in)
492+
want := tt.want
493+
if !reflect.DeepEqual(got, want) {
494+
gj, wj := serializeAsJSON(got), serializeAsJSON(want)
495+
if gj != wj {
496+
t.Errorf("#%d: Unmatched JSON\nGot:\n\t%s\nWant:\n\t%s", i, gj, wj)
497+
}
498+
}
499+
}
500+
}
501+
407502
func serializeAsJSON(v interface{}) string {
408503
blob, _ := json.MarshalIndent(v, "", " ")
409504
return string(blob)

stats.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -208,35 +208,38 @@ func (e *statsExporter) uploadStats(vds []*view.Data) error {
208208
return nil
209209
}
210210

211-
func (e *statsExporter) makeReq(vds []*view.Data, limit int) []*monitoringpb.CreateTimeSeriesRequest {
211+
func (se *statsExporter) makeReq(vds []*view.Data, limit int) []*monitoringpb.CreateTimeSeriesRequest {
212212
var reqs []*monitoringpb.CreateTimeSeriesRequest
213-
var timeSeries []*monitoringpb.TimeSeries
213+
214+
var allTimeSeries []*monitoringpb.TimeSeries
214215
for _, vd := range vds {
215216
for _, row := range vd.Rows {
216-
tags, resource := e.getMonitoredResource(vd.View, append([]tag.Tag(nil), row.Tags...))
217+
tags, resource := se.getMonitoredResource(vd.View, append([]tag.Tag(nil), row.Tags...))
217218
ts := &monitoringpb.TimeSeries{
218219
Metric: &metricpb.Metric{
219-
Type: e.metricType(vd.View),
220-
Labels: newLabels(e.defaultLabels, tags),
220+
Type: se.metricType(vd.View),
221+
Labels: newLabels(se.defaultLabels, tags),
221222
},
222223
Resource: resource,
223224
Points: []*monitoringpb.Point{newPoint(vd.View, row, vd.Start, vd.End)},
224225
}
225-
timeSeries = append(timeSeries, ts)
226-
if len(timeSeries) == limit {
227-
reqs = append(reqs, &monitoringpb.CreateTimeSeriesRequest{
228-
Name: monitoring.MetricProjectPath(e.o.ProjectID),
229-
TimeSeries: timeSeries,
230-
})
231-
timeSeries = []*monitoringpb.TimeSeries{}
232-
}
226+
allTimeSeries = append(allTimeSeries, ts)
233227
}
234228
}
229+
230+
var timeSeries []*monitoringpb.TimeSeries
231+
for _, ts := range allTimeSeries {
232+
timeSeries = append(timeSeries, ts)
233+
if len(timeSeries) == limit {
234+
ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(timeSeries)
235+
reqs = append(reqs, ctsreql...)
236+
timeSeries = timeSeries[:0]
237+
}
238+
}
239+
235240
if len(timeSeries) > 0 {
236-
reqs = append(reqs, &monitoringpb.CreateTimeSeriesRequest{
237-
Name: monitoring.MetricProjectPath(e.o.ProjectID),
238-
TimeSeries: timeSeries,
239-
})
241+
ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(timeSeries)
242+
reqs = append(reqs, ctsreql...)
240243
}
241244
return reqs
242245
}

0 commit comments

Comments
 (0)