Skip to content

Commit 464d493

Browse files
fix: correct disableCache watch semantics and backoff
Address review feedback on knative#9036:

- Fix Watch(rv="") replay bug: perform a lightweight LIST(limit=1) before each Watch to obtain the current resourceVersion, so Watch starts from that point and does not replay synthetic ADDED events for pre-existing objects (which is what rv="" triggers per the Kubernetes API contract).
- Handle watch.Error 410 Gone: detect StatusReasonGone in drainWatchEvents and return so watchResourceLoop re-lists for a fresh resourceVersion, breaking the infinite tight-retry loop that would otherwise occur.
- Replace hardcoded 5s retry with exponential backoff+jitter (wait.Backoff, 1s→60s cap) via watchResourceLoop, preventing thundering herd on reconnect.
- Log warning when both DisableCache and FailFast are set, documenting that DisableCache takes precedence.
- Replace TestAdapter_DisableCacheSkipsList (wrong: asserted no LIST was called) with TestAdapter_DisableCacheLightweightList (correct: asserts LIST is called with Limit=1). Add TestAdapter_DisableCacheEventDelivery, which injects a watch event via a fake watcher and asserts a CloudEvent is delivered to the sink client.
1 parent cbea2ad commit 464d493

2 files changed

Lines changed: 243 additions & 72 deletions

File tree

pkg/adapter/apiserver/adapter.go

