Skip to content

Commit a645ef7

Browse files
authored
Merge pull request #277 from austin-beer/fix_sync_queue_time_jump_issues
Fix sync_timed_queue time jump issues
2 parents 9a20deb + c6863c4 commit a645ef7

3 files changed

Lines changed: 241 additions & 33 deletions

File tree

include/boost/thread/concurrent_queues/sync_timed_queue.hpp

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
#include <boost/chrono/system_clocks.hpp>
1717
#include <boost/chrono/chrono_io.hpp>
1818

19+
#include <algorithm> // std::min
20+
1921
#include <boost/config/abi_prefix.hpp>
2022

2123
namespace boost
@@ -59,6 +61,45 @@ namespace detail
5961
}
6062
}; //end struct
6163

64+
template <class Duration>
65+
chrono::time_point<chrono::steady_clock,Duration>
66+
limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
67+
{
68+
// Clock == chrono::steady_clock
69+
return tp;
70+
}
71+
72+
template <class Clock, class Duration>
73+
chrono::time_point<Clock,Duration>
74+
limit_timepoint(chrono::time_point<Clock,Duration> const& tp)
75+
{
76+
// Clock != chrono::steady_clock
77+
// The system time may jump while wait_until() is waiting. To compensate for this and time out near
78+
// the correct time, we limit how long wait_until() can wait before going around the loop again.
79+
const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));
80+
return (std::min)(tp, tpmax);
81+
}
82+
83+
template <class Duration>
84+
chrono::steady_clock::time_point
85+
convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
86+
{
87+
// Clock == chrono::steady_clock
88+
return chrono::time_point_cast<chrono::steady_clock::duration>(tp);
89+
}
90+
91+
template <class Clock, class Duration>
92+
chrono::steady_clock::time_point
93+
convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp)
94+
{
95+
// Clock != chrono::steady_clock
96+
// The system time may jump while wait_until() is waiting. To compensate for this and time out near
97+
// the correct time, we limit how long wait_until() can wait before going around the loop again.
98+
const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now()));
99+
const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));
100+
return chrono::steady_clock::now() + (std::min)(dura, duramax);
101+
}
102+
62103
} //end detail namespace
63104

64105
template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
@@ -88,8 +129,8 @@ namespace detail
88129
T pull();
89130
void pull(T& elem);
90131

91-
template <class WClock, class Duration>
92-
queue_op_status pull_until(chrono::time_point<WClock,Duration> const& tp, T& elem);
132+
template <class Duration>
133+
queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
93134
template <class Rep, class Period>
94135
queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
95136

@@ -122,8 +163,9 @@ namespace detail
122163
inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
123164

124165
bool wait_to_pull(unique_lock<mutex>&);
125-
template <class WClock, class Duration>
126-
queue_op_status wait_to_pull_until(unique_lock<mutex>&, chrono::time_point<WClock, Duration> const& tp);
166+
queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp);
167+
template <class Rep, class Period>
168+
queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura);
127169

128170
T pull(unique_lock<mutex>&);
129171
T pull(lock_guard<mutex>&);
@@ -228,14 +270,13 @@ namespace detail
228270
if (not_empty_and_time_reached(lk)) return false; // success
229271
if (super::closed(lk)) return true; // closed
230272

231-
const time_point tp(super::data_.top().time);
232-
super::cond_.wait_until(lk, tp);
273+
const time_point tpmin(detail::limit_timepoint(super::data_.top().time));
274+
super::cond_.wait_until(lk, tpmin);
233275
}
234276
}
235277

236278
template <class T, class Clock, class TimePoint>
237-
template <class WClock, class Duration>
238-
queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp)
279+
queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp)
239280
{
240281
for (;;)
241282
{
@@ -249,7 +290,29 @@ namespace detail
249290
if (super::closed(lk)) return queue_op_status::closed;
250291
if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
251292

252-
const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time);
293+
const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time)));
294+
super::cond_.wait_until(lk, tpmin);
295+
}
296+
}
297+
298+
template <class T, class Clock, class TimePoint>
299+
template <class Rep, class Period>
300+
queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura)
301+
{
302+
const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura));
303+
for (;;)
304+
{
305+
if (not_empty_and_time_reached(lk)) return queue_op_status::success;
306+
if (super::closed(lk)) return queue_op_status::closed;
307+
if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
308+
309+
super::wait_until_not_empty_or_closed_until(lk, tp);
310+
311+
if (not_empty_and_time_reached(lk)) return queue_op_status::success;
312+
if (super::closed(lk)) return queue_op_status::closed;
313+
if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
314+
315+
const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time)));
253316
super::cond_.wait_until(lk, tpmin);
254317
}
255318
}
@@ -315,12 +378,12 @@ namespace detail
315378

