11// SPDX-License-Identifier: BSD-3-Clause
22/* Copyright 2021, Intel Corporation */
33
4+ /* *
5+ * @file
6+ * Implementation of persistent multi producer single consumer queue.
7+ */
8+
49#ifndef LIBPMEMOBJ_MPSC_QUEUE_HPP
510#define LIBPMEMOBJ_MPSC_QUEUE_HPP
611
@@ -27,7 +32,21 @@ namespace obj
2732namespace experimental
2833{
2934
30- /* XXX: Add documentation */
35+ /* *
36+ * Persistent memory aware implementation of multi producer single consumer
37+ * queue.
38+ *
39+ * In case of crash or shutdown, reading and writing may be continued
40+ * by new process, from the last position without loss of any, already produced
41+ * data.
42+ *
43+ * @note try_consume_batch() MUST be called after creation of mpsc_queue object
44+ * if pmem_log_type objcect was already used by instance of mpsc_queue - e.g. in
45+ * previous run of application. If try_consume_batch() is not called, produce
46+ * may fail, even if the queue is empty.
47+ *
48+ * @snippet mpsc_queue/mpsc_queue.cpp mpsc_queue_single_threaded_example
49+ */
3150class mpsc_queue {
3251public:
3352 class worker ;
@@ -92,6 +111,10 @@ class mpsc_queue {
92111 size_t consume_len = 0 ;
93112
94113public:
114+ /* *
115+ * Type representing the range of the mpsc_queue elements. May be used
116+ * in the range-based loops over accessed elements.
117+ * */
95118 class batch_type {
96119 public:
97120 batch_type (iterator begin, iterator end);
@@ -104,7 +127,17 @@ class mpsc_queue {
104127 iterator end_;
105128 };
106129
107- /* All workers should be destroyed before destruction of mpsc_queue */
130+ /* *
131+ * mpsc_queue producer worker class. To write data concurrently into the
132+ * mpsc_queue in the multi-threaded application, each producer thread
133+ * have to use its own worker object. Workers might be added
134+ * concurrently to the mpsc_queue.
135+ *
136+ * @note All workers have to be destroyed before destruction of
137+ * the mpsc_queue
138+ *
139+ * @see mpsc_queue:try_produce_batch()
140+ */
108141 class worker {
109142 public:
110143 worker (mpsc_queue *q);
@@ -134,6 +167,16 @@ class mpsc_queue {
134167 friend class mpsc_queue ;
135168 };
136169
170+ /* *
171+ * Type representing persistent data, which may be managed by
172+ * mpsc_queue.
173+ *
174+ * Object of this type has to be managed by pmem::obj::pool, to be
175+ * usable in mpsc_queue.
176+ * Once created, pmem_log_type object cannot be resized.
177+ *
178+ * @param size size of the log.
179+ */
137180 class pmem_log_type {
138181 public:
139182 pmem_log_type (size_t size);
@@ -148,6 +191,13 @@ class mpsc_queue {
148191 };
149192};
150193
194+ /* *
195+ * mpsc_queue constructor.
196+ *
197+ * @param[in] pmem reference to already allocated pmem_log_type object
198+ * @param[in] max_workers maximum number of workers which may be added to
199+ * mpsc_queue at the same time.
200+ */
151201mpsc_queue::mpsc_queue (pmem_log_type &pmem, size_t max_workers)
152202{
153203 pop = pmem::obj::pool_by_vptr (&pmem);
@@ -273,11 +323,22 @@ mpsc_queue::restore_offsets()
273323 w.produce_cachelines ();
274324}
275325
326+ /* *
327+ * Constructs pmem_log_type object
328+ *
329+ * @param size size of the log in bytes
330+ */
276331mpsc_queue::pmem_log_type::pmem_log_type (size_t size)
277332 : data_(size, 0 ), written(0 )
278333{
279334}
280335
336+ /* *
337+ * Returns pmem::obj::string_view which allows to read-only access to the
338+ * underlying buffer.
339+ *
340+ * @return pmem::obj::string_view of the log data.
341+ */
281342inline pmem::obj::string_view
282343mpsc_queue::pmem_log_type::data ()
283344{
@@ -300,12 +361,39 @@ mpsc_queue::get_id_manager()
300361 return manager;
301362}
302363
364+ /* *
365+ * Registers the producer worker. Number of workers have to be less or equal
366+ * to max_workers specified in the mpsc_queue constructor.
367+ *
368+ * @return producer worker object.
369+ *
370+ * @snippet mpsc_queue/mpsc_queue.cpp register_worker
371+ */
303372inline mpsc_queue::worker
304373mpsc_queue::register_worker ()
305374{
306375 return worker (this );
307376}
308377
378+ /* *
379+ * Evaluates callback function f() for the data, which is ready to be
380+ * consumed. try_consume_batch() accesses data, and evaluates callback inside a
381+ * transaction. If an exception is thrown within callback, it gets
382+ * propagated to the caller and causes a transaction abort. In such case, next
383+ * try_consume_batch() call would consume the same data.
384+ *
385+ * @return true if consumed any data, false otherwise.
386+ *
387+ * @throws transaction_scope_error
388+ *
389+ * @note try_consume_batch() MUST be called after creation of mpsc_queue object
390+ * if pmem_log_type objcect was already used by any instance of mpsc_queue.
391+ * Otherwise produce might fail even if the queue is empty)
392+ *
393+ * @see mpsc_queue::worker::try_produce()
394+ *
395+ * @snippet mpsc_queue/mpsc_queue.cpp try_consume_batch
396+ */
309397template <typename Function>
310398inline bool
311399mpsc_queue::try_consume_batch (Function &&f)
@@ -423,6 +511,18 @@ inline mpsc_queue::worker::~worker()
423511 }
424512}
425513
514+ /* *
515+ * Copies data from pmem::obj::string_view into the mpsc_queue.
516+ *
517+ * @param[in] data Data to be copied into mpsc_queue
518+ * @param[in] on_produce Function evaluated on the data in queue, before
519+ * the data is visible for the consumer. By default do nothing.
520+ *
521+ * @return true if f were evaluated, all data copied by it saved in the
522+ * mpsc_queue, and are visible for the consumer.
523+ *
524+ * @snippet mpsc_queue/mpsc_queue.cpp try_produce_string_view
525+ */
426526template <typename Function>
427527bool
428528mpsc_queue::worker::try_produce (pmem::obj::string_view data,
@@ -546,12 +646,24 @@ inline mpsc_queue::batch_type::batch_type(iterator begin_, iterator end_)
546646{
547647}
548648
649+ /* *
650+ * Returns an iterator to the beginning of the accessed range of the
651+ * mpsc_queue.
652+ *
653+ * @return Iterator to the first element.
654+ */
549655inline mpsc_queue::iterator
550656mpsc_queue::batch_type::begin () const
551657{
552658 return begin_;
553659}
554660
661+ /* *
662+ * Returns an iterator to the end of the accessed range of the mpsc_queue.
663+ *
664+ * @return Iterator to the last element.
665+ */
666+
555667inline mpsc_queue::iterator
556668mpsc_queue::batch_type::end () const
557669{
0 commit comments