feat(queuev2): cached timer queue reader with prefetch and time-based eviction #7962
Conversation
1. extraInCache moved to mismatchTags (logged on mismatch only)
2. All mismatch log fields prefixed shadowMismatch.*
3. HasMismatches restored to include ExtraInCache — still a mismatch even if benign, for consistent observability
4. ack-level-update → read-level-update in updateInclusiveLowerBound
5. LookAHead now delegates to base during warmup, matching GetTask

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Collapse the partial/full-page if/else into a single target computation: target := exclusiveMaxKey for a partial page, target = NextTaskKey for a full page. One updateExclusiveUpperBound call, one reason tag 'prefetch-advance'. Drop the verbose comment block — the early return on trim is self-explanatory.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
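In sketch form (keys simplified to time.Time; prefetchTarget and fullPage are assumed names rather than the PR's exact identifiers):

```go
import "time"

// prefetchTarget collapses the old partial/full-page if/else into one
// computation: a partial page proves coverage of the whole requested range,
// a full page only up to the key after the last task read. The caller then
// issues a single updateExclusiveUpperBound(target, "prefetch-advance").
func prefetchTarget(exclusiveMaxKey, nextTaskKey time.Time, fullPage bool) time.Time {
	if fullPage {
		return nextTaskKey
	}
	return exclusiveMaxKey
}
```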
…efetch

putTasks now returns true when RTrimBySize fired and updated the upper bound. prefetch uses this directly instead of comparing exclusiveUpperBound before and after the call, removing the prevUpper variable and the verbose !Equal guard.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
… reset

updateExclusiveUpperBound: now logs inclusiveLowerBound + cacheSize for context, and records the size histogram.

updateInclusiveLowerBound: now a pure setter with log + metrics. No advance guard, no LTrim — safe to call for resets too.

advanceInclusiveLowerBound: new helper wrapping the advance-only logic (upper-bound cap, advance guard, LTrim), then calls updateInclusiveLowerBound.

Gap detection: uses updateInclusiveLowerBound directly after Clear(), since going backwards is intentional and no LTrim is needed.

timeEvict + UpdateReadLevel: fixed to call advanceInclusiveLowerBound (both were calling updateInclusiveLowerBound after the rename, which skipped the LTrim — tasks below the eviction point were never removed).

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
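A rough sketch of how the two lower-bound helpers relate after the split, with the composite task key simplified to a bare time.Time and all signatures assumed:

```go
import (
	"log"
	"time"
)

// sketchReader stands in for the PR's cachedQueueReader, with the composite
// (fireTime, taskID) key simplified to time.Time.
type sketchReader struct {
	inclusiveLowerBound time.Time
	exclusiveUpperBound time.Time
	queue               interface{ LTrim(time.Time) }
}

// setInclusiveLowerBound: the pure setter (assign + log + metrics), safe for
// backwards resets such as the post-Clear() gap-detection path.
func (q *sketchReader) setInclusiveLowerBound(key time.Time, reason string) {
	q.inclusiveLowerBound = key
	log.Printf("lower bound is updated: key=%v reason=%s", key, reason)
	// metrics recording elided
}

// advanceInclusiveLowerBound: the advance-only wrapper used by timeEvict and
// UpdateReadLevel (cap, guard, LTrim, then the setter).
func (q *sketchReader) advanceInclusiveLowerBound(key time.Time, reason string) {
	if q.exclusiveUpperBound.Before(key) {
		key = q.exclusiveUpperBound // cap: the lower bound may never pass the upper bound
	}
	if !q.inclusiveLowerBound.Before(key) {
		return // advance guard: ignore backwards or no-op updates
	}
	q.queue.LTrim(key) // evict cached tasks below the new bound
	q.setInclusiveLowerBound(key, reason)
}
```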
…fy Inject guard

Gap detection now calls updateInclusiveLowerBound (the simple setter) instead of assigning inclusiveLowerBound directly. No LTrim is needed since the queue is already cleared, and the setter accepts backwards resets.

Inject: combine isDisabled and isInWarmup into a single guard condition.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
ExtraInCache (tasks in the cache but not in the DB) is benign given task independence. It now logs at Info without incrementing the mismatch counter, so it stays observable without polluting the Warn metric used for alerting. MissingFromCache and NextKeyMismatch are real divergences: they log at Warn and increment the counter as before.

shadowMismatch.dbTaskCount and shadowMismatch.cacheTaskCount moved into the mismatch tag set so they are emitted only when there is something to act on. The match path logs a bare Debug with no extra fields.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
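A sketch of the resulting severity routing; shadowResult, mismatchCounter, and the slog calls stand in for the real comparison output and logger/metrics plumbing:

```go
import (
	"log/slog"
	"sync/atomic"
)

// shadowResult is a stand-in for the shadow comparison's output.
type shadowResult struct {
	MissingFromCache []string
	ExtraInCache     []string
	NextKeyMismatch  bool
}

var mismatchCounter atomic.Int64 // stand-in for the Warn-level mismatch metric

func logShadowResult(res shadowResult, mismatchTags []any) {
	switch {
	case len(res.MissingFromCache) > 0 || res.NextKeyMismatch:
		// Real divergence: alertable Warn plus counter, with the full tag
		// set (including dbTaskCount/cacheTaskCount).
		slog.Warn("shadow comparison mismatch", mismatchTags...)
		mismatchCounter.Add(1)
	case len(res.ExtraInCache) > 0:
		// Benign given task independence: observable, but not alert noise.
		slog.Info("shadow comparison extra-in-cache", mismatchTags...)
	default:
		slog.Debug("shadow comparison match") // bare, no extra fields
	}
}
```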
Adds a readLevelSyncLoop goroutine to cachedScheduledQueue that ticks every TimerProcessorCacheReadLevelSyncInterval (default 1s) and calls reader.UpdateReadLevel(virtualQueueManager.GetMinReadLevel()). This keeps the cache lower bound within ~1s of actual processing progress, compared with the previous ~30s lag from updateQueueStateFn, which is gated on DB writes.

New config: TimerProcessorCacheReadLevelSyncInterval (global, default 1s).

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
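The loop is a conventional ticker goroutine; a minimal sketch with stand-in types:

```go
import "time"

// Stand-in types: the reader exposes UpdateReadLevel, the virtual queue
// manager exposes the minimum read level across its virtual queues.
type sketchScheduledQueue struct {
	shutdownCh          chan struct{}
	reader              interface{ UpdateReadLevel(time.Time) }
	virtualQueueManager interface{ GetMinReadLevel() time.Time }
}

// readLevelSyncLoop keeps the cache lower bound within ~interval of real
// processing progress, instead of waiting for DB-write-gated state updates.
func (q *sketchScheduledQueue) readLevelSyncLoop(interval time.Duration) {
	ticker := time.NewTicker(interval) // TimerProcessorCacheReadLevelSyncInterval
	defer ticker.Stop()
	for {
		select {
		case <-q.shutdownCh:
			return
		case <-ticker.C:
			q.reader.UpdateReadLevel(q.virtualQueueManager.GetMinReadLevel())
		}
	}
}
```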
…vent startup false coverage

At shard takeover, the cached reader starts with inclusiveLowerBound = MinimumHistoryTaskKey and exclusiveUpperBound = MinimumHistoryTaskKey. After the first prefetch, exclusiveUpperBound jumps to now + MaxLookAheadWindow, but inclusiveLowerBound stays at MinimumHistoryTaskKey until timeEvict catches up (~60s). During this window, isRangeCovered returns true for ALL historical tasks (from the beginning of time to the cache ceiling). Tasks from the previous shard owner exist in the DB but not in the fresh cache: GetTask returns 0, the virtual slice pops its progress entry, and those tasks are permanently skipped at shard takeover.

Fix: after the first prefetch, advance inclusiveLowerBound to the fetch anchor (now - EvictionSafeWindow). Tasks before that anchor now correctly miss the cache and fall through to the DB.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
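In sketch form (assumed names; the real advance presumably goes through the reader's lower-bound helper):

```go
import "time"

// firstPrefetchAnchor computes where the lower bound must land right after
// the first prefetch: the same anchor the fetch itself started from.
func firstPrefetchAnchor(now time.Time, evictionSafeWindow time.Duration) time.Time {
	return now.Add(-evictionSafeWindow)
}

// Usage (sketch): once the first prefetch succeeds,
//
//	q.advanceInclusiveLowerBound(firstPrefetchAnchor(now, window), "first-prefetch")
//
// so isRangeCovered stops claiming coverage of the uncached historical range
// below the anchor, and those tasks fall through to the DB.
```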
…adow comparison

Add two debug logs in Inject — one when a task is accepted into the cache window and one when it is skipped — each carrying the task's visibilityTimestamp, so misses are traceable in logs.

Remove the second cache re-read (liveResp) from getTaskInShadow. The re-read was filtering tasks as "benign inject races", but the scheduledTaskMaxReadLevel invariant guarantees that every newly committed task has visibilityTimestamp > readLevel, so no in-flight task can appear in the GetTask range at commit time. The filter was hiding real cache misses; dropping it makes the shadow comparison strict.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
taskID=0 is reserved for range-boundary keys (e.g. NewHistoryTaskKey(ts, 0), used for exclusiveUpperBound/inclusiveLowerBound). Real tasks always get non-zero IDs from generateTaskIDLocked (RangeID << RangeSizeBits). Inserting sentinels corrupts queue ordering; filter them at putTasks, the single insertion point for both the Inject and prefetch paths.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
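A minimal sketch of the filter at that insertion point (sketchTask and sketchQueue are stand-ins; the map emulates the dedup-by-key insert and is assumed to be initialised by a constructor):

```go
import "time"

type sketchTask struct {
	visibilityTime time.Time
	taskID         int64 // 0 is reserved for range-boundary sentinel keys
}

type sketchQueue struct {
	byKey map[sketchTask]struct{} // dedup by the full (time, taskID) key
}

// putTasks is the single insertion point for both Inject and prefetch, so one
// filter here protects the queue ordering from sentinel keys.
func (q *sketchQueue) putTasks(tasks []sketchTask) {
	for _, t := range tasks {
		if t.taskID == 0 {
			continue // sentinel: never a real task, would corrupt ordering
		}
		q.byKey[t] = struct{}{}
	}
}
```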
…guous fetched data

Previously, gap detection called queue.Clear() and reset both bounds to MinimumHistoryTaskKey. This wiped previously injected tasks and forced the re-prefetch to start fresh, creating a Cassandra read-after-write race window in which tasks committed just before the gap could be missed.

Now gap detection only discards the in-flight fetched batch (which is non-contiguous with the current window). Existing cache contents and the updated exclusiveUpperBound set by the concurrent Inject/RTrimBySize are preserved. The next prefetch starts from the current exclusiveUpperBound and fills the gap correctly.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
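If one assumes a gap is detected by comparing the batch's starting key against the current exclusiveUpperBound, the new behaviour might look roughly like this (all types and names are illustrative, not the PR's):

```go
import "time"

type gapTask struct{ key time.Time }

// gapReader is a stand-in; exclusiveUpperBound is the current window ceiling.
type gapReader struct {
	exclusiveUpperBound time.Time
}

func (q *gapReader) putTasks(tasks []gapTask) { /* dedup insert, elided */ }

// applyBatch: if a concurrent Inject/RTrimBySize moved the upper bound away
// from the key this fetch started at, the batch no longer abuts the window.
func (q *gapReader) applyBatch(fetchStart time.Time, batch []gapTask) {
	if !q.exclusiveUpperBound.Equal(fetchStart) {
		// Gap: discard only the in-flight batch. Cache contents and the
		// updated bound survive; the next prefetch starts from the current
		// exclusiveUpperBound and fills the gap.
		return
	}
	q.putTasks(batch) // contiguous: insert as usual
}
```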
…inject race

During a prefetch DB call, exclusiveUpperBound has not yet advanced to prefetchTargetUpper, so isTaskCovered returns false for tasks in [exclusiveUpperBound, prefetchTargetUpper). If such a task is also committed to Cassandra after the DB snapshot, it ends up in neither the DB results nor the cache, while the cache later claims coverage of the range — a permanent miss in enabled mode.

Fix: record prefetchTargetUpper before the DB call under mu.Lock. In Inject, tasks in [exclusiveUpperBound, prefetchTargetUpper) are placed in a staging buffer instead of being skipped. After the DB call (success, error, or gap detection), drainPendingInjectBuffer inserts any buffered tasks that are now within the extended window. putTask deduplicates by key, so tasks returned by both the DB and Inject are inserted only once.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
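A condensed sketch of the staging mechanics under those assumptions (bufReader, stagedTask, and the single-bound checks are simplifications; the real Inject presumably also checks the lower bound and warmup state):

```go
import (
	"sync"
	"time"
)

type stagedTask struct{ key time.Time }

type bufReader struct {
	mu                  sync.Mutex
	exclusiveUpperBound time.Time
	prefetchTargetUpper time.Time // recorded under mu before the DB call
	pendingInjectBuffer []stagedTask
}

func (q *bufReader) putTask(t stagedTask) { /* dedup-by-key insert, elided */ }

// inject: in-window tasks go straight in; tasks in the pending prefetch
// window [ub, prefetchTargetUpper) are staged instead of skipped.
func (q *bufReader) inject(t stagedTask) {
	q.mu.Lock()
	defer q.mu.Unlock()
	switch {
	case t.key.Before(q.exclusiveUpperBound):
		q.putTask(t)
	case t.key.Before(q.prefetchTargetUpper):
		q.pendingInjectBuffer = append(q.pendingInjectBuffer, t)
	}
}

// drainPendingInjectBuffer runs after the DB call returns (success, error,
// or gap detection): whatever now falls inside the extended window is
// inserted; dedup-by-key means a task seen by both DB and Inject lands once.
func (q *bufReader) drainPendingInjectBuffer() {
	q.mu.Lock()
	defer q.mu.Unlock()
	kept := q.pendingInjectBuffer[:0] // in-place filter over the same backing array
	for _, t := range q.pendingInjectBuffer {
		if t.key.Before(q.exclusiveUpperBound) {
			q.putTask(t)
		} else {
			kept = append(kept, t)
		}
	}
	q.pendingInjectBuffer = kept
}
```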
…date logs

timeEvictionLoop starts immediately on Start() with no warmup guard, so timeEvict fires every TimeEvictionInterval even during the warmup grace period. During warmup the in-memory queue is empty (Inject is a no-op and the prefetch loop hasn't fired yet), so LTrim has nothing to remove. The only observable effect is spurious 'lower bound is updated' log lines — especially noisy if log levels are raised to Info.

Add an isInWarmup() guard at the top of timeEvict() to return early while still in warmup. This mirrors the existing guards in Inject, GetTask, and LookAHead. UpdateReadLevel (called by readLevelSyncLoop) is intentionally not guarded: it tracks real processing progress and should update the lower bound even during warmup.

Update tests: existing TimeEvict table tests now set injectAllowedAfter to the past to bypass warmup; the TimeEvictLoop lifecycle test does the same while keeping the 10-minute WarmupGracePeriod to suppress the prefetch timer. Add TestCachedQueueReader_TimeEvict_DuringWarmup to pin the new behaviour.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
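The guard itself is small; a sketch with assumed names:

```go
import "time"

type warmupReader struct {
	injectAllowedAfter time.Time     // construction time + WarmupGracePeriod
	evictionWindow     time.Duration // time-eviction look-back
}

func (q *warmupReader) advanceLowerBound(key time.Time, reason string) { /* elided */ }

func (q *warmupReader) isInWarmup(now time.Time) bool {
	return now.Before(q.injectAllowedAfter) // computed once at construction
}

// timeEvict: early-return during warmup, mirroring Inject/GetTask/LookAHead.
// The queue is empty then anyway; the guard just silences the spurious
// 'lower bound is updated' log lines.
func (q *warmupReader) timeEvict(now time.Time) {
	if q.isInWarmup(now) {
		return
	}
	q.advanceLowerBound(now.Add(-q.evictionWindow), "time-evict")
}
```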
…ject

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
taskID=0 is reserved for range-boundary sentinel keys (e.g. exclusiveUpperBound). Real tasks always receive a non-zero ID from generateTaskIDLocked. putTasks() already filtered them as a safety net, but the correct rejection point is the Inject boundary, so sentinels cannot corrupt pendingInjectBuffer ordering or produce spurious log lines.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
…nject loop

Replace the inline if/else chain in Inject with two focused boolean helpers (sketched below):

- shouldInjectTask: returns true when a task falls in [lb, ub) and is not a sentinel
- shouldBufferTask: returns true when a task falls in the pending-prefetch window [ub, prefetchTargetUpper) and is not a sentinel; called only when shouldInjectTask returned false

Also removes the unused injectDecision enum and classifyInjectTask function that were left over from a previous interrupted session.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
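Roughly, with the key split into a fire time plus taskID so the sentinel check is visible (types and field names assumed):

```go
import "time"

type injTask struct {
	fire   time.Time
	taskID int64 // 0 is the reserved sentinel
}

type injReader struct {
	lb, ub              time.Time // inclusiveLowerBound, exclusiveUpperBound
	prefetchTargetUpper time.Time
}

// shouldInjectTask: in [lb, ub) and not a sentinel.
func (q *injReader) shouldInjectTask(t injTask) bool {
	return t.taskID != 0 && !t.fire.Before(q.lb) && t.fire.Before(q.ub)
}

// shouldBufferTask: in the pending-prefetch window [ub, prefetchTargetUpper)
// and not a sentinel; only consulted when shouldInjectTask returned false.
func (q *injReader) shouldBufferTask(t injTask) bool {
	return t.taskID != 0 && !t.fire.Before(q.ub) && t.fire.Before(q.prefetchTargetUpper)
}
```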
Add uniform random jitter [0, PrefetchJitter) to the computed prefetch delay in nextPrefetchDelay(). When many shards start simultaneously (e.g. after a node restart), their prefetch loops would otherwise fire at near-identical intervals and create a thundering herd on Cassandra. Jitter breaks this synchronisation without changing the minimum-interval floor.

Implementation:

- New PrefetchJitter DurationPropertyFn in cachedQueueReaderOptions and the history.timerProcessorCachePrefetchJitter dynamic config key (default 0, which disables jitter for backwards compatibility)
- rand.Rand seeded at construction; accessed only from the single-goroutine prefetchLoop, so no mutex is needed
- defaultTestOptions sets PrefetchJitter=0 so existing exact-delay assertions are unaffected; new TestCachedQueueReader_NextPrefetchDelay_Jitter verifies that 20 consecutive calls produce varied delays within [0, maxJitter)

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Swap the custom rand.Rand field and additive-duration jitter for the codebase-standard backoff.JitDuration(delay, coefficient) utility.

Changes:

- PrefetchJitter DurationPropertyFn → PrefetchJitterCoefficient FloatPropertyFn
- Remove the rand *rand.Rand struct field and the math/rand import
- nextPrefetchDelay: apply JitDuration after the MinPrefetchInterval floor
- New dynamic config key history.timerProcessorCachePrefetchJitterCoefficient (FloatKey, default 0, backwards-compatible)

Matches the pattern used by queue_reader.go:130 and queue_immediate.go:167.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
```go
if min := q.options.MinPrefetchInterval(); delay < min {
	delay = min
}
return backoff.JitDuration(delay, q.options.PrefetchJitterCoefficient())
```
⚠️ Bug: JitDuration panics on out-of-range dynamic config coefficient
backoff.JitDuration panics if the coefficient is outside [0, 1]. PrefetchJitterCoefficient is a dynamic config property that can be changed at runtime to any float64 value. A misconfiguration (e.g. setting it to 1.5 or -0.1) will cause a panic in the prefetch goroutine, killing the shard's timer queue processing.
Since this is a per-shard goroutine operating in a hot loop, the blast radius is significant — every shard on the host would be affected if the dynamic config applies uniformly.
Suggested fix:
```go
coeff := q.options.PrefetchJitterCoefficient()
if coeff < 0 {
	coeff = 0
} else if coeff > 1 {
	coeff = 1
}
return backoff.JitDuration(delay, coeff)
```
What changed?
Adds a cached layer over the timer (scheduled) queue reader that keeps a look-ahead window of upcoming tasks in memory, eliminating repeated DB reads on the hot path. Relates to #7953.
Replaces four stacked PRs (#7954 #7955 #7956 #7957) with a single consolidated draft for easier review iteration.
Components:
- InMemQueue (queue_mem.go) — sorted in-memory task store with PutTasks, GetTasks, LookAHead, LTrim, RTrimBySize, Clear. Deduplicates inserts by task key. RTrimBySize nils evicted interface slots for GC.
- CachedQueueReader (queue_reader_cached.go) — wraps any QueueReader with an in-memory look-ahead cache. Three rollout modes (sketched below): disabled (default), shadow (cache runs, DB always wins, mismatches logged), enabled (cache serves reads). Key behaviours: prefetch loop, time-eviction loop, Inject for new tasks, UpdateReadLevel for ack-level eviction. injectAllowedAfter is computed once at construction (now + WarmupGracePeriod) — no mutex needed.
- CachedScheduledQueue (queue_scheduled_cached.go) — thin wrapper wiring NotifyNewTask → Inject, updateQueueStateFn → UpdateReadLevel, and reader lifecycle into the existing scheduled queue.
- Config — nine new global dynamic config properties (no ShardID filter; they apply uniformly per host). Feature flag: TimerProcessorEnableCachedScheduledQueue (default: off).
- Simulation scenarios — two new scenarios (enabled + shadow mode) requiring #7952 for infra.
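For orientation, a hypothetical sketch of how the three rollout modes could gate a read; modeReader, cacheGet, and compareWithCache are illustrative names, not the PR's API:

```go
import "time"

type modeTask struct{ fire time.Time }

type baseReader interface {
	GetTask(key time.Time) ([]modeTask, error)
}

type modeReader struct {
	mode func() string // dynamic config: "disabled" | "shadow" | "enabled"
	base baseReader
}

func (q *modeReader) cacheGet(key time.Time) ([]modeTask, bool) { return nil, false } // elided
func (q *modeReader) compareWithCache(key time.Time, db []modeTask) {}                // elided

func (q *modeReader) GetTask(key time.Time) ([]modeTask, error) {
	switch q.mode() {
	case "shadow":
		dbTasks, err := q.base.GetTask(key) // DB always wins
		if err == nil {
			q.compareWithCache(key, dbTasks) // log/count divergences
		}
		return dbTasks, err
	case "enabled":
		if tasks, ok := q.cacheGet(key); ok {
			return tasks, nil // served from the in-memory window
		}
		return q.base.GetTask(key) // miss: fall through to DB
	default: // "disabled"
		return q.base.GetTask(key) // cache bypassed entirely
	}
}
```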
Why?
The timer queue currently issues a DB round-trip for every virtual slice read. For scheduled tasks most reads land within a predictable look-ahead window. A pre-fetched in-memory cache removes those round-trips in the common case.
How did you test it?
Unit test coverage for new code: 87.4%. Tests cover prefetch, eviction, shadow mismatch, inject races, lifecycle, and the RTrim guard (both shrink and raise cases).
Simulation:
Potential risks
- Gated by TimerProcessorEnableCachedScheduledQueue (default: false) and TimerProcessorCachedQueueReaderMode (default: disabled). Zero code path changes when the flags are off.
- Memory bounded by TimerProcessorCacheMaxSize (default: 1000 tasks per shard).
N/A — internal change behind a feature flag.
Documentation Changes
N/A