Skip to content
This repository was archived by the owner on Oct 3, 2023. It is now read-only.

Commit 2b7f4fc

Browse files
author
Ramon Nogueira
authored
Add timeouts; make span size buffer limit precise (#58)
Add a timeout (customizable via the Timeout option, defaulting to 5 seconds) to the contexts used to make API calls. Add a size limit on the spans we buffer in memory, and compute each span's size by converting it to protobuf before buffering, rather than estimating it from the SpanData struct.
1 parent 65a75df commit 2b7f4fc

5 files changed

Lines changed: 125 additions & 37 deletions

File tree

stackdriver.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,22 @@ type Options struct {
8989
TraceClientOptions []option.ClientOption
9090

9191
// BundleDelayThreshold determines the max amount of time
92-
// the exporter can wait before uploading view data to
92+
// the exporter can wait before uploading view data or trace spans to
9393
// the backend.
9494
// Optional.
9595
BundleDelayThreshold time.Duration
9696

97-
// BundleCountThreshold determines how many view data events
97+
// BundleCountThreshold determines how many view data events or trace spans
9898
// can be buffered before batch uploading them to the backend.
9999
// Optional.
100100
BundleCountThreshold int
101101

102+
// TraceSpansBufferMaxBytes is the maximum size (in bytes) of spans that
103+
// will be buffered in memory before being dropped.
104+
//
105+
// If unset, a default of 8MB will be used.
106+
TraceSpansBufferMaxBytes int
107+
102108
// Resource sets the MonitoredResource against which all views will be
103109
// recorded by this exporter.
104110
//
@@ -190,9 +196,14 @@ type Options struct {
190196
// trace and metric clients, and then every time a new batch of traces or
191197
// stats needs to be uploaded.
192198
//
199+
// Do not set a timeout on this context. Instead, set the Timeout option.
200+
//
193201
// If unset, context.Background() will be used.
194202
Context context.Context
195203

204+
// Timeout for all API calls. If not set, defaults to 5 seconds.
205+
Timeout time.Duration
206+
196207
// GetMonitoredResource may be provided to supply the details of the
197208
// monitored resource dynamically based on the tags associated with each
198209
// data point. Most users will not need to set this, but should instead
@@ -209,6 +220,8 @@ type Options struct {
209220
GetMonitoredResource func(*view.View, []tag.Tag) ([]tag.Tag, monitoredresource.Interface)
210221
}
211222

223+
const defaultTimeout = 5 * time.Second
224+
212225
// Exporter is a stats and trace exporter that uploads data to Stackdriver.
213226
//
214227
// You can create a single Exporter and register it as both a trace exporter
@@ -222,11 +235,12 @@ type Exporter struct {
222235
// NewExporter creates a new Exporter that implements both stats.Exporter and
223236
// trace.Exporter.
224237
func NewExporter(o Options) (*Exporter, error) {
225-
if o.Context == nil {
226-
o.Context = context.Background()
227-
}
228238
if o.ProjectID == "" {
229-
creds, err := google.FindDefaultCredentials(o.Context, traceapi.DefaultAuthScopes()...)
239+
ctx := o.Context
240+
if ctx == nil {
241+
ctx = context.Background()
242+
}
243+
creds, err := google.FindDefaultCredentials(ctx, traceapi.DefaultAuthScopes()...)
230244
if err != nil {
231245
return nil, fmt.Errorf("stackdriver: %v", err)
232246
}
@@ -297,6 +311,18 @@ func (o Options) handleError(err error) {
297311
log.Printf("Failed to export to Stackdriver: %v", err)
298312
}
299313

314+
func (o Options) newContextWithTimeout() (context.Context, func()) {
315+
ctx := o.Context
316+
if ctx == nil {
317+
ctx = context.Background()
318+
}
319+
timeout := o.Timeout
320+
if timeout <= 0 {
321+
timeout = defaultTimeout
322+
}
323+
return context.WithTimeout(ctx, timeout)
324+
}
325+
300326
// convertMonitoredResourceToPB converts MonitoredResource data in to
301327
// protocol buffer.
302328
func convertMonitoredResourceToPB(mr monitoredresource.Interface) *monitoredrespb.MonitoredResource {

stats.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ func newStatsExporter(o Options) (*statsExporter, error) {
7878
}
7979

8080
opts := append(o.MonitoringClientOptions, option.WithUserAgent(userAgent))
81-
client, err := monitoring.NewMetricClient(o.Context, opts...)
81+
ctx, cancel := o.newContextWithTimeout()
82+
defer cancel()
83+
client, err := monitoring.NewMetricClient(ctx, opts...)
8284
if err != nil {
8385
return nil, err
8486
}
@@ -167,8 +169,10 @@ func (e *statsExporter) Flush() {
167169
}
168170

169171
func (e *statsExporter) uploadStats(vds []*view.Data) error {
172+
ctx, cancel := e.o.newContextWithTimeout()
173+
defer cancel()
170174
ctx, span := trace.StartSpan(
171-
e.o.Context,
175+
ctx,
172176
"contrib.go.opencensus.io/exporter/stackdriver.uploadStats",
173177
trace.WithSampler(trace.NeverSample()),
174178
)

stats_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@ package stackdriver
1616

1717
import (
1818
"context"
19-
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
2019
"fmt"
2120
"testing"
2221
"time"
2322

23+
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
24+
2425
"cloud.google.com/go/monitoring/apiv3"
2526
"github.com/golang/protobuf/ptypes/timestamp"
2627
"github.com/google/go-cmp/cmp"

trace.go

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
package stackdriver
1616

1717
import (
18+
"context"
1819
"fmt"
1920
"log"
2021
"sync"
2122
"time"
2223

2324
tracingclient "cloud.google.com/go/trace/apiv2"
25+
"github.com/golang/protobuf/proto"
2426
"go.opencensus.io/trace"
2527
"google.golang.org/api/support/bundler"
2628
tracepb "google.golang.org/genproto/googleapis/devtools/cloudtrace/v2"
@@ -34,63 +36,69 @@ type traceExporter struct {
3436
projectID string
3537
bundler *bundler.Bundler
3638
// uploadFn defaults to uploadSpans; it can be replaced for tests.
37-
uploadFn func(spans []*trace.SpanData)
39+
uploadFn func(spans []*tracepb.Span)
3840
overflowLogger
3941
client *tracingclient.Client
4042
}
4143

4244
var _ trace.Exporter = (*traceExporter)(nil)
4345

4446
func newTraceExporter(o Options) (*traceExporter, error) {
45-
client, err := tracingclient.NewClient(o.Context, o.TraceClientOptions...)
47+
ctx := o.Context
48+
if ctx == nil {
49+
ctx = context.Background()
50+
}
51+
client, err := tracingclient.NewClient(ctx, o.TraceClientOptions...)
4652
if err != nil {
4753
return nil, fmt.Errorf("stackdriver: couldn't initialize trace client: %v", err)
4854
}
4955
return newTraceExporterWithClient(o, client), nil
5056
}
5157

58+
const defaultBufferedByteLimit = 8 * 1024 * 1024
59+
5260
func newTraceExporterWithClient(o Options, c *tracingclient.Client) *traceExporter {
5361
e := &traceExporter{
5462
projectID: o.ProjectID,
5563
client: c,
5664
o: o,
5765
}
58-
bundler := bundler.NewBundler((*trace.SpanData)(nil), func(bundle interface{}) {
59-
e.uploadFn(bundle.([]*trace.SpanData))
66+
b := bundler.NewBundler((*tracepb.Span)(nil), func(bundle interface{}) {
67+
e.uploadFn(bundle.([]*tracepb.Span))
6068
})
6169
if o.BundleDelayThreshold > 0 {
62-
bundler.DelayThreshold = o.BundleDelayThreshold
70+
b.DelayThreshold = o.BundleDelayThreshold
6371
} else {
64-
bundler.DelayThreshold = 2 * time.Second
72+
b.DelayThreshold = 2 * time.Second
6573
}
6674
if o.BundleCountThreshold > 0 {
67-
bundler.BundleCountThreshold = o.BundleCountThreshold
75+
b.BundleCountThreshold = o.BundleCountThreshold
6876
} else {
69-
bundler.BundleCountThreshold = 50
77+
b.BundleCountThreshold = 50
7078
}
7179
// The measured "bytes" are not really bytes, see exportReceiver.
72-
bundler.BundleByteThreshold = bundler.BundleCountThreshold * 200
73-
bundler.BundleByteLimit = bundler.BundleCountThreshold * 1000
74-
bundler.BufferedByteLimit = bundler.BundleCountThreshold * 2000
80+
b.BundleByteThreshold = b.BundleCountThreshold * 200
81+
b.BundleByteLimit = b.BundleCountThreshold * 1000
82+
if o.TraceSpansBufferMaxBytes > 0 {
83+
b.BufferedByteLimit = o.TraceSpansBufferMaxBytes
84+
} else {
85+
b.BufferedByteLimit = defaultBufferedByteLimit
86+
}
7587

76-
e.bundler = bundler
88+
e.bundler = b
7789
e.uploadFn = e.uploadSpans
7890
return e
7991
}
8092

8193
// ExportSpan exports a SpanData to Stackdriver Trace.
8294
func (e *traceExporter) ExportSpan(s *trace.SpanData) {
83-
// n is a length heuristic.
84-
n := 1
85-
n += len(s.Attributes)
86-
n += len(s.Annotations)
87-
n += len(s.MessageEvents)
88-
err := e.bundler.Add(s, n)
95+
protoSpan := protoFromSpanData(s, e.projectID, e.o.Resource)
96+
protoSize := proto.Size(protoSpan)
97+
err := e.bundler.Add(protoSpan, protoSize)
8998
switch err {
9099
case nil:
91100
return
92101
case bundler.ErrOversizedItem:
93-
go e.uploadFn([]*trace.SpanData{s})
94102
case bundler.ErrOverflow:
95103
e.overflowLogger.log()
96104
default:
@@ -107,17 +115,16 @@ func (e *traceExporter) Flush() {
107115
}
108116

109117
// uploadSpans uploads a set of spans to Stackdriver.
110-
func (e *traceExporter) uploadSpans(spans []*trace.SpanData) {
118+
func (e *traceExporter) uploadSpans(spans []*tracepb.Span) {
111119
req := tracepb.BatchWriteSpansRequest{
112120
Name: "projects/" + e.projectID,
113-
Spans: make([]*tracepb.Span, 0, len(spans)),
114-
}
115-
for _, span := range spans {
116-
req.Spans = append(req.Spans, protoFromSpanData(span, e.projectID, e.o.Resource))
121+
Spans: spans,
117122
}
118123
// Create a never-sampled span to prevent traces associated with exporter.
119-
ctx, span := trace.StartSpan( // TODO: add timeouts
120-
e.o.Context,
124+
ctx, cancel := e.o.newContextWithTimeout()
125+
defer cancel()
126+
ctx, span := trace.StartSpan(
127+
ctx,
121128
"contrib.go.opencensus.io/exporter/stackdriver.uploadSpans",
122129
trace.WithSampler(trace.NeverSample()),
123130
)

trace_test.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ package stackdriver
1616

1717
import (
1818
"context"
19+
"fmt"
1920
"testing"
2021
"time"
2122

2223
"go.opencensus.io/trace"
24+
tracepb "google.golang.org/genproto/googleapis/devtools/cloudtrace/v2"
2325
)
2426

2527
func TestBundling(t *testing.T) {
@@ -29,8 +31,8 @@ func TestBundling(t *testing.T) {
2931
BundleCountThreshold: 10,
3032
}, nil)
3133

32-
ch := make(chan []*trace.SpanData)
33-
exporter.uploadFn = func(spans []*trace.SpanData) {
34+
ch := make(chan []*tracepb.Span)
35+
exporter.uploadFn = func(spans []*tracepb.Span) {
3436
ch <- spans
3537
}
3638
trace.RegisterExporter(exporter)
@@ -60,3 +62,51 @@ func TestBundling(t *testing.T) {
6062
case <-time.After(time.Second / 5):
6163
}
6264
}
65+
66+
func TestNewContext_Timeout(t *testing.T) {
67+
e := newTraceExporterWithClient(Options{
68+
Timeout: 10 * time.Millisecond,
69+
}, nil)
70+
ctx, cancel := e.o.newContextWithTimeout()
71+
defer cancel()
72+
select {
73+
case <-time.After(60 * time.Second):
74+
t.Fatal("should have timed out")
75+
case <-ctx.Done():
76+
}
77+
}
78+
79+
func TestTraceSpansBufferMaxBytes(t *testing.T) {
80+
e := newTraceExporterWithClient(Options{
81+
Context: context.Background(),
82+
Timeout: 10 * time.Millisecond,
83+
TraceSpansBufferMaxBytes: 20000,
84+
}, nil)
85+
waitCh := make(chan struct{})
86+
exported := 0
87+
e.uploadFn = func(spans []*tracepb.Span) {
88+
<-waitCh
89+
exported++
90+
}
91+
for i := 0; i < 10; i++ {
92+
e.ExportSpan(makeSampleSpanData())
93+
}
94+
close(waitCh)
95+
e.Flush()
96+
if exported != 2 {
97+
t.Errorf("exported = %d; want 2", exported)
98+
}
99+
}
100+
101+
func makeSampleSpanData() *trace.SpanData {
102+
sd := &trace.SpanData{
103+
Annotations: make([]trace.Annotation, 32),
104+
Links: make([]trace.Link, 32),
105+
MessageEvents: make([]trace.MessageEvent, 128),
106+
Attributes: make(map[string]interface{}),
107+
}
108+
for i := 0; i < 32; i++ {
109+
sd.Attributes[fmt.Sprintf("attribute-%d", i)] = ""
110+
}
111+
return sd
112+
}

0 commit comments

Comments
 (0)