
feat(queuev2): cached timer queue reader with prefetch and time-based eviction#7962

Draft
arzonus wants to merge 29 commits into cadence-workflow:master from arzonus:implement-cached-queue-reader-and-scheduled-queue

Conversation

arzonus (Contributor) commented Apr 16, 2026

What changed?

Adds a cached layer over the timer (scheduled) queue reader that keeps a look-ahead window of upcoming tasks in memory, eliminating repeated DB reads on the hot path. Relates to #7953.

Replaces four stacked PRs (#7954 #7955 #7956 #7957) with a single consolidated draft for easier review iteration.

Components:

InMemQueue (queue_mem.go) — sorted in-memory task store with PutTasks, GetTasks, LookAHead, LTrim, RTrimBySize, Clear. Deduplicates inserts by task key. RTrimBySize nils evicted interface slots for GC.
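A toy sketch of that surface, with an int64 key standing in for the composite history task key (the names and shapes below are simplified illustrations, not the real queue_mem.go types):

```go
package main

import (
	"fmt"
	"sort"
)

// memQueue is a toy stand-in for InMemQueue: tasks kept sorted by key,
// deduplicated on insert. The real type stores persistence tasks under
// (timestamp, taskID) keys; a bare int64 key is used here.
type memQueue struct {
	keys []int64
}

// PutTask inserts in sorted position, skipping duplicates.
func (q *memQueue) PutTask(key int64) {
	i := sort.Search(len(q.keys), func(i int) bool { return q.keys[i] >= key })
	if i < len(q.keys) && q.keys[i] == key {
		return // dedup by task key
	}
	q.keys = append(q.keys, 0)
	copy(q.keys[i+1:], q.keys[i:])
	q.keys[i] = key
}

// GetTasks returns keys in [min, max).
func (q *memQueue) GetTasks(min, max int64) []int64 {
	lo := sort.Search(len(q.keys), func(i int) bool { return q.keys[i] >= min })
	hi := sort.Search(len(q.keys), func(i int) bool { return q.keys[i] >= max })
	return q.keys[lo:hi]
}

// LTrim drops keys below min (ack-level eviction).
func (q *memQueue) LTrim(min int64) {
	lo := sort.Search(len(q.keys), func(i int) bool { return q.keys[i] >= min })
	q.keys = q.keys[lo:]
}

func main() {
	q := &memQueue{}
	for _, k := range []int64{30, 10, 20, 20} { // out-of-order insert plus a duplicate
		q.PutTask(k)
	}
	fmt.Println(q.GetTasks(10, 30)) // [10 20]
	q.LTrim(20)
	fmt.Println(q.keys) // [20 30]
}
```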

CachedQueueReader (queue_reader_cached.go) — wraps any QueueReader with an in-memory look-ahead cache. Three rollout modes: disabled (default), shadow (cache runs, DB always wins, mismatches logged), enabled (cache serves reads). Key behaviours: prefetch loop, time-eviction loop, Inject for new tasks, UpdateReadLevel for ack-level eviction. injectAllowedAfter is computed once at construction (now + WarmupGracePeriod) — no mutex needed.

CachedScheduledQueue (queue_scheduled_cached.go) — thin wrapper wiring NotifyNewTask → Inject, updateQueueStateFn → UpdateReadLevel, and reader lifecycle into the existing scheduled queue.

Config — nine new global dynamic config properties (no ShardID filter; they apply uniformly per host). Feature flag: TimerProcessorEnableCachedScheduledQueue (default: off).

Simulation scenarios — two new scenarios (enabled + shadow mode) requiring #7952 for infra.

Why?

The timer queue currently issues a DB round-trip for every virtual slice read. For scheduled tasks most reads land within a predictable look-ahead window. A pre-fetched in-memory cache removes those round-trips in the common case.

How did you test it?

go test -race -count=100 -timeout=600s ./service/history/queuev2/...
make pr GEN_DIR=service/history/queuev2
make build

Unit test coverage for new code: 87.4%. Tests cover prefetch, eviction, shadow mismatch, inject races, lifecycle, RTrim guard (both shrink and raise cases).

Simulation:

./simulation/history/run.sh --scenario queuev2_scheduled_cache_shadow --dockerfile-suffix .local
./simulation/history/run.sh --scenario queuev2_scheduled_cache_enabled --dockerfile-suffix .local

Potential risks

  • Feature-flagged behind TimerProcessorEnableCachedScheduledQueue (default: false) and TimerProcessorCachedQueueReaderMode (default: disabled). Zero code path changes when flags are off.
  • When enabled: two background goroutines per shard (prefetch + time-eviction). Memory bounded by TimerProcessorCacheMaxSize (default: 1000 tasks per shard).

Release notes

N/A — internal change behind a feature flag.

Documentation Changes

N/A

arzonus force-pushed the implement-cached-queue-reader-and-scheduled-queue branch from 180eedb to 206586b on April 16, 2026 07:29
arzonus force-pushed the implement-cached-queue-reader-and-scheduled-queue branch from 206586b to 308bee5 on April 16, 2026 08:20
arzonus force-pushed the implement-cached-queue-reader-and-scheduled-queue branch from 308bee5 to 4bee2d2 on April 16, 2026 10:34
arzonus added 10 commits May 1, 2026 14:51
1. extraInCache moved to mismatchTags (logged on mismatch only)
2. All mismatch log fields prefixed shadowMismatch.*
3. HasMismatches restored to include ExtraInCache — still a mismatch
   even if benign, for consistent observability
4. ack-level-update → read-level-update in updateInclusiveLowerBound
5. LookAHead now delegates to base during warmup, matching GetTask

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Collapse the partial/full-page if/else into a single target computation:
  target := exclusiveMaxKey (partial page)
  target = NextTaskKey     (full page)

One updateExclusiveUpperBound call, one reason tag 'prefetch-advance'.
Drop the verbose comment block — the early return on trim is self-explanatory.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
…efetch

putTasks now returns true when RTrimBySize fired and updated the upper
bound. prefetch uses this directly instead of comparing exclusiveUpperBound
before and after the call, removing the prevUpper variable and the
verbose !Equal guard.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
… reset

updateExclusiveUpperBound: now logs inclusiveLowerBound + cacheSize
for context, and records the size histogram.

updateInclusiveLowerBound: now a pure setter with log + metrics.
No advance guard, no LTrim — safe to call for resets too.

advanceInclusiveLowerBound: new helper wrapping the advance-only
logic (upper-bound cap, advance guard, LTrim) then calls
updateInclusiveLowerBound.

Gap detection: uses updateInclusiveLowerBound directly after Clear()
since going backwards is intentional and no LTrim is needed.

timeEvict + UpdateReadLevel: fixed to call advanceInclusiveLowerBound
(were calling updateInclusiveLowerBound after the rename, which skipped
the LTrim — tasks below the eviction point were never removed).

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
…fy Inject guard

Gap detection now calls updateInclusiveLowerBound (the simple setter)
instead of assigning inclusiveLowerBound directly. No LTrim needed since
the queue is already cleared, and the setter accepts backwards resets.

Inject: combine isDisabled and isInWarmup into a single guard condition.
Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
ExtraInCache (tasks in cache not in DB) is benign given task independence.
It now logs at Info without incrementing the mismatch counter, so it is
still observable without polluting the Warn metric used for alerting.

MissingFromCache and NextKeyMismatch are real divergences: they log at Warn
and increment the counter as before.

shadowMismatch.dbTaskCount and shadowMismatch.cacheTaskCount moved into the
mismatch tag set so they are only emitted when there is something to act on.
The match path logs a bare Debug with no extra fields.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Adds a readLevelSyncLoop goroutine to cachedScheduledQueue that ticks
every TimerProcessorCacheReadLevelSyncInterval (default 1s) and calls
reader.UpdateReadLevel(virtualQueueManager.GetMinReadLevel()).

This keeps the cache lower bound within ~1s of actual processing
progress, compared to the previous ~30s lag from updateQueueStateFn
which is gated on DB writes.

New config: TimerProcessorCacheReadLevelSyncInterval (global, default 1s).

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
…vent startup false coverage

At shard takeover, the cached reader starts with inclusiveLowerBound =
MinimumHistoryTaskKey and exclusiveUpperBound = MinimumHistoryTaskKey.
After the first prefetch, exclusiveUpperBound jumps to now + MaxLookAheadWindow
but inclusiveLowerBound stays at MinimumHistoryTaskKey until timeEvict
catches up (~60s).

During this window, isRangeCovered returns true for ALL historical tasks
(from the beginning of time to the cache ceiling). Tasks from the previous
shard owner exist in DB but not in the fresh cache. GetTask returns 0,
the virtual slice pops its progress entry, and those tasks are permanently
skipped at shard takeover.

Fix: after the first prefetch, advance inclusiveLowerBound to the fetch
anchor (now - EvictionSafeWindow). Tasks before that anchor now correctly
miss the cache and fall through to the DB.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
…adow comparison

Add two debug logs in Inject — one when a task is accepted into the
cache window and one when it is skipped — each carrying the task's
visibilityTimestamp so misses are traceable in logs.

Remove the second cache re-read (liveResp) from getTaskInShadow.
The re-read was filtering tasks as "benign inject races", but the
scheduledTaskMaxReadLevel invariant guarantees every newly-committed
task has visibilityTimestamp > readLevel, so no in-flight task can
appear in the GetTask range at commit time. The filter was hiding
real cache misses; dropping it makes the shadow comparison strict.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
arzonus force-pushed the implement-cached-queue-reader-and-scheduled-queue branch from 91ce398 to fc34e25 on May 1, 2026 13:00
arzonus marked this pull request as draft on May 1, 2026 13:00
arzonus added 15 commits May 1, 2026 15:05
Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
taskID=0 is reserved for range-boundary keys (e.g. NewHistoryTaskKey(ts, 0)
used for exclusiveUpperBound/inclusiveLowerBound). Real tasks always get
non-zero IDs from generateTaskIDLocked (RangeID << RangeSizeBits). Inserting
sentinels corrupts queue ordering; filter them at putTasks, the single
insertion point for both Inject and prefetch paths.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
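The filter described in this commit reduces to a small guard; the sketch below uses a simplified taskKey struct in place of the real history task key:

```go
package main

import "fmt"

// taskKey is a simplified stand-in for the (timestamp, taskID) history
// task key; only the fields needed for the sentinel check are modeled.
type taskKey struct {
	ts     int64
	taskID int64
}

// filterSentinels drops taskID=0 keys: that ID is reserved for
// range-boundary sentinels and never identifies a real task.
func filterSentinels(keys []taskKey) []taskKey {
	out := make([]taskKey, 0, len(keys))
	for _, k := range keys {
		if k.taskID == 0 {
			continue // boundary sentinel, not a real task
		}
		out = append(out, k)
	}
	return out
}

func main() {
	keys := []taskKey{{ts: 100, taskID: 0}, {ts: 100, taskID: 7}}
	fmt.Println(len(filterSentinels(keys))) // 1
}
```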
…guous fetched data

Previously gap detection called queue.Clear() and reset both bounds to
MinimumHistoryTaskKey. This wiped previously-injected tasks and forced the
re-prefetch to start fresh, creating a Cassandra read-after-write race window
where tasks committed just before the gap could be missed.

Now gap detection only discards the in-flight fetched batch (which is
non-contiguous with the current window). Existing cache contents and the
updated exclusiveUpperBound set by the concurrent Inject/RTrimBySize are
preserved. The next prefetch starts from the current exclusiveUpperBound and
fills the gap correctly.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
…inject race

During a prefetch DB call, exclusiveUpperBound has not yet advanced to
prefetchTargetUpper, so isTaskCovered returns false for tasks in
[exclusiveUpperBound, prefetchTargetUpper). If such a task is also committed
to Cassandra after the DB snapshot, it ends up in neither the DB results nor
the cache, while the cache later claims coverage of the range — a permanent
miss in enabled mode.

Fix: record prefetchTargetUpper before the DB call under mu.Lock. In Inject,
tasks in [exclusiveUpperBound, prefetchTargetUpper) are placed in a staging
buffer instead of being skipped. After the DB call (success, error, or gap
detection), drainPendingInjectBuffer inserts any buffered tasks that are now
within the extended window. putTask deduplicates by key so tasks returned by
both the DB and Inject are inserted only once.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
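The staging protocol this commit describes can be pictured with a toy model; the window struct, int64 keys, and method names below are illustrative simplifications (the real code holds these under the reader's mutex and deduplicates in putTask):

```go
package main

import "fmt"

// window models the cache bounds during an in-flight prefetch: tasks
// injected into [upper, prefetchTarget) are staged rather than skipped,
// then drained once the DB call extends the window.
type window struct {
	upper          int64   // stand-in for exclusiveUpperBound
	prefetchTarget int64   // recorded before the DB call starts
	cache          []int64
	pending        []int64 // stand-in for pendingInjectBuffer
}

func (w *window) inject(key int64) {
	switch {
	case key < w.upper:
		w.cache = append(w.cache, key) // already covered: insert directly
	case key < w.prefetchTarget:
		w.pending = append(w.pending, key) // in-flight range: stage it
	default:
		// beyond the prefetch target: skip, a later prefetch reads it
	}
}

// drain runs after the DB call extends upper; dedup is omitted here
// (the real putTask deduplicates by key).
func (w *window) drain(newUpper int64) {
	w.upper = newUpper
	rest := w.pending[:0]
	for _, k := range w.pending {
		if k < w.upper {
			w.cache = append(w.cache, k)
		} else {
			rest = append(rest, k)
		}
	}
	w.pending = rest
}

func main() {
	w := &window{upper: 100, prefetchTarget: 200}
	w.inject(150) // prefetch in flight: staged
	w.inject(50)  // inside current window: cached
	w.drain(200)  // DB call finished, window extended
	fmt.Println(w.cache, len(w.pending)) // [50 150] 0
}
```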
…date logs

timeEvictionLoop starts immediately on Start() with no warmup guard, so
timeEvict fires every TimeEvictionInterval even during the warmup grace
period. During warmup the in-memory queue is empty (inject is a no-op and
the prefetch loop hasn't fired yet), so LTrim has nothing to remove. The
only observable effect is spurious 'lower bound is updated' log lines —
especially noisy if log levels are raised to Info.

Add an isInWarmup() guard at the top of timeEvict() to return early when
still in warmup. This mirrors the existing guards in Inject, GetTask, and
LookAHead. UpdateReadLevel (called by readLevelSyncLoop) is intentionally
not guarded: it tracks real processing progress and should update lb even
during warmup.

Update tests: existing TimeEvict table tests now set injectAllowedAfter to
the past to bypass warmup; TimeEvictLoop lifecycle test does the same while
keeping the 10-minute WarmupGracePeriod to suppress the prefetch timer.
Add TestCachedQueueReader_TimeEvict_DuringWarmup to pin the new behaviour.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
…ject

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
taskID=0 is reserved for range-boundary sentinel keys (e.g. exclusiveUpperBound).
Real tasks always receive a non-zero ID from generateTaskIDLocked. putTasks()
already filtered them as a safety net, but the correct rejection point is the
Inject boundary so sentinels cannot corrupt pendingInjectBuffer ordering or
produce spurious log lines.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
…nject loop

Replace the inline if/else chain in Inject with two focused boolean helpers:
- shouldInjectTask: returns true when a task falls in [lb, ub) and is not a sentinel
- shouldBufferTask: returns true when a task falls in the pending-prefetch window
  [ub, prefetchTargetUpper) and is not a sentinel; called only when shouldInjectTask
  returned false

Also removes the unused injectDecision enum and classifyInjectTask function
that were left over from a previous interrupted session.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Add uniform random jitter [0, PrefetchJitter) to the computed prefetch delay
in nextPrefetchDelay(). When many shards start simultaneously (e.g. after a
node restart), their prefetch loops would otherwise fire at near-identical
intervals and create a thundering-herd on Cassandra. Jitter breaks this
synchronisation without changing the minimum-interval floor.

Implementation:
- New PrefetchJitter DurationPropertyFn in cachedQueueReaderOptions and the
  history.timerProcessorCachePrefetchJitter dynamic config key (default 0,
  which disables jitter for backwards compat)
- rand.Rand seeded at construction; accessed only from the single-goroutine
  prefetchLoop so no mutex is needed
- defaultTestOptions sets PrefetchJitter=0 so existing exact-delay assertions
  are unaffected; new TestCachedQueueReader_NextPrefetchDelay_Jitter verifies
  that 20 consecutive calls produce varied delays within [0, maxJitter)

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Swap the custom rand.Rand field and additive-duration jitter for the
codebase-standard backoff.JitDuration(delay, coefficient) utility.

Changes:
- PrefetchJitter DurationPropertyFn → PrefetchJitterCoefficient FloatPropertyFn
- Remove rand *rand.Rand struct field and math/rand import
- nextPrefetchDelay: apply JitDuration after the MinPrefetchInterval floor
- New dynamic config key history.timerProcessorCachePrefetchJitterCoefficient
  (FloatKey, default 0, backwards-compatible)

Matches the pattern used by queue_reader.go:130 and queue_immediate.go:167.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
if min := q.options.MinPrefetchInterval(); delay < min {
    delay = min
}
return backoff.JitDuration(delay, q.options.PrefetchJitterCoefficient())

⚠️ Bug: JitDuration panics on out-of-range dynamic config coefficient

backoff.JitDuration panics if the coefficient is outside [0, 1]. PrefetchJitterCoefficient is a dynamic config property that can be changed at runtime to any float64 value. A misconfiguration (e.g. setting it to 1.5 or -0.1) will cause a panic in the prefetch goroutine, killing the shard's timer queue processing.

Since this is a per-shard goroutine operating in a hot loop, the blast radius is significant — every shard on the host would be affected if the dynamic config applies uniformly.

Suggested fix:

coeff := q.options.PrefetchJitterCoefficient()
if coeff < 0 {
    coeff = 0
} else if coeff > 1 {
    coeff = 1
}
return backoff.JitDuration(delay, coeff)



gitar-bot Bot commented May 8, 2026

Code Review ⚠️ Changes requested 10 resolved / 11 findings

Implements a cached layer over the timer queue to eliminate repeated DB reads, but requires a fix for potential panics in JitDuration due to out-of-range dynamic configuration coefficients. Several issues concerning cache integrity, state consistency, and efficiency were resolved during this session.

⚠️ Bug: JitDuration panics on out-of-range dynamic config coefficient

📄 service/history/queuev2/queue_reader_cached.go:273

backoff.JitDuration panics if the coefficient is outside [0, 1]. PrefetchJitterCoefficient is a dynamic config property that can be changed at runtime to any float64 value. A misconfiguration (e.g. setting it to 1.5 or -0.1) will cause a panic in the prefetch goroutine, killing the shard's timer queue processing.

Since this is a per-shard goroutine operating in a hot loop, the blast radius is significant — every shard on the host would be affected if the dynamic config applies uniformly.

Suggested fix
coeff := q.options.PrefetchJitterCoefficient()
if coeff < 0 {
    coeff = 0
} else if coeff > 1 {
    coeff = 1
}
return backoff.JitDuration(delay, coeff)
✅ 10 resolved
Edge Case: putTasks doesn't reset exclusiveUpperBound when RTrim empties cache

📄 service/history/queuev2/queue_reader_cached.go:426-435 📄 service/history/queuev2/queue_mem.go:191-195
When RTrimBySize(maxSize) is called with maxSize <= 0 (e.g. a dynamic config misconfiguration), it empties the queue and returns (MinimumHistoryTaskKey, true). In putTasks, the guard trimmed && newUpper.Greater(persistence.MinimumHistoryTaskKey) evaluates to true && false = false, so exclusiveUpperBound is not reset. The cache then thinks it covers [inclusiveLowerBound, oldExclusiveUpperBound) but holds zero tasks, causing GetTask to return empty results for ranges that may contain tasks in the DB.

This only triggers with MaxSize <= 0, which is unlikely with the default of 1000, but a defensive check would prevent silent data loss on misconfiguration.
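The defensive check could be as small as the following sketch; putTasksGuard and its parameters are hypothetical names distilled from the finding, not the real putTasks signature:

```go
package main

import "fmt"

// minKey stands in for persistence.MinimumHistoryTaskKey.
const minKey = int64(0)

// putTasksGuard sketches the suggested fix: when a trim empties the
// cache, reset the upper bound so the reader stops claiming coverage
// of a range it no longer holds.
func putTasksGuard(trimmed bool, newUpper, oldUpper int64, cacheSize int) int64 {
	if !trimmed {
		return oldUpper
	}
	if cacheSize == 0 {
		return minKey // cache emptied: claim no coverage at all
	}
	if newUpper > minKey {
		return newUpper // normal trim: shrink coverage to the trimmed bound
	}
	return oldUpper
}

func main() {
	fmt.Println(putTasksGuard(true, minKey, 100, 0)) // 0: empty cache resets coverage
	fmt.Println(putTasksGuard(true, 50, 100, 7))     // 50: shrink to trimmed bound
}
```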

Quality: Mode string comparison has no validation for unknown values

📄 service/history/queuev2/queue_reader_cached.go:271-272
isDisabled() checks Mode() == "disabled" and isShadow() checks Mode() == "shadow". Any typo or unknown mode value (e.g. "enabeld") silently falls through to the enabled code path, which serves reads from cache. This could cause unintended behavior if someone misconfigures the dynamic config string.

Consider adding a log warning or defaulting to disabled for unknown values, particularly in GetTask and LookAHead where the mode drives correctness.
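One way to realize the suggested fail-safe default, as a hedged sketch (parseMode is a hypothetical helper, not existing code):

```go
package main

import "fmt"

// parseMode validates a dynamic-config mode string. Unknown values fall
// back to disabled instead of silently enabling cache-served reads; the
// returned bool lets the caller log a warning on misconfiguration.
func parseMode(s string) (string, bool) {
	switch s {
	case "disabled", "shadow", "enabled":
		return s, true
	default:
		return "disabled", false // fail safe on typos like "enabeld"
	}
}

func main() {
	m, ok := parseMode("enabeld") // the typo from the finding above
	fmt.Println(m, ok)            // disabled false
}
```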

Edge Case: Shadow mode false mismatches from cache eviction between reads

📄 service/history/queuev2/queue_reader_cached.go:604-618
In getTaskInShadow, the initial cache snapshot is taken under RLock (line 588-589), then the lock is released before querying the DB. If a mismatch is found, the cache is re-read (line 641-647) to filter benign races. However, between the two cache reads, timeEvict or UpdateReadLevel can advance inclusiveLowerBound and LTrim the cache, removing tasks that were in the original snapshot. A DB task that was in the first snapshot but got evicted before the second read would appear as a realMissingKey, creating a false mismatch alarm.

Since shadow mode is observational-only (DB result is always returned), this doesn't affect correctness — only alert noise. But spurious mismatch counters could erode confidence in the cache during rollout.

Quality: InMemQueue.PutTasks is O(n*m) for unsorted bulk inserts

📄 service/history/queuev2/queue_mem.go:98-102
PutTasks calls putTask in a loop, and each putTask for an out-of-order insert does a binary search + copy shift. When inserting m tasks into a queue of n elements, and the tasks aren't pre-sorted, this is O(m * (log n + n)) in the worst case. For prefetch pages (up to 100 tasks by default into a 1000-element queue), this is acceptable, but if page sizes or cache sizes grow it could become a bottleneck.

The fast-path append optimization handles the common case of sorted inserts well. This is just a note for future scaling — a batch sort-merge would be O((n+m) log(n+m)).
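One shape the batch approach could take: sort the incoming page once (O(m log m)), then merge it with the already-sorted queue in a single pass. The function below is an illustrative sketch with int64 keys, not the proposed InMemQueue change:

```go
package main

import (
	"fmt"
	"sort"
)

// mergeInsert sorts the incoming page and merges it with the sorted
// queue in one pass, deduplicating equal keys (the existing entry wins,
// mirroring putTask's dedup-by-key behaviour).
func mergeInsert(queue, page []int64) []int64 {
	sort.Slice(page, func(i, j int) bool { return page[i] < page[j] })
	out := make([]int64, 0, len(queue)+len(page))
	i, j := 0, 0
	for i < len(queue) || j < len(page) {
		switch {
		case j == len(page) || (i < len(queue) && queue[i] < page[j]):
			out = append(out, queue[i])
			i++
		case i == len(queue) || page[j] < queue[i]:
			out = append(out, page[j])
			j++
		default: // equal keys: keep the existing entry, drop the duplicate
			out = append(out, queue[i])
			i, j = i+1, j+1
		}
	}
	return out
}

func main() {
	fmt.Println(mergeInsert([]int64{10, 30, 50}, []int64{40, 20, 30}))
	// [10 20 30 40 50]
}
```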

Edge Case: Non-atomic dynamic config Mode() reads can cause inconsistency

📄 service/history/queuev2/queue_reader_cached.go:567 📄 service/history/queuev2/queue_reader_cached.go:594
In GetTask (line 567), isDisabled() is checked first, then isShadow() is checked at line 594. Both call q.options.Mode() independently. Since this is a dynamic config property that can change between calls, a mode transition (e.g., shadow→enabled) between the two reads could cause the code to serve a cache result directly when the operator intended shadow mode, or vice versa. The window is small and the impact is transient (one request), but it's worth noting.

Similarly, prefetch() checks isDisabled() at line 303, and Inject() checks it at line 529 — both independently reading the mode.

...and 5 more resolved from earlier reviews


Rules ✅ All requirements met

Repository Rules

GitHub Issue Linking Requirement: The PR description includes a reference to #7953, which satisfies the issue linking requirement for cadence-workflow.
PR Description Quality Standards: The PR description is comprehensive, includes all required sections with substantive content, follows the template guidance, and clearly explains the 'why' behind the technical implementation.

1 rule not applicable.
