Skip to content
This repository was archived by the owner on Oct 3, 2023. It is now read-only.

Commit 3b40441

Browse files
authored
Add a metric batcher which preallocates batch sizes. (#194)
* Add a metric batcher which preallocates batch sizes. * Fix received/dropped logic. * Fix comments and add a new method that returns num dropped timeseries * Use recordDroppedTimeseries when possible * Rename ExportMetricsProtoAndReturnDropped to PushMetricsProto * Fix tests.
1 parent 3a3e471 commit 3b40441

6 files changed

Lines changed: 214 additions & 172 deletions

File tree

equivalence_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,18 +102,22 @@ func TestStatsAndMetricsEquivalence(t *testing.T) {
102102
Name: fmt.Sprintf("projects/%s", se.o.ProjectID),
103103
MetricDescriptor: sMD,
104104
}
105-
pMDR, err := se.protoMetricDescriptorToCreateMetricDescriptorRequest(ctx, metricPbs[i], nil)
105+
inMD, err := se.protoToMonitoringMetricDescriptor(metricPbs[i], nil)
106106
if err != nil {
107107
t.Errorf("#%d: Stats.protoMetricDescriptorToMetricDescriptor: %v", i, err)
108108
}
109+
pMDR := &monitoringpb.CreateMetricDescriptorRequest{
110+
Name: fmt.Sprintf("projects/%s", se.o.ProjectID),
111+
MetricDescriptor: inMD,
112+
}
109113
if diff := cmpMDReq(pMDR, sMDR); diff != "" {
110114
t.Fatalf("MetricDescriptor Mismatch -FromMetricsPb +FromMetrics: %s", diff)
111115
}
112116

113117
stss, _ := se.metricToMpbTs(ctx, metric)
114118
sctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(stss)
115-
tsl, _ := se.protoMetricToTimeSeries(ctx, se.getResource(nil, metricPbs[i], seenResources), metricPbs[i], nil)
116-
pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)
119+
allTss, _ := protoMetricToTimeSeries(ctx, se, se.getResource(nil, metricPbs[i], seenResources), metricPbs[i])
120+
pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(allTss)
117121
if diff := cmpTSReqs(pctreql, sctreql); diff != "" {
118122
t.Fatalf("TimeSeries Mismatch -FromMetricsPb +FromMetrics: %s", diff)
119123
}
@@ -336,7 +340,7 @@ func TestEquivalenceStatsVsMetricsUploads(t *testing.T) {
336340
})
337341

338342
// Export the proto Metrics to the Stackdriver backend.
339-
se.ExportMetricsProto(context.Background(), nil, nil, metricPbs)
343+
se.PushMetricsProto(context.Background(), nil, nil, metricPbs)
340344
se.Flush()
341345

342346
var stackdriverTimeSeriesFromMetricsPb []*monitoringpb.CreateTimeSeriesRequest

metrics_batcher.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package stackdriver
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"strings"
21+
22+
monitoring "cloud.google.com/go/monitoring/apiv3"
23+
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
24+
)
25+
26+
type metricsBatcher struct {
27+
projectID string
28+
allReqs []*monitoringpb.CreateTimeSeriesRequest
29+
allTss []*monitoringpb.TimeSeries
30+
allErrs []error
31+
// Counts all dropped TimeSeries by this exporter.
32+
droppedTimeSeries int
33+
}
34+
35+
func newMetricsBatcher(projectID string) *metricsBatcher {
36+
return &metricsBatcher{
37+
projectID: projectID,
38+
allTss: make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload),
39+
droppedTimeSeries: 0,
40+
}
41+
}
42+
43+
func (mb *metricsBatcher) recordDroppedTimeseries(numTimeSeries int, err error) {
44+
mb.droppedTimeSeries += numTimeSeries
45+
mb.allErrs = append(mb.allErrs, err)
46+
}
47+
48+
func (mb *metricsBatcher) addTimeSeries(ts *monitoringpb.TimeSeries) {
49+
mb.allTss = append(mb.allTss, ts)
50+
if len(mb.allTss) == maxTimeSeriesPerUpload {
51+
mb.allReqs = append(mb.allReqs, &monitoringpb.CreateTimeSeriesRequest{
52+
Name: monitoring.MetricProjectPath(mb.projectID),
53+
TimeSeries: mb.allTss,
54+
})
55+
mb.allTss = make([]*monitoringpb.TimeSeries, maxTimeSeriesPerUpload)
56+
}
57+
}
58+
59+
func (mb *metricsBatcher) export(ctx context.Context, mc *monitoring.MetricClient) {
60+
// Last batch, if any.
61+
if len(mb.allTss) > 0 {
62+
mb.allReqs = append(mb.allReqs, &monitoringpb.CreateTimeSeriesRequest{
63+
Name: monitoring.MetricProjectPath(mb.projectID),
64+
TimeSeries: mb.allTss,
65+
})
66+
}
67+
68+
// Send create time series requests to Stackdriver.
69+
for _, req := range mb.allReqs {
70+
if err := createTimeSeries(ctx, mc, req); err != nil {
71+
mb.recordDroppedTimeseries(len(req.TimeSeries), err)
72+
}
73+
}
74+
}
75+
76+
func (mb *metricsBatcher) finalError() error {
77+
numErrors := len(mb.allErrs)
78+
if numErrors == 0 {
79+
return nil
80+
}
81+
82+
if numErrors == 1 {
83+
return mb.allErrs[0]
84+
}
85+
86+
errMsgs := make([]string, 0, numErrors)
87+
for _, err := range mb.allErrs {
88+
errMsgs = append(errMsgs, err.Error())
89+
}
90+
return fmt.Errorf("[%s]", strings.Join(errMsgs, "; "))
91+
}

0 commit comments

Comments
 (0)