Skip to content
This repository was archived by the owner on Mar 22, 2023. It is now read-only.

Commit 4e7692d

Browse files
author
Pawel Karczewski
committed
Add mpsc_queue basic example
1 parent 4daab96 commit 4e7692d

File tree

3 files changed

+145
-0
lines changed

3 files changed

+145
-0
lines changed

examples/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,5 @@ add_example(make_persistent make_persistent/make_persistent.cpp)
173173
add_example(persistent persistent/persistent.cpp)
174174

175175
add_example(transaction transaction/transaction.cpp)
176+
177+
add_example(mpsc_queue mpsc_queue/mpsc_queue.cpp)

examples/mpsc_queue/CMakeLists.txt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# SPDX-License-Identifier: BSD-3-Clause
2+
# Copyright 2021, Intel Corporation
3+
4+
cmake_minimum_required(VERSION 3.3)
5+
project(mpsc_queue CXX)
6+
7+
set(CXX_STANDARD_REQUIRED ON)
8+
set(CMAKE_CXX_STANDARD 11)
9+
10+
find_package(PkgConfig QUIET)
11+
if(PKG_CONFIG_FOUND)
12+
pkg_check_modules(LIBPMEMOBJ++ REQUIRED libpmemobj++)
13+
else()
14+
find_package(LIBPMEMOBJ++ REQUIRED)
15+
endif()
16+
17+
link_directories(${LIBPMEMOBJ++_LIBRARY_DIRS})
18+
19+
add_executable(mpsc_queue.cpp)
20+
target_include_directories(mpsc_queue PUBLIC ${LIBPMEMOBJ++_INCLUDE_DIRS} . ..)
21+
target_link_libraries(mpsc_queue ${LIBPMEMOBJ++_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})

examples/mpsc_queue/mpsc_queue.cpp

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// SPDX-License-Identifier: BSD-3-Clause
2+
/* Copyright 2021, Intel Corporation */
3+
4+
/**
5+
* mpsc_queue.cpp -- example which shows how to use
6+
* pmem::obj::experimental::mpsc_queue
7+
*/
8+
9+
#include <libpmemobj++/experimental/mpsc_queue.hpp>
10+
#include <libpmemobj++/make_persistent.hpp>
11+
#include <libpmemobj++/persistent_ptr.hpp>
12+
#include <libpmemobj++/transaction.hpp>
13+
14+
#include <iostream>
15+
#include <string>
16+
17+
void
18+
show_usage(char *argv[])
19+
{
20+
std::cerr << "usage: " << argv[0] << " file-name" << std::endl;
21+
}
22+
23+
//! [mpsc_queue_single_threaded_example]
24+
25+
struct root {
26+
pmem::obj::persistent_ptr<
27+
pmem::obj::experimental::mpsc_queue::pmem_log_type>
28+
log;
29+
};
30+
31+
void
32+
single_threaded(pmem::obj::pool<root> pop)
33+
{
34+
35+
std::vector<std::string> values_to_produce = {"xxx", "aaaaaaa", "bbbbb",
36+
"cccc", "ddddddddddd"};
37+
pmem::obj::persistent_ptr<root> proot = pop.root();
38+
39+
/* Create mpsc_queue, which uses pmem_log_type object to store
40+
* data. */
41+
auto queue = pmem::obj::experimental::mpsc_queue(*proot->log, 1);
42+
43+
/* Consume data, which was stored in the queue in the previous run of
44+
* the application. */
45+
//! [try_consume_batch]
46+
queue.try_consume_batch(
47+
[&](pmem::obj::experimental::mpsc_queue::batch_type rd_acc) {
48+
for (pmem::obj::string_view str : rd_acc) {
49+
std::cout << std::string(str.data(), str.size())
50+
<< std::endl;
51+
}
52+
});
53+
//! [try_consume_batch]
54+
/* Produce and consume data. */
55+
//! [register_worker]
56+
pmem::obj::experimental::mpsc_queue::worker worker =
57+
queue.register_worker();
58+
//! [register_worker]
59+
for (std::string &value : values_to_produce) {
60+
//! [try_produce]
61+
/* Produce data. */
62+
worker.try_produce(value);
63+
//! [try_produce]
64+
/* Consume produced data. */
65+
queue.try_consume_batch(
66+
[&](pmem::obj::experimental::mpsc_queue::batch_type
67+
rd_acc) {
68+
for (pmem::obj::string_view str : rd_acc) {
69+
std::cout << std::string(str.data(),
70+
str.size())
71+
<< std::endl;
72+
}
73+
});
74+
}
75+
//! [try_produce_string_view]
76+
/* Produce data to be consumed in next run of the application. */
77+
worker.try_produce("Left for next run");
78+
//! [try_produce_string_view]
79+
}
80+
81+
//! [mpsc_queue_single_threaded_example]
82+
83+
int
84+
main(int argc, char *argv[])
85+
{
86+
if (argc < 2) {
87+
show_usage(argv);
88+
return 1;
89+
}
90+
91+
const char *path = argv[1];
92+
93+
static constexpr size_t QUEUE_SIZE = 1000;
94+
95+
pmem::obj::pool<root> pop;
96+
try {
97+
pop = pmem::obj::pool<root>::open(path, "mpsc_queue");
98+
if (pop.root()->log == nullptr) {
99+
pmem::obj::transaction::run(pop, [&] {
100+
pop.root()->log = pmem::obj::make_persistent<
101+
pmem::obj::experimental::mpsc_queue::
102+
pmem_log_type>(QUEUE_SIZE);
103+
});
104+
}
105+
single_threaded(pop);
106+
107+
} catch (pmem::pool_error &e) {
108+
std::cerr << e.what() << std::endl;
109+
std::cerr
110+
<< "To create pool run: pmempool create obj --layout=mpsc_queue -s 100M path_to_pool"
111+
<< std::endl;
112+
} catch (std::exception &e) {
113+
std::cerr << e.what() << std::endl;
114+
}
115+
116+
try {
117+
pop.close();
118+
} catch (const std::logic_error &e) {
119+
std::cerr << e.what() << std::endl;
120+
}
121+
return 0;
122+
}

0 commit comments

Comments
 (0)