Skip to content
This repository was archived by the owner on Oct 3, 2023. It is now read-only.

Commit 626e69e

Browse files
nicktravrghetia
authored andcommitted
trace: allow for concurrent uploads to Stackdriver (#246)
* trace: allow for concurrent uploads to Stackdriver In situations where a large number of spans need to be exported from a single instance of the exporter, bundle uploads are limited to using a single goroutine. This limits the overall throughput of the exporter. Make use of the NumberOfWorkers option to allow the exporter to use multiple, concurrent goroutines to upload spans to Stackdriver. Closes #245. Signed-off-by: Nick Travers <n.e.travers@gmail.com> * Update documentation Signed-off-by: Nick Travers <n.e.travers@gmail.com> * Count number of exported spans Signed-off-by: Nick Travers <n.e.travers@gmail.com> * Fix data race in test case Signed-off-by: Nick Travers <n.e.travers@gmail.com>
1 parent 1cdca91 commit 626e69e

3 files changed

Lines changed: 71 additions & 2 deletions

File tree

stackdriver.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,7 @@ type Options struct {
259259
ReportingInterval time.Duration
260260

261261
// NumberOfWorkers sets the number of go rountines that send requests
262-
// to Stackdriver Monitoring. This is only used for Proto metrics export
263-
// for now. The minimum number of workers is 1.
262+
// to Stackdriver Monitoring and Trace. The minimum number of workers is 1.
264263
NumberOfWorkers int
265264

266265
// ResourceByDescriptor may be provided to supply monitored resource dynamically

trace.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ func newTraceExporterWithClient(o Options, c *tracingclient.Client) *traceExport
7676
} else {
7777
b.BundleCountThreshold = 50
7878
}
79+
if o.NumberOfWorkers > 0 {
80+
b.HandlerLimit = o.NumberOfWorkers
81+
}
7982
// The measured "bytes" are not really bytes, see exportReceiver.
8083
b.BundleByteThreshold = b.BundleCountThreshold * 200
8184
b.BundleByteLimit = b.BundleCountThreshold * 1000

trace_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package stackdriver
1717
import (
1818
"context"
1919
"fmt"
20+
"sync"
2021
"testing"
2122
"time"
2223

@@ -63,6 +64,72 @@ func TestBundling(t *testing.T) {
6364
}
6465
}
6566

67+
func TestBundling_ConcurrentExports(t *testing.T) {
68+
workers := 2
69+
spansPerWorker := 10
70+
delay := 2 * time.Second
71+
exporter := newTraceExporterWithClient(Options{
72+
ProjectID: "fakeProjectID",
73+
BundleCountThreshold: spansPerWorker,
74+
BundleDelayThreshold: delay,
75+
NumberOfWorkers: workers,
76+
}, nil)
77+
78+
wg := sync.WaitGroup{}
79+
waitCh := make(chan struct{})
80+
wg.Add(workers)
81+
82+
var exportMap sync.Map // maintain a collection of the spans exported
83+
exporter.uploadFn = func(spans []*tracepb.Span) {
84+
for _, s := range spans {
85+
exportMap.Store(s.SpanId, true)
86+
}
87+
wg.Done()
88+
89+
// Don't complete the function until the WaitGroup is done.
90+
// This ensures the semaphore limiting the concurrent uploads is not
91+
// released by one goroutine completing before the other.
92+
wg.Wait()
93+
}
94+
trace.RegisterExporter(exporter)
95+
96+
totalSpans := workers * spansPerWorker
97+
var expectedSpanIDs []string
98+
go func() {
99+
// Release enough spans to form two bundles
100+
for i := 0; i < totalSpans; i++ {
101+
_, span := trace.StartSpan(context.Background(), "span", trace.WithSampler(trace.AlwaysSample()))
102+
expectedSpanIDs = append(expectedSpanIDs, span.SpanContext().SpanID.String())
103+
span.End()
104+
}
105+
106+
// Wait for the desired concurrency before completing
107+
wg.Wait()
108+
close(waitCh)
109+
}()
110+
111+
select {
112+
case <-waitCh:
113+
case <-time.After(delay / 2): // fail before a time-based flush is triggered
114+
t.Fatal("timed out waiting for concurrent uploads")
115+
}
116+
117+
// all the spans are accounted for
118+
var exportedSpans []string
119+
exportMap.Range(func(key, value interface{}) bool {
120+
exportedSpans = append(exportedSpans, key.(string))
121+
return true
122+
})
123+
if len(exportedSpans) != totalSpans {
124+
t.Errorf("got %d spans, want %d", len(exportedSpans), totalSpans)
125+
}
126+
for _, id := range expectedSpanIDs {
127+
if _, ok := exportMap.Load(id); !ok {
128+
t.Errorf("want %s; missing from exported spans", id)
129+
}
130+
}
131+
}
132+
66133
func TestNewContext_Timeout(t *testing.T) {
67134
e := newTraceExporterWithClient(Options{
68135
Timeout: 10 * time.Millisecond,

0 commit comments

Comments
 (0)