@@ -27,6 +27,7 @@ using base::CycleClock;
2727namespace {
2828
// Per-thread cached budget, in CPU cycles, after which a squash callback
// voluntarily yields its fiber (see the check in the hop callback below).
// Updated via MultiCommandSquasher::SetMaxBusySquashUsec().
2929thread_local uint64_t max_busy_squash_cycles_cached = 1ULL << 32 ;
// Per-thread threshold, in microseconds, above which a squash hop emits the
// diagnostic log. Default 2^31 usec (~36 minutes) effectively disables the
// logging until SetLogSquashThreshold() is called. NOTE(review): the 1ULL
// literal is narrowed to uint32_t here; the value 2^31 still fits, so this
// is benign, but `1U << 31` would express the intent without narrowing.
30+ thread_local uint32_t log_squash_threshold_cached = 1ULL << 31 ;
3031
3132void CheckConnStateClean (const ConnectionState& state) {
3233 DCHECK_EQ (state.exec_info .state , ConnectionState::ExecInfo::EXEC_INACTIVE);
@@ -245,6 +246,11 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
245246 Transaction* tx = cntx_->transaction ;
246247 ServerState::tlocal ()->stats .squash_width_freq_arr [num_shards - 1 ]++;
247248 uint64_t start = CycleClock::Now ();
// Per-hop diagnostics collected from the shard callbacks. The two atomics
// are maintained lock-free; `lock` guards the non-atomic snapshot fields
// (fiber/proactor cycles, thread id, seq num, past_fibers) that record the
// state of the callback with the longest scheduling delay.
249+ atomic_uint64_t max_sched_cycles{0 }, max_exec_cycles{0 };
250+ base::SpinLock lock;
251+ uint64_t fiber_running_cycles{0 }, proactor_running_cycles{0 };
// FIX(review): max_sched_thread_id was left uninitialized. It is streamed
// into the slow-hop log even when no callback exceeds min_threshold_cycles
// (i.e. the locked snapshot block never executes), which read an
// indeterminate value. Zero-initialize it like its siblings.
252+ uint32_t max_sched_thread_id{0 }, max_sched_seq_num{0 };
253+ vector<string> past_fibers;
248254
249255 // Atomic transactions (that have all keys locked) perform hops and run squashed commands via
250256 // stubs, non-atomic ones just run the commands in parallel.
@@ -258,19 +264,47 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
258264 fb2::BlockingCounter bc (num_shards);
259265 DVLOG (1 ) << " Squashing " << num_shards << " " << tx->DebugId ();
260266
261- auto cb = [this , bc, rb]() mutable {
// Saves work in case logging is disabled (i.e. log_squash_threshold_cached is high).
// The /5 divisor is a heuristic: only scheduling delays that are a meaningful
// fraction of the log threshold are worth snapshotting below.
267+ const uint64_t min_threshold_cycles = CycleClock::FromUsec (log_squash_threshold_cached / 5 );
// NOTE(review): `[&]` captures coordinator-stack locals (start, max_sched_cycles,
// lock, snapshot fields) by reference — safe only while the coordinator blocks on
// `bc` for the full lifetime of these callbacks; confirm the wait covers all uses.
269+ auto cb = [&, bc, rb]() mutable {
// Cycles spent between hop dispatch and this callback actually running.
270+ uint64_t sched_time = CycleClock::Now () - start;
271+
272+ // Update max_sched_cycles in lock-free fashion, to avoid contention
273+ uint64_t current = max_sched_cycles.load (memory_order_relaxed);
274+ while (sched_time > min_threshold_cycles && sched_time > current) {
275+ if (max_sched_cycles.compare_exchange_weak (current, sched_time, memory_order_relaxed,
276+ memory_order_relaxed)) {
// The lock serializes writers of the non-atomic snapshot fields; the re-check
// of max_sched_cycles below discards a snapshot that lost the race to a
// callback with an even longer scheduling delay.
277+ lock_guard<base::SpinLock> g (lock);
278+
279+ // If it is still the longest scheduling time
280+ if (max_sched_cycles.load (memory_order_relaxed) == sched_time) {
281+ // Store the stats from the callback with longest scheduling time.
282+ fiber_running_cycles = ThisFiber::GetRunningTimeCycles ();
283+ proactor_running_cycles = ProactorBase::me ()->GetCurrentBusyCycles ();
284+ max_sched_thread_id = ProactorBase::me ()->GetPoolIndex ();
285+ max_sched_seq_num = fb2::GetFiberRunSeq ();
286+ past_fibers = fb2::GetPastFiberNames ();
287+ }
288+ break ;
289+ }
290+ // current is updated to the current value of max_sched_cycles, so the loop will retry
291+ // with the new value if sched_time is still greater than it.
292+ }
293+
// Yield if this fiber has already hogged the thread beyond its budget,
// so other fibers on the proactor get a chance to run.
262294 if (ThisFiber::GetRunningTimeCycles () > max_busy_squash_cycles_cached) {
263295 ThisFiber::Yield ();
264296 stats_.yields ++;
265297 }
266298 this ->SquashedHopCb (EngineShard::tlocal (), rb->GetRespVersion ());
// NOTE(review): Dec() runs before the max_exec_cycles update below, so the
// coordinator can wake and read max_exec_cycles before the slowest callback
// records its value — consider moving Dec() to the end of the lambda if the
// max must be exact rather than best-effort.
267299 bc->Dec ();
// NOTE(review): exec_time is measured from `start` (hop dispatch), so it
// includes the scheduling delay above, not just execution — confirm intended.
300+ uint64_t exec_time = CycleClock::Now () - start;
301+ current = max_exec_cycles.load (memory_order_relaxed);
302+ while (exec_time > current) {
303+ if (max_exec_cycles.compare_exchange_weak (current, exec_time, memory_order_relaxed,
304+ memory_order_relaxed))
305+ break ;
306+ }
268307 };
269- unsigned run_usec = CycleClock::ToUsec (ThisFiber::GetRunningTimeCycles ());
270- if (run_usec > 5'000 ) {
271- LOG_EVERY_T (WARNING, 1 ) << " Fiber run " << run_usec << " usec, squashed " << cmds_.size ()
272- << " commands" ;
273- }
274308 for (unsigned i = 0 ; i < sharded_.size (); ++i) {
275309 if (!sharded_[i].dispatched .empty ())
276310 shard_set->AddL2 (i, cb);
@@ -297,14 +331,31 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
297331 if (aborted)
298332 break ;
299333 }
334+
300335 uint64_t after_reply = CycleClock::Now ();
301336 uint64_t total_usec = CycleClock::ToUsec (after_reply - start);
302-
303337 stats_.hop_usec += total_usec;
304338 stats_.reply_usec += CycleClock::ToUsec (after_reply - after_hop);
305339 stats_.hops ++;
306340 stats_.squashed_commands += order_.size ();
307341
// Emit a diagnostic line when this squash hop exceeded the configured
// per-thread threshold; LOG_EVERY_T rate-limits to one message per 100ms.
342+ if (total_usec > log_squash_threshold_cached) {
343+ uint64_t max_sched_usec = CycleClock::ToUsec (max_sched_cycles.load ());
344+ uint64_t fiber_running_usec = CycleClock::ToUsec (fiber_running_cycles);
345+ uint64_t proactor_running_usec = CycleClock::ToUsec (proactor_running_cycles);
346+ uint64_t max_exec_usec = CycleClock::ToUsec (max_exec_cycles.load ());
347+
// FIX(review): a duplicated `<< "/"` between the FiberCbTime and FiberSeq
// values produced "//" (an empty field), misaligning every subsequent value
// against the 8-field header label above. Exactly 7 separators now join the
// 8 values: Total, Fanout, MaxSchedTime, ThreadCbTime, ThreadId,
// FiberCbTime, FiberSeq, MaxExecTime.
348+ LOG_EVERY_T (INFO, 0.1 )
349+ << " Squashed " << order_.size () << " commands. "
350+ << " Total/Fanout/MaxSchedTime/ThreadCbTime/ThreadId/FiberCbTime/FiberSeq/MaxExecTime: "
351+ << total_usec << " /" << num_shards_ << " /" << max_sched_usec << " /" << proactor_running_usec
352+ << " /" << max_sched_thread_id << " /" << fiber_running_usec << " /"
353+ << max_sched_seq_num << " /" << max_exec_usec
354+ << " \n past fibers: " << absl::StrJoin (past_fibers, " , " )
355+ << " \n coordinator thread running time: "
356+ << CycleClock::ToUsec (ProactorBase::me ()->GetCurrentBusyCycles ());
357+ }
358+
308359 tl_facade_stats->reply_stats .squashing_current_reply_size .fetch_sub (total_reply_size,
309360 std::memory_order_release);
310361 for (auto & sinfo : sharded_) {
@@ -364,4 +415,8 @@ void MultiCommandSquasher::SetMaxBusySquashUsec(uint32_t usec) {
364415 max_busy_squash_cycles_cached = CycleClock::FromUsec (usec);
365416}
366417
// Sets the squash-logging threshold in microseconds: squash hops whose total
// time exceeds it emit the slow-hop diagnostic log. The value is stored in a
// thread_local cache, so the write only affects the calling thread — callers
// that want a process-wide change must invoke this on every proactor thread.
418+ void MultiCommandSquasher::SetLogSquashThreshold (uint32_t usec) {
419+ log_squash_threshold_cached = usec;
420+ }
421+
367422} // namespace dfly
0 commit comments