Skip to content
This repository was archived by the owner on Jul 31, 2023. It is now read-only.

Commit beff310

Browse files
songy23rghetia
authored andcommitted
Exemplar: Record with sampled SpanContext in gRPC plugin. (#1127)
1 parent 648e9a0 commit beff310

File tree

3 files changed

+180
-17
lines changed

3 files changed

+180
-17
lines changed

plugin/ocgrpc/client_stats_handler_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@
1616
package ocgrpc
1717

1818
import (
19+
"reflect"
1920
"testing"
2021

22+
"github.com/google/go-cmp/cmp"
23+
"github.com/google/go-cmp/cmp/cmpopts"
24+
2125
"go.opencensus.io/trace"
2226
"google.golang.org/grpc/codes"
2327
"google.golang.org/grpc/status"
2428

2529
"golang.org/x/net/context"
2630

31+
"go.opencensus.io/metric/metricdata"
2732
"go.opencensus.io/stats/view"
2833
"go.opencensus.io/tag"
2934

@@ -334,6 +339,72 @@ func TestClientDefaultCollections(t *testing.T) {
334339
}
335340
}
336341

342+
func TestClientRecordExemplar(t *testing.T) {
343+
key, _ := tag.NewKey("test_key")
344+
tagInfo := &stats.RPCTagInfo{FullMethodName: "/package.service/method"}
345+
out := &stats.OutPayload{Length: 2000}
346+
end := &stats.End{Error: nil}
347+
348+
if err := view.Register(ClientSentBytesPerRPCView); err != nil {
349+
t.Error(err)
350+
}
351+
h := &ClientHandler{}
352+
h.StartOptions.Sampler = trace.AlwaysSample()
353+
ctx, err := tag.New(context.Background(), tag.Upsert(key, "test_val"))
354+
if err != nil {
355+
t.Error(err)
356+
}
357+
encoded := tag.Encode(tag.FromContext(ctx))
358+
ctx = stats.SetTags(context.Background(), encoded)
359+
ctx = h.TagRPC(ctx, tagInfo)
360+
361+
out.Client = true
362+
h.HandleRPC(ctx, out)
363+
end.Client = true
364+
h.HandleRPC(ctx, end)
365+
366+
span := trace.FromContext(ctx)
367+
if span == nil {
368+
t.Fatal("expected non-nil span, got nil")
369+
}
370+
if !span.IsRecordingEvents() {
371+
t.Errorf("span should be sampled")
372+
}
373+
attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: span.SpanContext()}
374+
wantExemplar := &metricdata.Exemplar{Value: 2000, Attachments: attachments}
375+
376+
rows, err := view.RetrieveData(ClientSentBytesPerRPCView.Name)
377+
if err != nil {
378+
t.Fatal("Error RetrieveData ", err)
379+
}
380+
if len(rows) == 0 {
381+
t.Fatal("No data was recorded.")
382+
}
383+
data := rows[0].Data
384+
dis, ok := data.(*view.DistributionData)
385+
if !ok {
386+
t.Fatal("want DistributionData, got ", data)
387+
}
388+
// Only recorded value is 2000, which falls into the second bucket (1024, 2048].
389+
wantBuckets := []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
390+
if !reflect.DeepEqual(dis.CountPerBucket, wantBuckets) {
391+
t.Errorf("want buckets %v, got %v", wantBuckets, dis.CountPerBucket)
392+
}
393+
for i, e := range dis.ExemplarsPerBucket {
394+
// Only the second bucket should have an exemplar.
395+
if i == 1 {
396+
if diff := cmpExemplar(e, wantExemplar); diff != "" {
397+
t.Fatalf("Unexpected Exemplar -got +want: %s", diff)
398+
}
399+
} else if e != nil {
400+
t.Errorf("want nil exemplar, got %v", e)
401+
}
402+
}
403+
404+
// Unregister views to cleanup.
405+
view.Unregister(ClientSentBytesPerRPCView)
406+
}
407+
337408
// containsRow returns true if rows contain r.
338409
func containsRow(rows []*view.Row, r *view.Row) bool {
339410
for _, x := range rows {
@@ -343,3 +414,8 @@ func containsRow(rows []*view.Row, r *view.Row) bool {
343414
}
344415
return false
345416
}
417+
418+
// Compare exemplars while ignoring exemplar timestamp, since timestamp is non-deterministic.
419+
func cmpExemplar(got, want *metricdata.Exemplar) string {
420+
return cmp.Diff(got, want, cmpopts.IgnoreFields(metricdata.Exemplar{}, "Timestamp"), cmpopts.IgnoreUnexported(metricdata.Exemplar{}))
421+
}

plugin/ocgrpc/server_stats_handler_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
package ocgrpc
1717

1818
import (
19+
"reflect"
1920
"testing"
2021

2122
"go.opencensus.io/trace"
2223
"golang.org/x/net/context"
2324

25+
"go.opencensus.io/metric/metricdata"
2426
"go.opencensus.io/stats/view"
2527
"go.opencensus.io/tag"
2628

@@ -334,3 +336,69 @@ func newDistributionData(countPerBucket []int64, count int64, min, max, mean, su
334336
CountPerBucket: countPerBucket,
335337
}
336338
}
339+
340+
func TestServerRecordExemplar(t *testing.T) {
341+
key, _ := tag.NewKey("test_key")
342+
tagInfo := &stats.RPCTagInfo{FullMethodName: "/package.service/method"}
343+
out := &stats.OutPayload{Length: 2000}
344+
end := &stats.End{Error: nil}
345+
346+
if err := view.Register(ServerSentBytesPerRPCView); err != nil {
347+
t.Error(err)
348+
}
349+
h := &ServerHandler{}
350+
h.StartOptions.Sampler = trace.AlwaysSample()
351+
ctx, err := tag.New(context.Background(), tag.Upsert(key, "test_val"))
352+
if err != nil {
353+
t.Error(err)
354+
}
355+
encoded := tag.Encode(tag.FromContext(ctx))
356+
ctx = stats.SetTags(context.Background(), encoded)
357+
ctx = h.TagRPC(ctx, tagInfo)
358+
359+
out.Client = false
360+
h.HandleRPC(ctx, out)
361+
end.Client = false
362+
h.HandleRPC(ctx, end)
363+
364+
span := trace.FromContext(ctx)
365+
if span == nil {
366+
t.Fatal("expected non-nil span, got nil")
367+
}
368+
if !span.IsRecordingEvents() {
369+
t.Errorf("span should be sampled")
370+
}
371+
attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: span.SpanContext()}
372+
wantExemplar := &metricdata.Exemplar{Value: 2000, Attachments: attachments}
373+
374+
rows, err := view.RetrieveData(ServerSentBytesPerRPCView.Name)
375+
if err != nil {
376+
t.Fatal("Error RetrieveData ", err)
377+
}
378+
if len(rows) == 0 {
379+
t.Fatal("No data was recorded.")
380+
}
381+
data := rows[0].Data
382+
dis, ok := data.(*view.DistributionData)
383+
if !ok {
384+
t.Fatal("want DistributionData, got ", data)
385+
}
386+
// Only recorded value is 2000, which falls into the second bucket (1024, 2048].
387+
wantBuckets := []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
388+
if !reflect.DeepEqual(dis.CountPerBucket, wantBuckets) {
389+
t.Errorf("want buckets %v, got %v", wantBuckets, dis.CountPerBucket)
390+
}
391+
for i, e := range dis.ExemplarsPerBucket {
392+
// Only the second bucket should have an exemplar.
393+
if i == 1 {
394+
if diff := cmpExemplar(e, wantExemplar); diff != "" {
395+
t.Fatalf("Unexpected Exemplar -got +want: %s", diff)
396+
}
397+
} else if e != nil {
398+
t.Errorf("want nil exemplar, got %v", e)
399+
}
400+
}
401+
402+
// Unregister views to cleanup.
403+
view.Unregister(ServerSentBytesPerRPCView)
404+
}

