Skip to content
This repository was archived by the owner on Oct 3, 2023. It is now read-only.

Commit 79f9672

Browse files
authored
Add opencensus_task labels to proto metrics if no default labels are provided. (#148)
* Add opencensus_task labels to proto metrics if no default labels are provided. * Make default labels stateless.
1 parent 76b193a commit 79f9672

3 files changed

Lines changed: 79 additions & 20 deletions

File tree

equivalence_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func TestStatsAndMetricsEquivalence(t *testing.T) {
9797
if err != nil {
9898
t.Errorf("#%d: Stats.viewToMetricDescriptor: %v", i, err)
9999
}
100-
pMD, err := se.protoMetricDescriptorToCreateMetricDescriptorRequest(ctx, last.Metrics[0])
100+
pMD, err := se.protoMetricDescriptorToCreateMetricDescriptorRequest(ctx, last.Metrics[0], nil)
101101
if err != nil {
102102
t.Errorf("#%d: Stats.protoMetricDescriptorToMetricDescriptor: %v", i, err)
103103
}
@@ -107,7 +107,7 @@ func TestStatsAndMetricsEquivalence(t *testing.T) {
107107

108108
vdl := []*view.Data{vd}
109109
sctreql := se.makeReq(vdl, maxTimeSeriesPerUpload)
110-
tsl, _ := se.protoMetricToTimeSeries(ctx, last.Node, last.Resource, last.Metrics[0])
110+
tsl, _ := se.protoMetricToTimeSeries(ctx, last.Node, last.Resource, last.Metrics[0], nil)
111111
pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)
112112
if diff := cmpTSReqs(pctreql, sctreql); diff != "" {
113113
t.Fatalf("TimeSeries Mismatch -FromMetrics +FromStats: %s", diff)

metrics_proto.go

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,10 @@ var errNilMetric = errors.New("expecting a non-nil metric")
4747
var errNilMetricDescriptor = errors.New("expecting a non-nil metric descriptor")
4848

4949
type metricProtoPayload struct {
50-
node *commonpb.Node
51-
resource *resourcepb.Resource
52-
metric *metricspb.Metric
50+
node *commonpb.Node
51+
resource *resourcepb.Resource
52+
metric *metricspb.Metric
53+
additionalLabels map[string]labelValue
5354
}
5455

5556
// ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring.
@@ -58,11 +59,18 @@ func (se *statsExporter) ExportMetricsProto(ctx context.Context, node *commonpb.
5859
return errNilMetric
5960
}
6061

62+
additionalLabels := se.defaultLabels
63+
if additionalLabels == nil {
64+
// additionalLabels must be stateless because each node is different
65+
additionalLabels = getDefaultLabelsFromNode(node)
66+
}
67+
6168
for _, metric := range metrics {
6269
payload := &metricProtoPayload{
63-
metric: metric,
64-
resource: rsc,
65-
node: node,
70+
metric: metric,
71+
resource: rsc,
72+
node: node,
73+
additionalLabels: additionalLabels,
6674
}
6775
se.protoMetricsBundler.Add(payload, 1)
6876
}
@@ -83,15 +91,15 @@ func (se *statsExporter) handleMetricsProtoUpload(payloads []*metricProtoPayload
8391

8492
for _, payload := range payloads {
8593
// Now create the metric descriptor remotely.
86-
if err := se.createMetricDescriptor(ctx, payload.metric); err != nil {
94+
if err := se.createMetricDescriptor(ctx, payload.metric, payload.additionalLabels); err != nil {
8795
span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
8896
return err
8997
}
9098
}
9199

92100
var allTimeSeries []*monitoringpb.TimeSeries
93101
for _, payload := range payloads {
94-
tsl, err := se.protoMetricToTimeSeries(ctx, payload.node, payload.resource, payload.metric)
102+
tsl, err := se.protoMetricToTimeSeries(ctx, payload.node, payload.resource, payload.metric, payload.additionalLabels)
95103
if err != nil {
96104
span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
97105
return err
@@ -196,7 +204,7 @@ func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monito
196204

197205
// protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest
198206
// but it doesn't invoke any remote API.
199-
func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metric *metricspb.Metric) ([]*monitoringpb.TimeSeries, error) {
207+
func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metric *metricspb.Metric, additionalLabels map[string]labelValue) ([]*monitoringpb.TimeSeries, error) {
200208
if metric == nil {
201209
return nil, errNilMetric
202210
}
@@ -223,7 +231,7 @@ func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *comm
223231

224232
// Each TimeSeries has labelValues which MUST be correlated
225233
// with that from the MetricDescriptor
226-
labels, err := labelsPerTimeSeries(se.defaultLabels, metricLabelKeys, protoTimeSeries.GetLabelValues())
234+
labels, err := labelsPerTimeSeries(additionalLabels, metricLabelKeys, protoTimeSeries.GetLabelValues())
227235
if err != nil {
228236
// TODO: (@odeke-em) perhaps log this error from labels extraction, if non-nil.
229237
continue
@@ -261,10 +269,10 @@ func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []*metricspb.
261269
return labels, nil
262270
}
263271

264-
func (se *statsExporter) protoMetricDescriptorToCreateMetricDescriptorRequest(ctx context.Context, metric *metricspb.Metric) (*monitoringpb.CreateMetricDescriptorRequest, error) {
272+
func (se *statsExporter) protoMetricDescriptorToCreateMetricDescriptorRequest(ctx context.Context, metric *metricspb.Metric, additionalLabels map[string]labelValue) (*monitoringpb.CreateMetricDescriptorRequest, error) {
265273
// Otherwise, we encountered a cache-miss and
266274
// should create the metric descriptor remotely.
267-
inMD, err := se.protoToMonitoringMetricDescriptor(metric)
275+
inMD, err := se.protoToMonitoringMetricDescriptor(metric, additionalLabels)
268276
if err != nil {
269277
return nil, err
270278
}
@@ -279,7 +287,7 @@ func (se *statsExporter) protoMetricDescriptorToCreateMetricDescriptorRequest(ct
279287

280288
// createMetricDescriptor creates a metric descriptor from the OpenCensus proto metric
281289
// and then creates it remotely using Stackdriver's API.
282-
func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *metricspb.Metric) error {
290+
func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *metricspb.Metric, additionalLabels map[string]labelValue) error {
283291
se.protoMu.Lock()
284292
defer se.protoMu.Unlock()
285293

@@ -290,7 +298,7 @@ func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *met
290298

291299
// Otherwise, we encountered a cache-miss and
292300
// should create the metric descriptor remotely.
293-
inMD, err := se.protoToMonitoringMetricDescriptor(metric)
301+
inMD, err := se.protoToMonitoringMetricDescriptor(metric, additionalLabels)
294302
if err != nil {
295303
return err
296304
}
@@ -337,7 +345,7 @@ func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSer
337345
return sptl, nil
338346
}
339347

340-
func (se *statsExporter) protoToMonitoringMetricDescriptor(metric *metricspb.Metric) (*googlemetricpb.MetricDescriptor, error) {
348+
func (se *statsExporter) protoToMonitoringMetricDescriptor(metric *metricspb.Metric, additionalLabels map[string]labelValue) (*googlemetricpb.MetricDescriptor, error) {
341349
if metric == nil {
342350
return nil, errNilMetric
343351
}
@@ -358,7 +366,7 @@ func (se *statsExporter) protoToMonitoringMetricDescriptor(metric *metricspb.Met
358366
Type: metricType,
359367
MetricKind: metricKind,
360368
ValueType: valueType,
361-
Labels: labelDescriptorsFromProto(se.defaultLabels, metric.GetMetricDescriptor().GetLabelKeys()),
369+
Labels: labelDescriptorsFromProto(additionalLabels, metric.GetMetricDescriptor().GetLabelKeys()),
362370
}
363371

364372
return sdm, nil
@@ -573,3 +581,13 @@ func protoResourceToMonitoredResource(rsp *resourcepb.Resource) *monitoredrespb.
573581
}
574582
return mrsp
575583
}
584+
585+
func getDefaultLabelsFromNode(node *commonpb.Node) map[string]labelValue {
586+
taskValue := fmt.Sprintf("%s-%d@%s", strings.ToLower(node.LibraryInfo.GetLanguage().String()), node.Identifier.Pid, node.Identifier.HostName)
587+
return map[string]labelValue{
588+
opencensusTaskKey: {
589+
val: taskValue,
590+
desc: opencensusTaskDescription,
591+
},
592+
}
593+
}

metrics_proto_test.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package stackdriver
1616

1717
import (
1818
"context"
19+
"reflect"
1920
"strings"
2021
"testing"
2122

@@ -27,6 +28,7 @@ import (
2728
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
2829
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
2930

31+
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
3032
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
3133
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
3234
)
@@ -180,7 +182,7 @@ func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) {
180182
if se == nil {
181183
se = new(statsExporter)
182184
}
183-
tsl, err := se.protoMetricToTimeSeries(context.Background(), nil, nil, tt.in)
185+
tsl, err := se.protoMetricToTimeSeries(context.Background(), nil, nil, tt.in, nil)
184186
if tt.wantErr != "" {
185187
if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
186188
t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr)
@@ -241,7 +243,7 @@ func TestProtoToMonitoringMetricDescriptor(t *testing.T) {
241243
if se == nil {
242244
se = new(statsExporter)
243245
}
244-
got, err := se.protoToMonitoringMetricDescriptor(tt.in)
246+
got, err := se.protoToMonitoringMetricDescriptor(tt.in, nil)
245247
if tt.wantErr != "" {
246248
if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
247249
t.Errorf("#%d: \nGot %v\nWanted error substring %q", i, err, tt.wantErr)
@@ -474,3 +476,42 @@ func TestCombineTimeSeriesAndDeduplication(t *testing.T) {
474476
}
475477
}
476478
}
479+
480+
func TestNodeToDefaultLabels(t *testing.T) {
481+
tests := []struct {
482+
in *commonpb.Node
483+
want map[string]labelValue
484+
}{
485+
{
486+
in: &commonpb.Node{
487+
Identifier: &commonpb.ProcessIdentifier{HostName: "host1", Pid: 8081},
488+
LibraryInfo: &commonpb.LibraryInfo{Language: commonpb.LibraryInfo_JAVA},
489+
},
490+
want: map[string]labelValue{
491+
"opencensus_task": {
492+
val: "java-8081@host1",
493+
desc: "Opencensus task identifier",
494+
},
495+
},
496+
},
497+
{
498+
in: &commonpb.Node{
499+
Identifier: &commonpb.ProcessIdentifier{HostName: "host2", Pid: 9090},
500+
LibraryInfo: &commonpb.LibraryInfo{Language: commonpb.LibraryInfo_PYTHON},
501+
},
502+
want: map[string]labelValue{
503+
"opencensus_task": {
504+
val: "python-9090@host2",
505+
desc: "Opencensus task identifier",
506+
},
507+
},
508+
},
509+
}
510+
511+
for i, tt := range tests {
512+
got := getDefaultLabelsFromNode(tt.in)
513+
if !reflect.DeepEqual(got, tt.want) {
514+
t.Fatalf("Test %d failed. Default labels mismatch. Want %v\nGot %v\n", i, tt.want, got)
515+
}
516+
}
517+
}

0 commit comments

Comments
 (0)