Skip to content
This repository was archived by the owner on Mar 22, 2023. It is now read-only.

Commit 1f4030c

Browse files
committed
mpsc_queue: allow interrupting try_consume_batch
If interrupt (exception) happens, next try_consume_batch will resume from the previous position.
1 parent fd6e426 commit 1f4030c

6 files changed

Lines changed: 218 additions & 13 deletions

File tree

include/libpmemobj++/detail/ringbuf.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ struct ringbuf_t {
106106
unsigned nworkers;
107107
std::unique_ptr<ringbuf_worker_t[]> workers;
108108

109+
/* Set by ringbuf_consume, reset by ringbuf_release. */
110+
bool consume_in_progress;
111+
109112
ringbuf_t(size_t max_workers, size_t length)
110113
: workers(new ringbuf_worker_t[max_workers])
111114
{
@@ -115,6 +118,7 @@ struct ringbuf_t {
115118
space = length;
116119
end = RBUF_OFF_MAX;
117120
nworkers = max_workers;
121+
consume_in_progress = false;
118122

119123
/* Helgrind/Drd does not understand std::atomic */
120124
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
@@ -321,10 +325,14 @@ ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
321325

322326
/*
323327
* ringbuf_consume: get a contiguous range which is ready to be consumed.
328+
*
329+
* Nested consumes are not allowed.
324330
*/
325331
inline size_t
326332
ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
327333
{
334+
assert(!rbuf->consume_in_progress);
335+
328336
ringbuf_off_t written = rbuf->written, next, ready;
329337
size_t towrite;
330338
retry:
@@ -430,6 +438,10 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
430438

431439
assert(ready >= written);
432440
assert(towrite <= rbuf->space);
441+
442+
if (towrite)
443+
rbuf->consume_in_progress = true;
444+
433445
return towrite;
434446
}
435447

@@ -439,6 +451,8 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
439451
inline void
440452
ringbuf_release(ringbuf_t *rbuf, size_t nbytes)
441453
{
454+
rbuf->consume_in_progress = false;
455+
442456
const size_t nwritten = rbuf->written + nbytes;
443457

444458
assert(rbuf->written <= rbuf->space);

include/libpmemobj++/experimental/mpsc_queue.hpp

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ class mpsc_queue {
8080
size_t buff_size_;
8181
pmem_log_type *pmem;
8282

83+
/* Stores offset and length of next message to be consumed. Only
84+
* valid if ring_buffer->consume_in_progress. */
85+
size_t consume_offset = 0;
86+
size_t consume_len = 0;
87+
8388
public:
8489
class batch_type {
8590
public:
@@ -271,19 +276,27 @@ mpsc_queue::try_consume_batch(Function &&f)
271276
* merge those two parts into one try_consume. If all data was
272277
* consumed during first try_consume, second will do nothing. */
273278
for (int i = 0; i < 2; i++) {
274-
size_t offset;
275-
size_t len = ringbuf_consume(ring_buffer.get(), &offset);
279+
/* If there is no consume in progress, it's safe to call
280+
* ringbuf_consume. */
281+
if (!ring_buffer->consume_in_progress) {
282+
size_t offset;
283+
auto len = ringbuf_consume(ring_buffer.get(), &offset);
284+
if (!len)
285+
return consumed;
286+
287+
consume_offset = offset;
288+
consume_len = len;
289+
} else {
290+
assert(consume_len != 0);
291+
}
276292

277293
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
278294
ANNOTATE_HAPPENS_AFTER(ring_buffer.get());
279295
#endif
280296

281-
if (!len)
282-
return consumed;
283-
284-
auto data = buf + offset;
285-
auto begin = iterator(data, data + len);
286-
auto end = iterator(data + len, data + len);
297+
auto data = buf + consume_offset;
298+
auto begin = iterator(data, data + consume_len);
299+
auto end = iterator(data + consume_len, data + consume_len);
287300

288301
pmem::obj::flat_transaction::run(pop, [&] {
289302
if (begin != end) {
@@ -292,11 +305,11 @@ mpsc_queue::try_consume_batch(Function &&f)
292305
}
293306

294307
auto b = reinterpret_cast<first_block *>(data);
295-
clear_cachelines(b, len);
308+
clear_cachelines(b, consume_len);
296309

297-
if (offset + len < buff_size_)
298-
pmem->written = offset + len;
299-
else if (offset + len == buff_size_)
310+
if (consume_offset + consume_len < buff_size_)
311+
pmem->written = consume_offset + consume_len;
312+
else if (consume_offset + consume_len == buff_size_)
300313
pmem->written = 0;
301314
else
302315
assert(false);
@@ -306,7 +319,9 @@ mpsc_queue::try_consume_batch(Function &&f)
306319
ANNOTATE_HAPPENS_BEFORE(ring_buffer.get());
307320
#endif
308321

309-
ringbuf_release(ring_buffer.get(), len);
322+
ringbuf_release(ring_buffer.get(), consume_len);
323+
324+
assert(!ring_buffer->consume_in_progress);
310325

311326
/* XXX: it would be better to call f once - hide
312327
* wraparound behind iterators */

tests/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -975,6 +975,9 @@ if(TEST_MPSC_QUEUE)
975975
build_test(mpsc_queue_consume_multipass mpsc_queue/consume_multipass.cpp)
976976
add_test_generic(NAME mpsc_queue_consume_multipass TRACERS none memcheck pmemcheck)
977977

978+
build_test(mpsc_queue_consume_interrupt mpsc_queue/consume_interrupt.cpp)
979+
add_test_generic(NAME mpsc_queue_consume_interrupt SCRIPT mpsc_queue/mpsc_queue_consume_interrupt.cmake TRACERS none memcheck pmemcheck)
980+
978981
build_test(mpsc_queue_basic mpsc_queue/basic.cpp)
979982
add_test_generic(NAME mpsc_queue_basic SCRIPT mpsc_queue/basic.cmake TRACERS none memcheck pmemcheck)
980983

tests/mpsc_queue/basic.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ basic_test(pmem::obj::pool<root> pop, bool create)
6868
});
6969
UT_ASSERT(ret);
7070

71+
UT_ASSERTeq(values_on_pmem.size(), values.size());
72+
for (auto &str : values) {
73+
auto count = std::count(values_on_pmem.begin(),
74+
values_on_pmem.end(), str);
75+
UT_ASSERTeq(count, 1);
76+
}
77+
7178
/* Insert new data, which may be recovered in next run of
7279
* application */
7380
ret = worker.try_produce(
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// SPDX-License-Identifier: BSD-3-Clause
2+
/* Copyright 2021, Intel Corporation */
3+
4+
/*
5+
* consume_interrupt.cpp -- Test for mpsc_queue which interrupts
6+
* try_consume_batch and check if the data is accessible afterwards.
7+
*/
8+
9+
#include "unittest.hpp"
10+
11+
#include <algorithm>
12+
#include <string>
13+
14+
#include <libpmemobj++/experimental/mpsc_queue.hpp>
15+
#include <libpmemobj++/make_persistent.hpp>
16+
#include <libpmemobj++/persistent_ptr.hpp>
17+
#include <libpmemobj++/transaction.hpp>
18+
19+
#define LAYOUT "layout"
20+
21+
using queue_type = pmem::obj::experimental::mpsc_queue;
22+
23+
static constexpr size_t QUEUE_SIZE = 10000;
24+
25+
struct root {
26+
pmem::obj::persistent_ptr<queue_type::pmem_log_type> log;
27+
};
28+
29+
/* Basic try_produce-consume-recovery scenario */
30+
static void
31+
consume_interrupt(pmem::obj::pool<root> pop, bool create)
32+
{
33+
auto proot = pop.root();
34+
35+
auto queue = queue_type(*proot->log, 1);
36+
37+
auto worker = queue.register_worker();
38+
39+
std::vector<std::string> values = {"xxx", "aaaaaaa", "bbbbb",
40+
std::string(120, 'a')};
41+
42+
if (create) {
43+
auto ret = queue.try_consume_batch(
44+
[&](queue_type::batch_type acc) {
45+
ASSERT_UNREACHABLE;
46+
});
47+
UT_ASSERT(!ret);
48+
49+
/* XXX: this is to make sure that try_consume_batch later in the
50+
* test returns all elements within a single callback call. */
51+
ret = worker.try_produce(values[0]);
52+
UT_ASSERT(ret);
53+
ret = queue.try_consume_batch(
54+
[&](queue_type::batch_type rd_acc) {
55+
std::vector<std::string> v;
56+
for (const auto &str : rd_acc)
57+
v.emplace_back(str.data(), str.size());
58+
59+
UT_ASSERTeq(v.size(), 1);
60+
UT_ASSERT(v[0] == values[0]);
61+
});
62+
UT_ASSERT(ret);
63+
64+
/* Insert the data */
65+
for (const auto &e : values) {
66+
ret = worker.try_produce(
67+
e.size(), [&](pmem::obj::slice<char *> range) {
68+
std::copy_n(e.begin(), e.size(),
69+
range.begin());
70+
});
71+
UT_ASSERT(ret);
72+
}
73+
74+
/* Consume all the data */
75+
std::vector<std::string> values_on_pmem;
76+
const int retries = 3;
77+
78+
for (int i = 0; i < retries; i++) {
79+
try {
80+
ret = queue.try_consume_batch(
81+
[&](queue_type::batch_type rd_acc) {
82+
for (const auto &str : rd_acc) {
83+
values_on_pmem.emplace_back(
84+
str.data(),
85+
str.size());
86+
}
87+
88+
throw std::runtime_error("");
89+
});
90+
ASSERT_UNREACHABLE;
91+
} catch (std::runtime_error &) {
92+
} catch (...) {
93+
ASSERT_UNREACHABLE;
94+
}
95+
}
96+
97+
UT_ASSERTeq(values_on_pmem.size(), values.size() * retries);
98+
for (auto &str : values) {
99+
auto count = std::count(values_on_pmem.begin(),
100+
values_on_pmem.end(), str);
101+
UT_ASSERTeq(count, retries);
102+
}
103+
} else {
104+
std::vector<std::string> values_on_pmem;
105+
/* Recover the data in second run of application */
106+
auto ret = queue.try_consume_batch(
107+
[&](queue_type::batch_type acc) {
108+
for (const auto &entry : acc)
109+
values_on_pmem.emplace_back(
110+
entry.data(), entry.size());
111+
});
112+
UT_ASSERT(ret);
113+
UT_ASSERTeq(values_on_pmem.size(), values.size());
114+
for (auto &str : values) {
115+
auto count = std::count(values_on_pmem.begin(),
116+
values_on_pmem.end(), str);
117+
UT_ASSERTeq(count, 1);
118+
}
119+
}
120+
}
121+
122+
static void
123+
test(int argc, char *argv[])
124+
{
125+
if (argc != 3)
126+
UT_FATAL("usage: %s file-name create", argv[0]);
127+
128+
const char *path = argv[1];
129+
bool create = std::string(argv[2]) == "1";
130+
131+
pmem::obj::pool<struct root> pop;
132+
133+
if (create) {
134+
pop = pmem::obj::pool<root>::create(std::string(path), LAYOUT,
135+
PMEMOBJ_MIN_POOL,
136+
S_IWUSR | S_IRUSR);
137+
138+
pmem::obj::transaction::run(pop, [&] {
139+
pop.root()->log = pmem::obj::make_persistent<
140+
queue_type::pmem_log_type>(QUEUE_SIZE);
141+
});
142+
} else {
143+
pop = pmem::obj::pool<root>::open(std::string(path), LAYOUT);
144+
}
145+
146+
consume_interrupt(pop, create);
147+
148+
pop.close();
149+
}
150+
151+
int
152+
main(int argc, char *argv[])
153+
{
154+
return run_test([&] { test(argc, argv); });
155+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# SPDX-License-Identifier: BSD-3-Clause
2+
# Copyright 2021, Intel Corporation
3+
4+
include(${SRC_DIR}/../helpers.cmake)
5+
6+
setup()
7+
8+
execute(${TEST_EXECUTABLE} ${DIR}/testfile 1)
9+
execute(${TEST_EXECUTABLE} ${DIR}/testfile 0)
10+
11+
finish()

0 commit comments

Comments
 (0)