Skip to content
This repository was archived by the owner on Jul 31, 2023. It is now read-only.

Commit 13369a4

Browse files
authored
Adds an exported function to flush internal reader (#1248)
1 parent e736602 commit 13369a4

File tree

2 files changed

+81
-2
lines changed

2 files changed

+81
-2
lines changed

metric/metricexport/reader.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (ir *IntervalReader) Start() error {
130130
reportingInterval = ir.ReportingInterval
131131
}
132132

133-
if ir.done != nil {
133+
if ir.quit != nil {
134134
return errAlreadyStarted
135135
}
136136
ir.timer = time.NewTicker(reportingInterval)
@@ -172,6 +172,19 @@ func (ir *IntervalReader) Stop() {
172172
ir.quit = nil
173173
}
174174

175+
// Flush flushes the metrics if IntervalReader is stopped, otherwise no-op.
176+
func (ir *IntervalReader) Flush() {
177+
ir.mu.Lock()
178+
defer ir.mu.Unlock()
179+
180+
// No-op if IntervalReader is not stopped
181+
if ir.quit != nil {
182+
return
183+
}
184+
185+
ir.reader.ReadAndExport(ir.exporter)
186+
}
187+
175188
// ReadAndExport reads metrics from all producer registered with
176189
// producer manager and then exports them using provided exporter.
177190
func (r *Reader) ReadAndExport(exporter Exporter) {

metric/metricexport/reader_test.go

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,69 @@ func TestManualReadForIntervalReader(t *testing.T) {
117117
resetExporter(exporter1)
118118
}
119119

120+
func TestFlushNoOpForIntervalReader(t *testing.T) {
121+
ir1 = createAndStart(exporter1, duration1, t)
122+
123+
gaugeEntry.Set(1)
124+
125+
// since IR is not stopped, flush does nothing
126+
ir1.Flush()
127+
128+
// expect no data points
129+
checkExportedCount(exporter1, 0, t)
130+
checkExportedMetricDesc(exporter1, "active_request", t)
131+
ir1.Stop()
132+
resetExporter(exporter1)
133+
}
134+
135+
func TestFlushAllowMultipleForIntervalReader(t *testing.T) {
136+
ir1 = createAndStart(exporter1, duration1, t)
137+
138+
gaugeEntry.Set(1)
139+
140+
ir1.Stop()
141+
ir1.Flush()
142+
143+
// metric is still coming in
144+
gaugeEntry.Add(1)
145+
146+
// one more flush after IR stopped
147+
ir1.Flush()
148+
149+
// expect 2 data point, one from each flush
150+
checkExportedCount(exporter1, 2, t)
151+
checkExportedValues(exporter1, []int64{1, 2}, t)
152+
checkExportedMetricDesc(exporter1, "active_request", t)
153+
154+
resetExporter(exporter1)
155+
}
156+
157+
func TestFlushRestartForIntervalReader(t *testing.T) {
158+
ir1 = createAndStart(exporter1, duration1, t)
159+
160+
gaugeEntry.Set(1)
161+
ir1.Stop()
162+
ir1.Flush()
163+
164+
// restart the IR
165+
err := ir1.Start()
166+
if err != nil {
167+
t.Fatalf("error starting reader %v\n", err)
168+
}
169+
170+
gaugeEntry.Add(1)
171+
172+
ir1.Stop()
173+
ir1.Flush()
174+
175+
// expect 2 data point, one from each flush
176+
checkExportedCount(exporter1, 2, t)
177+
checkExportedValues(exporter1, []int64{1, 2}, t)
178+
checkExportedMetricDesc(exporter1, "active_request", t)
179+
180+
resetExporter(exporter1)
181+
}
182+
120183
func TestProducerWithIntervalReaderStop(t *testing.T) {
121184
ir1 = createAndStart(exporter1, duration1, t)
122185
ir1.Stop()
@@ -166,7 +229,10 @@ func TestIntervalReaderMultipleStop(t *testing.T) {
166229

167230
func TestIntervalReaderMultipleStart(t *testing.T) {
168231
ir1 = createAndStart(exporter1, duration1, t)
169-
ir1.Start()
232+
err := ir1.Start()
233+
if err == nil {
234+
t.Fatalf("expected error but got nil\n")
235+
}
170236

171237
gaugeEntry.Add(1)
172238

0 commit comments

Comments
 (0)