316379
//////////////////////
317380
template <class T, class Clock, class TimePoint>
318-
template <class WClock, class Duration>
381+
template <class Duration>
319382
queue_op_status
320-
sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<WClock, Duration> const& tp, T& elem)
383+
sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
321384
{
322385
unique_lock<mutex> lk(super::mtx_);
323-
const queue_op_status rc = wait_to_pull_until(lk, tp);
386+
const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp));
324387
if (rc == queue_op_status::success) pull(lk, elem);
325388
return rc;
326389
}
@@ -331,7 +394,10 @@ namespace detail
331394
queue_op_status
332395
sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
333396
{
334-
return pull_until(chrono::steady_clock::now() + dura, elem);
397+
unique_lock<mutex> lk(super::mtx_);
398+
const queue_op_status rc = wait_to_pull_for(lk, dura);
399+
if (rc == queue_op_status::success) pull(lk, elem);
400+
return rc;
335401
}
336402

337403
///////////////////////////

test/sync/mutual_exclusion/sync_pq/tq_multi_thread_pass.cpp

Lines changed: 68 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,58 @@ using namespace boost::chrono;
2424

2525
typedef boost::concurrent::sync_timed_queue<int> sync_tq;
2626

27-
const int count = 5;
27+
const int cnt = 5;
2828

29-
void call_push(sync_tq* q)
29+
void call_push(sync_tq* q, const steady_clock::time_point start)
3030
{
3131
// push elements onto the queue every 500 milliseconds but with a decreasing delay each time
32-
for (int i = 0; i < count; ++i)
32+
for (int i = 0; i < cnt; ++i)
3333
{
34-
q->push(i, sync_tq::clock::now() + seconds(count - i));
35-
boost::this_thread::sleep_for(milliseconds(500));
34+
boost::this_thread::sleep_until(start + milliseconds(i * 500));
35+
const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
36+
q->push(i, expected);
3637
}
3738
}
3839

39-
void call_pull(sync_tq* q)
40+
void call_pull(sync_tq* q, const steady_clock::time_point start)
4041
{
4142
// pull elements off of the queue (earliest element first)
42-
steady_clock::time_point start = steady_clock::now();
43-
for (int i = count - 1; i >= 0; --i)
43+
for (int i = cnt - 1; i >= 0; --i)
4444
{
4545
int j;
4646
q->pull(j);
4747
BOOST_TEST_EQ(i, j);
48-
milliseconds elapsed = duration_cast<milliseconds>(steady_clock::now() - start);
49-
milliseconds expected = milliseconds(i * 500) + seconds(count - i);
50-
BOOST_TEST_GE(elapsed, expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
51-
BOOST_TEST_LE(elapsed, expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
48+
const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
49+
BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
50+
BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
51+
}
52+
}
53+
54+
void call_pull_until(sync_tq* q, const steady_clock::time_point start)
55+
{
56+
// pull elements off of the queue (earliest element first)
57+
for (int i = cnt - 1; i >= 0; --i)
58+
{
59+
int j;
60+
q->pull_until(steady_clock::now() + hours(1), j);
61+
BOOST_TEST_EQ(i, j);
62+
const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
63+
BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
64+
BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
65+
}
66+
}
67+
68+
void call_pull_for(sync_tq* q, const steady_clock::time_point start)
69+
{
70+
// pull elements off of the queue (earliest element first)
71+
for (int i = cnt - 1; i >= 0; --i)
72+
{
73+
int j;
74+
q->pull_for(hours(1), j);
75+
BOOST_TEST_EQ(i, j);
76+
const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
77+
BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
78+
BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
5279
}
5380
}
5481

@@ -57,14 +84,41 @@ void test_push_while_pull()
5784
sync_tq tq;
5885
BOOST_TEST(tq.empty());
5986
boost::thread_group tg;
60-
tg.create_thread(boost::bind(call_push, &tq));
61-
tg.create_thread(boost::bind(call_pull, &tq));
87+
const steady_clock::time_point start = steady_clock::now();
88+
tg.create_thread(boost::bind(call_push, &tq, start));
89+
tg.create_thread(boost::bind(call_pull, &tq, start));
90+
tg.join_all();
91+
BOOST_TEST(tq.empty());
92+
}
93+
94+
void test_push_while_pull_until()
95+
{
96+
sync_tq tq;
97+
BOOST_TEST(tq.empty());
98+
boost::thread_group tg;
99+
const steady_clock::time_point start = steady_clock::now();
100+
tg.create_thread(boost::bind(call_push, &tq, start));
101+
tg.create_thread(boost::bind(call_pull_until, &tq, start));
102+
tg.join_all();
103+
BOOST_TEST(tq.empty());
104+
}
105+
106+
void test_push_while_pull_for()
107+
{
108+
sync_tq tq;
109+
BOOST_TEST(tq.empty());
110+
boost::thread_group tg;
111+
const steady_clock::time_point start = steady_clock::now();
112+
tg.create_thread(boost::bind(call_push, &tq, start));
113+
tg.create_thread(boost::bind(call_pull_for, &tq, start));
62114
tg.join_all();
63115
BOOST_TEST(tq.empty());
64116
}
65117

66118
int main()
67119
{
68120
test_push_while_pull();
121+
test_push_while_pull_until();
122+
test_push_while_pull_for();
69123
return boost::report_errors();
70124
}

0 commit comments

Comments
 (0)