Skip to content
This repository was archived by the owner on Mar 22, 2023. It is now read-only.

Commit f67e2b3

Browse files
committed
mpsc_queue: refactor try_consume
- rename try_consume to try_consume_batch
- rename read_accessor to batch_type
- clear cachelines only after the user callback executes: this allows iterating over the data an arbitrary number of times (including 0)
- call the user callback within a transaction to simplify the logic
1 parent 322aedd commit f67e2b3

10 files changed

Lines changed: 272 additions & 190 deletions

File tree

include/libpmemobj++/experimental/mpsc_queue.hpp

Lines changed: 115 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -30,58 +30,70 @@ namespace experimental
3030

3131
/* XXX: Add documentation */
3232
class mpsc_queue {
33-
private:
34-
struct first_block;
35-
3633
public:
37-
class read_accessor;
3834
class worker;
3935
class pmem_log_type;
36+
class batch_type;
4037

4138
mpsc_queue(pmem_log_type &pmem, size_t max_workers = 1);
4239

4340
worker register_worker();
4441

4542
template <typename Function>
46-
bool try_consume(Function &&f);
43+
bool try_consume_batch(Function &&f);
4744

48-
class read_accessor {
49-
private:
50-
struct iterator {
51-
iterator(mpsc_queue *queue, char *data, char *end);
45+
private:
46+
struct first_block {
47+
static constexpr size_t CAPACITY =
48+
pmem::detail::CACHELINE_SIZE - sizeof(size_t);
49+
static constexpr size_t DIRTY_FLAG = (1ULL << 63);
5250

53-
iterator &operator++();
51+
pmem::obj::p<size_t> size;
52+
char data[CAPACITY];
53+
};
5454

55-
bool operator==(const iterator &rhs);
56-
bool operator!=(const iterator &rhs);
55+
struct iterator {
56+
iterator(char *data, char *end);
5757

58-
pmem::obj::string_view operator*() const;
58+
iterator &operator++();
5959

60-
private:
61-
first_block *skip_consumed(first_block *);
60+
bool operator==(const iterator &rhs);
61+
bool operator!=(const iterator &rhs);
6262

63-
mpsc_queue *queue;
64-
char *data;
65-
char *end;
66-
};
63+
pmem::obj::string_view operator*() const;
6764

68-
iterator begin_;
69-
iterator end_;
65+
private:
66+
first_block *seek_next(first_block *);
67+
68+
char *data;
69+
char *end;
70+
};
71+
72+
void clear_cachelines(first_block *block, size_t size);
73+
void restore_offsets();
74+
inline pmem::detail::id_manager &get_id_manager();
75+
76+
std::unique_ptr<ringbuf::ringbuf_t> ring_buffer;
77+
char *buf;
78+
pmem::obj::pool_base pop;
79+
size_t buff_size_;
80+
pmem_log_type *pmem;
7081

82+
public:
83+
class batch_type {
7184
public:
72-
read_accessor(mpsc_queue *queue, char *data, size_t len);
85+
batch_type(iterator begin, iterator end);
7386

7487
iterator begin();
7588
iterator end();
89+
90+
private:
91+
iterator begin_;
92+
iterator end_;
7693
};
7794

7895
/* All workers should be destroyed before destruction of mpsc_queue */
7996
class worker {
80-
private:
81-
mpsc_queue *queue;
82-
ringbuf::ringbuf_worker_t *w;
83-
size_t id;
84-
8597
public:
8698
worker(mpsc_queue *q);
8799
~worker();
@@ -97,6 +109,9 @@ class mpsc_queue {
97109
bool try_produce(pmem::obj::string_view data);
98110

99111
private:
112+
mpsc_queue *queue;
113+
ringbuf::ringbuf_worker_t *w;
114+
size_t id;
100115
void store_to_log(pmem::obj::string_view data, char *log_data);
101116
};
102117

@@ -110,26 +125,6 @@ class mpsc_queue {
110125

111126
friend class mpsc_queue;
112127
};
113-
114-
private:
115-
struct first_block {
116-
static constexpr size_t CAPACITY =
117-
pmem::detail::CACHELINE_SIZE - sizeof(size_t);
118-
static constexpr size_t DIRTY_FLAG = (1ULL << 63);
119-
120-
pmem::obj::p<size_t> size;
121-
char data[CAPACITY];
122-
};
123-
124-
inline pmem::detail::id_manager &get_id_manager();
125-
126-
std::unique_ptr<ringbuf::ringbuf_t> ring_buffer;
127-
char *buf;
128-
pmem::obj::pool_base pop;
129-
size_t buff_size_;
130-
pmem_log_type *pmem;
131-
132-
void restore_offsets();
133128
};
134129

135130
mpsc_queue::mpsc_queue(pmem_log_type &pmem, size_t max_workers)
@@ -244,7 +239,7 @@ mpsc_queue::register_worker()
244239

245240
template <typename Function>
246241
inline bool
247-
mpsc_queue::try_consume(Function &&f)
242+
mpsc_queue::try_consume_batch(Function &&f)
248243
{
249244
if (pmemobj_tx_stage() != TX_STAGE_NONE)
250245
throw pmem::transaction_scope_error(
@@ -257,32 +252,43 @@ mpsc_queue::try_consume(Function &&f)
257252
ANNOTATE_HAPPENS_AFTER(ring_buffer.get());
258253
#endif
259254

260-
if (len != 0) {
261-
pmem->written = offset;
262-
pop.persist(pmem->written);
255+
if (!len)
256+
return false;
257+
258+
auto data = buf + offset;
259+
260+
auto begin = iterator(data, data + len);
261+
auto end = iterator(data + len, data + len);
263262

264-
auto acc = read_accessor(this, buf + offset, len);
263+
auto elements_to_consume = begin != end;
265264

266-
// XXX - we can mark begin/end as && (can only by called on
267-
// std::move(acc))
268-
bool elements_to_consume = (acc.begin() != acc.end());
265+
pmem::obj::flat_transaction::run(pop, [&] {
269266
if (elements_to_consume)
270-
f(acc);
267+
f(batch_type(begin, end));
268+
269+
auto b = reinterpret_cast<first_block *>(data);
270+
clear_cachelines(b, len);
271+
272+
if (offset + len < buff_size_)
273+
pmem->written = offset + len;
274+
else if (offset + len == buff_size_)
275+
pmem->written = 0;
276+
else
277+
assert(false);
278+
});
271279

272280
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
273-
ANNOTATE_HAPPENS_BEFORE(ring_buffer.get());
281+
ANNOTATE_HAPPENS_BEFORE(ring_buffer.get());
274282
#endif
275283

276-
ringbuf_release(ring_buffer.get(), len);
284+
ringbuf_release(ring_buffer.get(), len);
277285

278-
/* XXX: it would be better to call f once - hide
279-
* wraparound behind iterators */
280-
/* XXX: add param to ringbuf_consume and do not
281-
* call store_explicit in consume */
282-
return try_consume(std::forward<Function>(f)) ||
283-
elements_to_consume;
284-
}
285-
return false;
286+
/* XXX: it would be better to call f once - hide
287+
* wraparound behind iterators */
288+
/* XXX: add param to ringbuf_consume and do not
289+
* call store_explicit in consume */
290+
return try_consume_batch(std::forward<Function>(f)) ||
291+
elements_to_consume;
286292
}
287293

288294
inline mpsc_queue::worker::worker(mpsc_queue *q)
@@ -484,119 +490,93 @@ mpsc_queue::worker::store_to_log(pmem::obj::string_view data, char *log_data)
484490
pmem::detail::CACHELINE_SIZE, PMEMOBJ_F_MEM_NONTEMPORAL);
485491
}
486492

487-
inline mpsc_queue::read_accessor::read_accessor(mpsc_queue *queue, char *data,
488-
size_t len)
489-
: begin_(queue, data, data + len), end_(queue, data + len, data + len)
493+
inline mpsc_queue::batch_type::batch_type(iterator begin_, iterator end_)
494+
: begin_(begin_), end_(end_)
490495
{
491496
}
492497

493-
inline mpsc_queue::read_accessor::iterator
494-
mpsc_queue::read_accessor::begin()
498+
inline mpsc_queue::iterator
499+
mpsc_queue::batch_type::begin()
495500
{
496501
return begin_;
497502
}
498503

499-
inline mpsc_queue::read_accessor::iterator
500-
mpsc_queue::read_accessor::end()
504+
inline mpsc_queue::iterator
505+
mpsc_queue::batch_type::end()
501506
{
502507
return end_;
503508
}
504509

505-
inline mpsc_queue::read_accessor::iterator::iterator(mpsc_queue *queue,
506-
char *data, char *end)
507-
: queue(queue), data(data), end(end)
510+
mpsc_queue::iterator::iterator(char *data, char *end) : data(data), end(end)
508511
{
509-
auto pop = pmem::obj::pool_by_vptr(data);
510-
pmem::obj::flat_transaction::run(pop, [&] {
511-
auto b = reinterpret_cast<first_block *>(data);
512-
auto unconsumed = skip_consumed(b);
512+
auto b = reinterpret_cast<first_block *>(data);
513+
auto next = seek_next(b);
514+
assert(next >= b);
515+
this->data = reinterpret_cast<char *>(next);
516+
}
513517

514-
assert(unconsumed >= b);
515-
queue->pmem->written += static_cast<size_t>(unconsumed - b) *
516-
sizeof(first_block);
517-
if (queue->pmem->written == queue->buff_size_)
518-
queue->pmem->written = 0;
518+
void
519+
mpsc_queue::clear_cachelines(first_block *block, size_t size)
520+
{
521+
assert(size % pmem::detail::CACHELINE_SIZE == 0);
522+
assert(pmemobj_tx_stage() == TX_STAGE_WORK);
519523

520-
this->data = reinterpret_cast<char *>(unconsumed);
521-
});
524+
auto end = block +
525+
static_cast<ptrdiff_t>(size / pmem::detail::CACHELINE_SIZE);
526+
527+
while (block < end) {
528+
/* data in block might be uninitialized. */
529+
detail::conditional_add_to_tx(&block->size, 1,
530+
POBJ_XADD_ASSUME_INITIALIZED);
531+
block->size = 0;
532+
block++;
533+
}
534+
535+
assert(end <= reinterpret_cast<first_block *>(buf + buff_size_));
522536
}
523537

524-
/* Invalidates data after increment */
525-
inline mpsc_queue::read_accessor::iterator &
526-
mpsc_queue::read_accessor::iterator::operator++()
538+
mpsc_queue::iterator &
539+
mpsc_queue::iterator::operator++()
527540
{
528-
auto pop = pmem::obj::pool_by_vptr(data);
529-
530541
auto block = reinterpret_cast<first_block *>(data);
531-
532542
assert(block->size != 0);
533543

534544
auto element_size =
535545
pmem::detail::align_up(block->size + sizeof(block->size),
536546
pmem::detail::CACHELINE_SIZE);
537-
auto element_end = reinterpret_cast<first_block *>(
538-
data + static_cast<ptrdiff_t>(element_size));
539547

540-
assert(element_end <=
541-
reinterpret_cast<first_block *>(queue->buf + queue->buff_size_));
548+
block += element_size / pmem::detail::CACHELINE_SIZE;
542549

543-
/* Mark all cachelines as consumed. */
544-
pmem::obj::flat_transaction::run(pop, [&] {
545-
while (block < element_end) {
546-
/* data in block might be uninitialized. */
547-
detail::conditional_add_to_tx(
548-
&block->size, 1, POBJ_XADD_ASSUME_INITIALIZED);
549-
block->size = 0;
550-
block++;
551-
}
552-
553-
queue->pmem->written += pmem::detail::align_up(
554-
element_size, pmem::detail::CACHELINE_SIZE);
555-
556-
/* Go to the next, unconsumed element. */
557-
auto unconsumed = skip_consumed(block);
558-
559-
assert(unconsumed >= block);
560-
queue->pmem->written +=
561-
static_cast<size_t>(unconsumed - block) *
562-
sizeof(first_block);
563-
if (queue->pmem->written == queue->buff_size_)
564-
queue->pmem->written = 0;
565-
566-
block = unconsumed;
567-
});
550+
auto next = seek_next(block);
551+
assert(next >= block);
552+
block = next;
568553

569554
data = reinterpret_cast<char *>(block);
570555

571556
return *this;
572557
}
573558

574-
inline bool
575-
mpsc_queue::read_accessor::iterator::operator==(
576-
const mpsc_queue::read_accessor::iterator &rhs)
559+
bool
560+
mpsc_queue::iterator::operator==(const mpsc_queue::iterator &rhs)
577561
{
578562
return data == rhs.data;
579563
}
580564

581-
inline bool
582-
mpsc_queue::read_accessor::iterator::operator!=(
583-
const mpsc_queue::read_accessor::iterator &rhs)
565+
bool
566+
mpsc_queue::iterator::operator!=(const mpsc_queue::iterator &rhs)
584567
{
585568
return data != rhs.data;
586569
}
587570

588-
inline pmem::obj::string_view
589-
mpsc_queue::read_accessor::iterator::operator*() const
571+
pmem::obj::string_view mpsc_queue::iterator::operator*() const
590572
{
591573
auto b = reinterpret_cast<first_block *>(data);
592574
return pmem::obj::string_view(b->data, b->size);
593575
}
594576

595-
inline mpsc_queue::first_block *
596-
mpsc_queue::read_accessor::iterator::skip_consumed(mpsc_queue::first_block *b)
577+
mpsc_queue::first_block *
578+
mpsc_queue::iterator::seek_next(mpsc_queue::first_block *b)
597579
{
598-
assert(pmemobj_tx_stage() == TX_STAGE_WORK);
599-
600580
auto e = reinterpret_cast<first_block *>(end);
601581

602582
/* Advance to first, unconsumed element. Each cacheline can be in one of
@@ -607,11 +587,6 @@ mpsc_queue::read_accessor::iterator::skip_consumed(mpsc_queue::first_block *b)
607587
* size bytes are junk.
608588
* 3. First 8 bytes (size) are non-zero and have dirty flag unset - next
609589
* size bytes are ready to be consumed (they represent consistent data).
610-
*
611-
* Invariant: producer can only produce data to cachelines which have
612-
* first 8 bytes zeroed. If we detect that there was a crash during
613-
* producing data (DIRTY_FLAG is set) we must clear those cachline in
614-
* consume.
615590
*/
616591
while (b < e) {
617592
if (b->size == 0) {
@@ -622,13 +597,8 @@ mpsc_queue::read_accessor::iterator::skip_consumed(mpsc_queue::first_block *b)
622597
auto aligned_size = pmem::detail::align_up(
623598
size + sizeof(b->size),
624599
pmem::detail::CACHELINE_SIZE);
625-
auto e =
626-
b + aligned_size / pmem::detail::CACHELINE_SIZE;
627600

628-
while (b < e) {
629-
b->size = 0;
630-
b++;
631-
}
601+
b += aligned_size / pmem::detail::CACHELINE_SIZE;
632602
} else {
633603
break;
634604
}

0 commit comments

Comments
 (0)