@@ -263,50 +263,57 @@ mpsc_queue::try_consume_batch(Function &&f)
263263 throw pmem::transaction_scope_error (
264264 " Function called inside a transaction scope." );
265265
266- size_t offset;
267- size_t len = ringbuf_consume (ring_buffer.get (), &offset);
266+ bool consumed = false ;
267+
268+ /* Need to call try_consume twice, as some data may be at the end
269+ * of buffer, and some may be at the beginning. Ringbuffer does not
270+ * merge those two pats into one try_consume. If all data was
271+ * consumed during first try_consume, second would fail. */
272+ for (int i = 0 ; i < 2 ; i++) {
273+ size_t offset;
274+ size_t len = ringbuf_consume (ring_buffer.get (), &offset);
268275
269276#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
270- ANNOTATE_HAPPENS_AFTER (ring_buffer.get ());
277+ ANNOTATE_HAPPENS_AFTER (ring_buffer.get ());
271278#endif
272279
273- if (!len)
274- return false ;
275-
276- auto data = buf + offset;
280+ if (!len)
281+ return consumed;
277282
278- auto begin = iterator (data, data + len);
279- auto end = iterator (data + len, data + len);
283+ auto data = buf + offset;
284+ auto begin = iterator (data, data + len);
285+ auto end = iterator (data + len, data + len);
280286
281- auto elements_to_consume = begin != end;
287+ pmem::obj::flat_transaction::run (pop, [&] {
288+ if (begin != end) {
289+ consumed = true ;
290+ f (batch_type (begin, end));
291+ }
282292
283- pmem::obj::flat_transaction::run (pop, [&] {
284- if (elements_to_consume)
285- f (batch_type (begin, end));
293+ auto b = reinterpret_cast <first_block *>(data);
294+ clear_cachelines (b, len);
286295
287- auto b = reinterpret_cast <first_block *>(data);
288- clear_cachelines (b, len);
289-
290- if (offset + len < buff_size_)
291- pmem->written = offset + len;
292- else if (offset + len == buff_size_)
293- pmem->written = 0 ;
294- else
295- assert (false );
296- });
296+ if (offset + len < buff_size_)
297+ pmem->written = offset + len;
298+ else if (offset + len == buff_size_)
299+ pmem->written = 0 ;
300+ else
301+ assert (false );
302+ });
297303
298304#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
299- ANNOTATE_HAPPENS_BEFORE (ring_buffer.get ());
305+ ANNOTATE_HAPPENS_BEFORE (ring_buffer.get ());
300306#endif
301307
302- ringbuf_release (ring_buffer.get (), len);
308+ ringbuf_release (ring_buffer.get (), len);
309+
310+ /* XXX: it would be better to call f once - hide
311+ * wraparound behind iterators */
312+ /* XXX: add param to ringbuf_consume and do not
313+ * call store_explicit in consume */
314+ }
303315
304- /* XXX: it would be better to call f once - hide
305- * wraparound behind iterators */
306- /* XXX: add param to ringbuf_consume and do not
307- * call store_explicit in consume */
308- return try_consume_batch (std::forward<Function>(f)) ||
309- elements_to_consume;
316+ return consumed;
310317}
311318
312319inline mpsc_queue::worker::worker (mpsc_queue *q)
0 commit comments