Skip to content
This repository was archived by the owner on Mar 22, 2023. It is now read-only.

Commit 9d4af67

Browse files
Merge pull request #1121 from igchor/queue_size_cacheline
mpsc_queue: do not call ringbuf_consume/release with number of byte
2 parents 3477efe + 217c0db commit 9d4af67

File tree

3 files changed

+106
-32
lines changed

3 files changed

+106
-32
lines changed

include/libpmemobj++/detail/ringbuf.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,17 @@ struct ringbuf_t {
109109
/* Set by ringbuf_consume, reset by ringbuf_release. */
110110
bool consume_in_progress;
111111

112+
/**
113+
* Creates new ringbuf_t instance.
114+
*
115+
* Length must be < RBUF_OFF_MASK
116+
*/
112117
ringbuf_t(size_t max_workers, size_t length)
113118
: workers(new ringbuf_worker_t[max_workers])
114119
{
120+
if (length >= RBUF_OFF_MASK)
121+
throw std::out_of_range("ringbuf length too big");
122+
115123
written.store(0);
116124
next.store(0);
117125
end.store(0);

include/libpmemobj++/experimental/mpsc_queue.hpp

Lines changed: 81 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,19 @@ class mpsc_queue {
7272

7373
void clear_cachelines(first_block *block, size_t size);
7474
void restore_offsets();
75+
76+
size_t consume_cachelines(size_t *offset);
77+
void release_cachelines(size_t len);
78+
7579
inline pmem::detail::id_manager &get_id_manager();
7680

81+
/* ringbuf_t handle. Important: mpsc_queue operates on cachelines hence
82+
* ringbuf_produce/release functions are called with number of
83+
* cachelines, not bytes. */
7784
std::unique_ptr<ringbuf::ringbuf_t> ring_buffer;
7885
char *buf;
7986
pmem::obj::pool_base pop;
80-
size_t buff_size_;
87+
size_t buf_size;
8188
pmem_log_type *pmem;
8289

8390
/* Stores offset and length of next message to be consumed. Only
@@ -123,7 +130,12 @@ class mpsc_queue {
123130
mpsc_queue *queue;
124131
ringbuf::ringbuf_worker_t *w;
125132
size_t id;
133+
134+
ptrdiff_t acquire_cachelines(size_t len);
135+
void produce_cachelines();
126136
void store_to_log(pmem::obj::string_view data, char *log_data);
137+
138+
friend class mpsc_queue;
127139
};
128140

129141
class pmem_log_type {
@@ -147,25 +159,66 @@ mpsc_queue::mpsc_queue(pmem_log_type &pmem, size_t max_workers)
147159
auto buf_data = pmem.data();
148160

149161
buf = const_cast<char *>(buf_data.data());
150-
buff_size_ = buf_data.size();
162+
buf_size = buf_data.size();
163+
164+
assert(buf_size % pmem::detail::CACHELINE_SIZE == 0);
151165

152-
ring_buffer = std::unique_ptr<ringbuf::ringbuf_t>(
153-
new ringbuf::ringbuf_t(max_workers, buff_size_));
166+
ring_buffer =
167+
std::unique_ptr<ringbuf::ringbuf_t>(new ringbuf::ringbuf_t(
168+
max_workers, buf_size / pmem::detail::CACHELINE_SIZE));
154169

155170
this->pmem = &pmem;
156171

157172
restore_offsets();
158173
}
159174

175+
ptrdiff_t
176+
mpsc_queue::worker::acquire_cachelines(size_t len)
177+
{
178+
assert(len % pmem::detail::CACHELINE_SIZE == 0);
179+
auto ret = ringbuf_acquire(queue->ring_buffer.get(), w,
180+
len / pmem::detail::CACHELINE_SIZE);
181+
182+
if (ret < 0)
183+
return ret;
184+
185+
return ret * static_cast<ptrdiff_t>(pmem::detail::CACHELINE_SIZE);
186+
}
187+
188+
void
189+
mpsc_queue::worker::produce_cachelines()
190+
{
191+
ringbuf_produce(queue->ring_buffer.get(), w);
192+
}
193+
194+
size_t
195+
mpsc_queue::consume_cachelines(size_t *offset)
196+
{
197+
auto ret = ringbuf_consume(ring_buffer.get(), offset);
198+
if (ret) {
199+
*offset *= pmem::detail::CACHELINE_SIZE;
200+
return ret * pmem::detail::CACHELINE_SIZE;
201+
}
202+
203+
return 0;
204+
}
205+
206+
void
207+
mpsc_queue::release_cachelines(size_t len)
208+
{
209+
assert(len % pmem::detail::CACHELINE_SIZE == 0);
210+
ringbuf_release(ring_buffer.get(), len / pmem::detail::CACHELINE_SIZE);
211+
}
212+
160213
void
161214
mpsc_queue::restore_offsets()
162215
{
163216
/* Invariant */
164-
assert(pmem->written < buff_size_);
217+
assert(pmem->written < buf_size);
165218

166219
/* XXX: implement restore_offset function in ringbuf */
167220

168-
auto w = ringbuf_register(ring_buffer.get(), 0);
221+
auto w = register_worker();
169222

170223
if (!pmem->written) {
171224
/* If pmem->written == 0 it means that consumer should start
@@ -174,14 +227,12 @@ mpsc_queue::restore_offsets()
174227
* from overwriting the original content - mark the entire log
175228
* as produced. */
176229

177-
auto acq = ringbuf_acquire(
178-
ring_buffer.get(), w,
179-
buff_size_ - pmem::detail::CACHELINE_SIZE);
230+
auto acq = w.acquire_cachelines(buf_size -
231+
pmem::detail::CACHELINE_SIZE);
180232
assert(acq == 0);
181233
(void)acq;
182-
ringbuf_produce(ring_buffer.get(), w);
183234

184-
ringbuf_unregister(ring_buffer.get(), w);
235+
w.produce_cachelines();
185236

186237
return;
187238
}
@@ -201,31 +252,29 @@ mpsc_queue::restore_offsets()
201252
* CACHELINE_SIZE and consumer offset equal to pmem->written.
202253
*/
203254

204-
auto acq = ringbuf_acquire(ring_buffer.get(), w, pmem->written);
255+
auto acq = w.acquire_cachelines(pmem->written);
205256
assert(acq == 0);
206-
ringbuf_produce(ring_buffer.get(), w);
257+
w.produce_cachelines();
207258

208259
/* Restore consumer offset */
209260
size_t offset;
210-
auto len = ringbuf_consume(ring_buffer.get(), &offset);
261+
auto len = consume_cachelines(&offset);
211262
assert(len == pmem->written);
212-
ringbuf_release(ring_buffer.get(), len);
263+
release_cachelines(len);
213264

214265
assert(offset == 0);
215266
assert(len == pmem->written);
216267

217-
acq = ringbuf_acquire(ring_buffer.get(), w, buff_size_ - pmem->written);
218-
assert(acq != -1);
268+
acq = w.acquire_cachelines(buf_size - pmem->written);
269+
assert(acq >= 0);
219270
assert(static_cast<size_t>(acq) == pmem->written);
220-
ringbuf_produce(ring_buffer.get(), w);
271+
w.produce_cachelines();
221272

222-
acq = ringbuf_acquire(ring_buffer.get(), w,
223-
pmem->written - pmem::detail::CACHELINE_SIZE);
273+
acq = w.acquire_cachelines(pmem->written -
274+
pmem::detail::CACHELINE_SIZE);
224275
assert(acq == 0);
225-
ringbuf_produce(ring_buffer.get(), w);
226-
227-
ringbuf_unregister(ring_buffer.get(), w);
228276
(void)acq;
277+
w.produce_cachelines();
229278
}
230279

231280
mpsc_queue::pmem_log_type::pmem_log_type(size_t size)
@@ -280,7 +329,7 @@ mpsc_queue::try_consume_batch(Function &&f)
280329
* ringbuf_consume. */
281330
if (!ring_buffer->consume_in_progress) {
282331
size_t offset;
283-
auto len = ringbuf_consume(ring_buffer.get(), &offset);
332+
auto len = consume_cachelines(&offset);
284333
if (!len)
285334
return consumed;
286335

@@ -307,9 +356,9 @@ mpsc_queue::try_consume_batch(Function &&f)
307356
auto b = reinterpret_cast<first_block *>(data);
308357
clear_cachelines(b, consume_len);
309358

310-
if (consume_offset + consume_len < buff_size_)
359+
if (consume_offset + consume_len < buf_size)
311360
pmem->written = consume_offset + consume_len;
312-
else if (consume_offset + consume_len == buff_size_)
361+
else if (consume_offset + consume_len == buf_size)
313362
pmem->written = 0;
314363
else
315364
assert(false);
@@ -319,7 +368,7 @@ mpsc_queue::try_consume_batch(Function &&f)
319368
ANNOTATE_HAPPENS_BEFORE(ring_buffer.get());
320369
#endif
321370

322-
ringbuf_release(ring_buffer.get(), consume_len);
371+
release_cachelines(consume_len);
323372

324373
assert(!ring_buffer->consume_in_progress);
325374

@@ -391,7 +440,7 @@ mpsc_queue::worker::try_produce(size_t size, Function &&f)
391440

392441
auto req_size = pmem::detail::align_up(size + sizeof(first_block::size),
393442
pmem::detail::CACHELINE_SIZE);
394-
auto offset = ringbuf_acquire(queue->ring_buffer.get(), w, req_size);
443+
auto offset = acquire_cachelines(req_size);
395444

396445
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
397446
ANNOTATE_HAPPENS_AFTER(queue->ring_buffer.get());
@@ -413,7 +462,7 @@ mpsc_queue::worker::try_produce(size_t size, Function &&f)
413462
ANNOTATE_HAPPENS_BEFORE(queue->ring_buffer.get());
414463
#endif
415464

416-
ringbuf_produce(queue->ring_buffer.get(), w);
465+
produce_cachelines();
417466

418467
return true;
419468
}
@@ -426,7 +475,7 @@ mpsc_queue::worker::try_produce(pmem::obj::string_view data,
426475
auto req_size =
427476
pmem::detail::align_up(data.size() + sizeof(first_block::size),
428477
pmem::detail::CACHELINE_SIZE);
429-
auto offset = ringbuf_acquire(queue->ring_buffer.get(), w, req_size);
478+
auto offset = acquire_cachelines(req_size);
430479

431480
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
432481
ANNOTATE_HAPPENS_AFTER(queue->ring_buffer.get());
@@ -444,7 +493,7 @@ mpsc_queue::worker::try_produce(pmem::obj::string_view data,
444493
on_produce(pmem::obj::string_view(
445494
queue->buf + offset + sizeof(first_block::size), data.size()));
446495

447-
ringbuf_produce(queue->ring_buffer.get(), w);
496+
produce_cachelines();
448497

449498
return true;
450499
}
@@ -578,7 +627,7 @@ mpsc_queue::clear_cachelines(first_block *block, size_t size)
578627
block++;
579628
}
580629

581-
assert(end <= reinterpret_cast<first_block *>(buf + buff_size_));
630+
assert(end <= reinterpret_cast<first_block *>(buf + buf_size));
582631
}
583632

584633
mpsc_queue::iterator &

tests/mpsc_queue/ringbuf.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,22 @@ test_random(void)
314314
delete r;
315315
}
316316

317+
static void
318+
test_size()
319+
{
320+
try {
321+
auto size = (1ULL << 32) + 1;
322+
auto r = new ringbuf_t(1, size);
323+
(void)r;
324+
325+
ASSERT_UNREACHABLE;
326+
} catch (std::out_of_range &) {
327+
328+
} catch (...) {
329+
ASSERT_UNREACHABLE;
330+
}
331+
}
332+
317333
int
318334
main(void)
319335
{
@@ -326,5 +342,6 @@ main(void)
326342
test_multi();
327343
test_overlap();
328344
test_random();
345+
test_size();
329346
return 0;
330347
}

0 commit comments

Comments
 (0)