@@ -46,7 +46,8 @@ class mpsc_queue {
4646 struct first_block {
4747 static constexpr size_t CAPACITY =
4848 pmem::detail::CACHELINE_SIZE - sizeof (size_t );
49- static constexpr size_t DIRTY_FLAG = (1ULL << 63 );
49+ static constexpr size_t DIRTY_FLAG =
50+ (1ULL << (sizeof (size_t ) * 8 - 1 ));
5051
5152 pmem::obj::p<size_t > size;
5253 char data[CAPACITY];
@@ -106,7 +107,12 @@ class mpsc_queue {
106107
107108 template <typename Function>
108109 bool try_produce (size_t size, Function &&f);
109- bool try_produce (pmem::obj::string_view data);
110+
111+ template <typename Function = void (*)(pmem::obj::string_view)>
112+ bool try_produce (
113+ pmem::obj::string_view data,
114+ Function &&on_produce =
115+ [](pmem::obj::string_view target) {});
110116
111117 private:
112118 mpsc_queue *queue;
@@ -119,8 +125,10 @@ class mpsc_queue {
119125 public:
120126 pmem_log_type (size_t size);
121127
128+ pmem::obj::string_view data ();
129+
122130 private:
123- pmem::obj::vector<char > data ;
131+ pmem::obj::vector<char > data_ ;
124132 pmem::obj::p<size_t > written;
125133
126134 friend class mpsc_queue ;
@@ -131,14 +139,10 @@ mpsc_queue::mpsc_queue(pmem_log_type &pmem, size_t max_workers)
131139{
132140 pop = pmem::obj::pool_by_vptr (&pmem);
133141
134- auto addr = reinterpret_cast <uintptr_t >(&pmem.data [0 ]);
135- auto aligned_addr =
136- pmem::detail::align_up (addr, pmem::detail::CACHELINE_SIZE);
142+ auto buf_data = pmem.data ();
137143
138- buf = reinterpret_cast <char *>(aligned_addr);
139- buff_size_ = pmem.data .size () - (aligned_addr - addr);
140- buff_size_ = pmem::detail::align_down (buff_size_,
141- pmem::detail::CACHELINE_SIZE);
144+ buf = const_cast <char *>(buf_data.data ());
145+ buff_size_ = buf_data.size ();
142146
143147 ring_buffer = std::unique_ptr<ringbuf::ringbuf_t >(
144148 new ringbuf::ringbuf_t (max_workers, buff_size_));
@@ -220,8 +224,23 @@ mpsc_queue::restore_offsets()
220224}
221225
222226mpsc_queue::pmem_log_type::pmem_log_type (size_t size)
223- : data(size, 0 ), written(0 )
227+ : data_(size, 0 ), written(0 )
228+ {
229+ }
230+
231+ inline pmem::obj::string_view
232+ mpsc_queue::pmem_log_type::data ()
224233{
234+ auto addr = reinterpret_cast <uintptr_t >(&data_[0 ]);
235+ auto aligned_addr =
236+ pmem::detail::align_up (addr, pmem::detail::CACHELINE_SIZE);
237+
238+ auto size = data_.size () - (aligned_addr - addr);
239+ auto aligned_size =
240+ pmem::detail::align_down (size, pmem::detail::CACHELINE_SIZE);
241+
242+ return pmem::obj::string_view (
243+ reinterpret_cast <const char *>(aligned_addr), aligned_size);
225244}
226245
227246inline pmem::detail::id_manager &
@@ -245,50 +264,57 @@ mpsc_queue::try_consume_batch(Function &&f)
245264 throw pmem::transaction_scope_error (
246265 " Function called inside a transaction scope." );
247266
248- size_t offset;
249- size_t len = ringbuf_consume (ring_buffer.get (), &offset);
267+ bool consumed = false ;
268+
269+ /* Need to call try_consume twice, as some data may be at the end
270+ * of buffer, and some may be at the beginning. Ringbuffer does not
271+ * merge those two parts into one try_consume. If all data was
272+ * consumed during first try_consume, second will do nothing. */
273+ for (int i = 0 ; i < 2 ; i++) {
274+ size_t offset;
275+ size_t len = ringbuf_consume (ring_buffer.get (), &offset);
250276
251277#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
252- ANNOTATE_HAPPENS_AFTER (ring_buffer.get ());
278+ ANNOTATE_HAPPENS_AFTER (ring_buffer.get ());
253279#endif
254280
255- if (!len)
256- return false ;
257-
258- auto data = buf + offset;
281+ if (!len)
282+ return consumed;
259283
260- auto begin = iterator (data, data + len);
261- auto end = iterator (data + len, data + len);
284+ auto data = buf + offset;
285+ auto begin = iterator (data, data + len);
286+ auto end = iterator (data + len, data + len);
262287
263- auto elements_to_consume = begin != end;
288+ pmem::obj::flat_transaction::run (pop, [&] {
289+ if (begin != end) {
290+ consumed = true ;
291+ f (batch_type (begin, end));
292+ }
264293
265- pmem::obj::flat_transaction::run (pop, [&] {
266- if (elements_to_consume)
267- f (batch_type (begin, end));
294+ auto b = reinterpret_cast <first_block *>(data);
295+ clear_cachelines (b, len);
268296
269- auto b = reinterpret_cast <first_block *>(data);
270- clear_cachelines (b, len);
271-
272- if (offset + len < buff_size_)
273- pmem->written = offset + len;
274- else if (offset + len == buff_size_)
275- pmem->written = 0 ;
276- else
277- assert (false );
278- });
297+ if (offset + len < buff_size_)
298+ pmem->written = offset + len;
299+ else if (offset + len == buff_size_)
300+ pmem->written = 0 ;
301+ else
302+ assert (false );
303+ });
279304
280305#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
281- ANNOTATE_HAPPENS_BEFORE (ring_buffer.get ());
306+ ANNOTATE_HAPPENS_BEFORE (ring_buffer.get ());
282307#endif
283308
284- ringbuf_release (ring_buffer.get (), len);
309+ ringbuf_release (ring_buffer.get (), len);
285310
286- /* XXX: it would be better to call f once - hide
287- * wraparound behind iterators */
288- /* XXX: add param to ringbuf_consume and do not
289- * call store_explicit in consume */
290- return try_consume_batch (std::forward<Function>(f)) ||
291- elements_to_consume;
311+ /* XXX: it would be better to call f once - hide
312+ * wraparound behind iterators */
313+ /* XXX: add param to ringbuf_consume and do not
314+ * call store_explicit in consume */
315+ }
316+
317+ return consumed;
292318}
293319
294320inline mpsc_queue::worker::worker (mpsc_queue *q)
@@ -377,8 +403,10 @@ mpsc_queue::worker::try_produce(size_t size, Function &&f)
377403 return true ;
378404}
379405
380- inline bool
381- mpsc_queue::worker::try_produce (pmem::obj::string_view data)
406+ template <typename Function>
407+ bool
408+ mpsc_queue::worker::try_produce (pmem::obj::string_view data,
409+ Function &&on_produce)
382410{
383411 auto req_size =
384412 pmem::detail::align_up (data.size () + sizeof (first_block::size),
@@ -398,6 +426,9 @@ mpsc_queue::worker::try_produce(pmem::obj::string_view data)
398426 ANNOTATE_HAPPENS_BEFORE (queue->ring_buffer .get ());
399427#endif
400428
429+ on_produce (pmem::obj::string_view (
430+ queue->buf + offset + sizeof (first_block::size), data.size ()));
431+
401432 ringbuf_produce (queue->ring_buffer .get (), w);
402433
403434 return true ;
0 commit comments