Lines changed: 110 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package apiserver
1919
import (
2020
"context"
2121
"fmt"
22+
"math"
2223
"sync"
2324
"time"
2425

@@ -28,6 +29,7 @@ import (
2829
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3031
"k8s.io/apimachinery/pkg/runtime"
32+
"k8s.io/apimachinery/pkg/util/wait"
3133
"k8s.io/apimachinery/pkg/watch"
3234

3335
"k8s.io/client-go/discovery"
@@ -104,57 +106,126 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er
104106
return a.startResilient(ctx, stopCh, delegate, matches)
105107
}
106108

107-
// startWatchOnly starts watches without an initial LIST call, so pre-existing objects
108-
// are not enumerated. This avoids the N*namespaces LIST requests that cause client-side
109-
// throttling in large clusters.
109+
// startWatchOnly starts watches for all matched resources. It performs a lightweight
110+
// LIST (limit=1) per resource interface to obtain the current resourceVersion, then
111+
// issues a Watch from that point. This means pre-existing objects do not produce
112+
// events on startup, and startup API load is O(resources*namespaces) lightweight
113+
// LISTs rather than full object dumps.
114+
//
115+
// When both DisableCache and FailFast are set, DisableCache takes precedence.
110116
func (a *apiServerAdapter) startWatchOnly(ctx context.Context, stopCh <-chan struct{}, delegate cache.Store, matches []resourceWatchMatch) error {
117+
if a.config.FailFast {
118+
a.logger.Warn("disableCache=true takes precedence over failFast=true; running in no-cache mode without fail-fast behavior")
119+
}
111120
for _, match := range matches {
112121
if match.apiResource == nil {
113122
a.logger.Errorf("could not retrieve information about resource %s: it doesn't exist. skipping...", match.resourceWatch.GVR.String())
114123
continue
115124
}
116125
for _, res := range match.resourceInterfaces {
117-
go func(ri dynamic.ResourceInterface, labelSelector string) {
118-
for {
119-
select {
120-
case <-ctx.Done():
121-
return
122-
default:
123-
}
126+
go a.watchResourceLoop(ctx, res, match.resourceWatch.LabelSelector, delegate)
127+
}
128+
}
129+
<-stopCh
130+
return nil
131+
}
124132

125-
lo := metav1.ListOptions{LabelSelector: labelSelector}
126-
w, err := ri.Watch(ctx, lo)
127-
if err != nil {
128-
a.logger.Errorw("watch error, retrying", zap.Error(err))
129-
select {
130-
case <-ctx.Done():
131-
return
132-
case <-time.After(5 * time.Second):
133-
}
134-
continue
135-
}
133+
func noCacheBackoff() wait.Backoff {
134+
return wait.Backoff{
135+
Duration: 1 * time.Second,
136+
Factor: 2.0,
137+
Jitter: 0.1,
138+
Steps: math.MaxInt32,
139+
Cap: 60 * time.Second,
140+
}
141+
}
136142

137-
for event := range w.ResultChan() {
138-
obj, ok := event.Object.(*unstructured.Unstructured)
139-
if !ok {
140-
continue
141-
}
142-
switch event.Type {
143-
case watch.Added:
144-
_ = delegate.Add(obj)
145-
case watch.Modified:
146-
_ = delegate.Update(obj)
147-
case watch.Deleted:
148-
_ = delegate.Delete(obj)
149-
}
150-
}
151-
}
152-
}(res, match.resourceWatch.LabelSelector)
143+
// watchResourceLoop runs a continuous List+Watch loop for a single resource interface.
144+
// A lightweight LIST (limit=1) is issued before each Watch to obtain the current
145+
// resourceVersion so that Watch does not replay synthetic ADDED events for
146+
// pre-existing objects (the behaviour when resourceVersion="" is passed to Watch).
147+
// On watch.Error with StatusReasonGone (HTTP 410 — resourceVersion expired from the
148+
// watch cache), the loop re-lists to get a fresh resourceVersion.
149+
// All failures back off exponentially with jitter before retrying.
150+
func (a *apiServerAdapter) watchResourceLoop(ctx context.Context, ri dynamic.ResourceInterface, labelSelector string, delegate cache.Store) {
151+
backoff := noCacheBackoff()
152+
for ctx.Err() == nil {
153+
list, err := ri.List(ctx, metav1.ListOptions{
154+
LabelSelector: labelSelector,
155+
Limit: 1,
156+
})
157+
if err != nil {
158+
if ctx.Err() != nil {
159+
return
160+
}
161+
a.logger.Errorw("failed to list for resourceVersion, retrying", zap.Error(err))
162+
select {
163+
case <-ctx.Done():
164+
return
165+
case <-time.After(backoff.Step()):
166+
}
167+
continue
153168
}
169+
rv := list.GetResourceVersion()
170+
backoff = noCacheBackoff()
171+
172+
w, err := ri.Watch(ctx, metav1.ListOptions{
173+
LabelSelector: labelSelector,
174+
ResourceVersion: rv,
175+
})
176+
if err != nil {
177+
if ctx.Err() != nil {
178+
return
179+
}
180+
a.logger.Errorw("watch error, retrying", zap.Error(err))
181+
select {
182+
case <-ctx.Done():
183+
return
184+
case <-time.After(backoff.Step()):
185+
}
186+
continue
187+
}
188+
backoff = noCacheBackoff()
189+
a.drainWatchEvents(ctx, w, delegate)
154190
}
191+
}
155192

156-
<-stopCh
157-
return nil
193+
// drainWatchEvents reads from a watch until the context is done, the watch
194+
// channel closes, or a watch.Error event is received. On StatusReasonGone
195+
// (HTTP 410), it returns immediately so watchResourceLoop can re-list.
196+
func (a *apiServerAdapter) drainWatchEvents(ctx context.Context, w watch.Interface, delegate cache.Store) {
197+
defer w.Stop()
198+
for {
199+
select {
200+
case <-ctx.Done():
201+
return
202+
case event, ok := <-w.ResultChan():
203+
if !ok {
204+
return
205+
}
206+
switch event.Type {
207+
case watch.Added:
208+
if obj, ok := event.Object.(*unstructured.Unstructured); ok {
209+
_ = delegate.Add(obj)
210+
}
211+
case watch.Modified:
212+
if obj, ok := event.Object.(*unstructured.Unstructured); ok {
213+
_ = delegate.Update(obj)
214+
}
215+
case watch.Deleted:
216+
if obj, ok := event.Object.(*unstructured.Unstructured); ok {
217+
_ = delegate.Delete(obj)
218+
}
219+
case watch.Error:
220+
if status, ok := event.Object.(*metav1.Status); ok && status.Reason == metav1.StatusReasonGone {
221+
a.logger.Info("watch resourceVersion expired (410 Gone), will re-list")
222+
} else {
223+
a.logger.Errorw("received watch error event", zap.Any("object", event.Object))
224+
}
225+
return
226+
}
227+
}
228+
}
158229
}
159230

160231
func (a *apiServerAdapter) startResilient(ctx context.Context, stopCh <-chan struct{}, delegate cache.Store, matches []resourceWatchMatch) error {

0 commit comments

Comments
 (0)