plugin/ocgrpc/stats_common.go

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ import (
2222
"sync/atomic"
2323
"time"
2424

25+
"go.opencensus.io/metric/metricdata"
2526
ocstats "go.opencensus.io/stats"
2627
"go.opencensus.io/stats/view"
2728
"go.opencensus.io/tag"
29+
"go.opencensus.io/trace"
2830
"google.golang.org/grpc/codes"
2931
"google.golang.org/grpc/grpclog"
3032
"google.golang.org/grpc/stats"
@@ -141,27 +143,31 @@ func handleRPCEnd(ctx context.Context, s *stats.End) {
141143
}
142144

143145
latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
146+
attachments := getSpanCtxAttachment(ctx)
144147
if s.Client {
145-
ocstats.RecordWithTags(ctx,
146-
[]tag.Mutator{
148+
ocstats.RecordWithOptions(ctx,
149+
ocstats.WithTags(
147150
tag.Upsert(KeyClientMethod, methodName(d.method)),
148-
tag.Upsert(KeyClientStatus, st),
149-
},
150-
ClientSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
151-
ClientSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
152-
ClientReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
153-
ClientReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
154-
ClientRoundtripLatency.M(latencyMillis))
151+
tag.Upsert(KeyClientStatus, st)),
152+
ocstats.WithAttachments(attachments),
153+
ocstats.WithMeasurements(
154+
ClientSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
155+
ClientSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
156+
ClientReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
157+
ClientReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
158+
ClientRoundtripLatency.M(latencyMillis)))
155159
} else {
156-
ocstats.RecordWithTags(ctx,
157-
[]tag.Mutator{
160+
ocstats.RecordWithOptions(ctx,
161+
ocstats.WithTags(
158162
tag.Upsert(KeyServerStatus, st),
159-
},
160-
ServerSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
161-
ServerSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
162-
ServerReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
163-
ServerReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
164-
ServerLatency.M(latencyMillis))
163+
),
164+
ocstats.WithAttachments(attachments),
165+
ocstats.WithMeasurements(
166+
ServerSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
167+
ServerSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
168+
ServerReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
169+
ServerReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
170+
ServerLatency.M(latencyMillis)))
165171
}
166172
}
167173

@@ -206,3 +212,16 @@ func statusCodeToString(s *status.Status) string {
206212
return "CODE_" + strconv.FormatInt(int64(c), 10)
207213
}
208214
}
215+
216+
func getSpanCtxAttachment(ctx context.Context) metricdata.Attachments {
217+
attachments := map[string]interface{}{}
218+
span := trace.FromContext(ctx)
219+
if span == nil {
220+
return attachments
221+
}
222+
spanCtx := span.SpanContext()
223+
if spanCtx.IsSampled() {
224+
attachments[metricdata.AttachmentKeySpanContext] = spanCtx
225+
}
226+
return attachments
227+
}

0 commit comments

Comments
 (0)