Skip to content

Commit a0b2557

Browse files
committed
Fix time jump issues that were re-introduced while fixing issue 271
Also fix time jump issues with sync_timed_queue::push_for()
1 parent e5eef80 commit a0b2557

1 file changed

Lines changed: 72 additions & 4 deletions

File tree

include/boost/thread/concurrent_queues/sync_timed_queue.hpp

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
#include <boost/chrono/system_clocks.hpp>
1717
#include <boost/chrono/chrono_io.hpp>
1818

19+
#include <algorithm> // std::min
20+
1921
#include <boost/config/abi_prefix.hpp>
2022

2123
namespace boost
@@ -59,6 +61,45 @@ namespace detail
5961
}
6062
}; //end struct
6163

64+
template <class Duration>
65+
chrono::time_point<chrono::steady_clock,Duration>
66+
limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
67+
{
68+
// Clock == chrono::steady_clock
69+
return tp;
70+
}
71+
72+
template <class Clock, class Duration>
73+
chrono::time_point<Clock,Duration>
74+
limit_timepoint(chrono::time_point<Clock,Duration> const& tp)
75+
{
76+
// Clock != chrono::steady_clock
77+
// The system time may jump while wait_until() is waiting. To compensate for this and time out near
78+
// the correct time, we limit how long wait_until() can wait before going around the loop again.
79+
const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));
80+
return (std::min)(tp, tpmax);
81+
}
82+
83+
template <class Duration>
84+
chrono::steady_clock::time_point
85+
convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
86+
{
87+
// Clock == chrono::steady_clock
88+
return chrono::time_point_cast<chrono::steady_clock::duration>(tp);
89+
}
90+
91+
template <class Clock, class Duration>
92+
chrono::steady_clock::time_point
93+
convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp)
94+
{
95+
// Clock != chrono::steady_clock
96+
// The system time may jump while wait_until() is waiting. To compensate for this and time out near
97+
// the correct time, we limit how long wait_until() can wait before going around the loop again.
98+
const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now()));
99+
const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));
100+
return chrono::steady_clock::now() + (std::min)(dura, duramax);
101+
}
102+
62103
} //end detail namespace
63104

64105
template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
@@ -123,6 +164,8 @@ namespace detail
123164

124165
bool wait_to_pull(unique_lock<mutex>&);
125166
queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp);
167+
template <class Rep, class Period>
168+
queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura);
126169

127170
T pull(unique_lock<mutex>&);
128171
T pull(lock_guard<mutex>&);
@@ -227,8 +270,8 @@ namespace detail
227270
if (not_empty_and_time_reached(lk)) return false; // success
228271
if (super::closed(lk)) return true; // closed
229272

230-
const time_point tp(super::data_.top().time);
231-
super::cond_.wait_until(lk, tp);
273+
const time_point tpmin(detail::limit_timepoint(super::data_.top().time));
274+
super::cond_.wait_until(lk, tpmin);
232275
}
233276
}
234277

@@ -247,7 +290,29 @@ namespace detail
247290
if (super::closed(lk)) return queue_op_status::closed;
248291
if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
249292

250-
const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time);
293+
const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time)));
294+
super::cond_.wait_until(lk, tpmin);
295+
}
296+
}
297+
298+
template <class T, class Clock, class TimePoint>
299+
template <class Rep, class Period>
300+
queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura)
301+
{
302+
const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura));
303+
for (;;)
304+
{
305+
if (not_empty_and_time_reached(lk)) return queue_op_status::success;
306+
if (super::closed(lk)) return queue_op_status::closed;
307+
if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
308+
309+
super::wait_until_not_empty_or_closed_until(lk, tp);
310+
311+
if (not_empty_and_time_reached(lk)) return queue_op_status::success;
312+
if (super::closed(lk)) return queue_op_status::closed;
313+
if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
314+
315+
const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time)));
251316
super::cond_.wait_until(lk, tpmin);
252317
}
253318
}
@@ -329,7 +394,10 @@ namespace detail
329394
queue_op_status
330395
sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
331396
{
332-
return pull_until(clock::now() + dura, elem);
397+
unique_lock<mutex> lk(super::mtx_);
398+
const queue_op_status rc = wait_to_pull_for(lk, dura);
399+
if (rc == queue_op_status::success) pull(lk, elem);
400+
return rc;
333401
}
334402

335403
///////////////////////////

0 commit comments

Comments
 (0)