Skip to content
This repository was archived by the owner on Mar 22, 2023. It is now read-only.

Commit ba8f96e

Browse files
Merge pull request #1125 from karczex/mpsc_queue_remove_overload_with_tests
Mpsc queue remove overload with copying data in lambda function
2 parents 9ccad59 + dab89d3 commit ba8f96e

File tree

10 files changed

+52
-162
lines changed

10 files changed

+52
-162
lines changed

include/libpmemobj++/experimental/mpsc_queue.hpp

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
#include <libpmemobj++/detail/ringbuf.hpp>
1010
#include <libpmemobj++/make_persistent.hpp>
1111
#include <libpmemobj++/persistent_ptr.hpp>
12-
#include <libpmemobj++/slice.hpp>
1312
#include <libpmemobj++/string_view.hpp>
1413
#include <libpmemobj++/transaction.hpp>
1514

@@ -117,9 +116,6 @@ class mpsc_queue {
117116
worker(worker &&other);
118117
worker &operator=(worker &&other);
119118

120-
template <typename Function>
121-
bool try_produce(size_t size, Function &&f);
122-
123119
template <typename Function = void (*)(pmem::obj::string_view)>
124120
bool try_produce(
125121
pmem::obj::string_view data,
@@ -427,46 +423,6 @@ inline mpsc_queue::worker::~worker()
427423
}
428424
}
429425

430-
/**
431-
* @param f cannot fail. Any exception thrown from f will result
432-
* in terminate().
433-
*/
434-
template <typename Function>
435-
inline bool
436-
mpsc_queue::worker::try_produce(size_t size, Function &&f)
437-
{
438-
auto data = std::unique_ptr<char[]>(new char[size]);
439-
auto range = pmem::obj::slice<char *>(data.get(), data.get() + size);
440-
441-
auto req_size = pmem::detail::align_up(size + sizeof(first_block::size),
442-
pmem::detail::CACHELINE_SIZE);
443-
auto offset = acquire_cachelines(req_size);
444-
445-
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
446-
ANNOTATE_HAPPENS_AFTER(queue->ring_buffer.get());
447-
#endif
448-
449-
if (offset == -1)
450-
return false;
451-
452-
try {
453-
f(range);
454-
} catch (...) {
455-
std::terminate();
456-
}
457-
458-
store_to_log(pmem::obj::string_view(data.get(), size),
459-
queue->buf + offset);
460-
461-
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
462-
ANNOTATE_HAPPENS_BEFORE(queue->ring_buffer.get());
463-
#endif
464-
465-
produce_cachelines();
466-
467-
return true;
468-
}
469-
470426
template <typename Function>
471427
bool
472428
mpsc_queue::worker::try_produce(pmem::obj::string_view data,

tests/mpsc_queue/basic.cpp

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,7 @@ basic_test(pmem::obj::pool<root> pop, bool create)
4949

5050
/* Insert the data */
5151
for (const auto &e : values) {
52-
auto ret = worker.try_produce(
53-
e.size(), [&](pmem::obj::slice<char *> range) {
54-
std::copy_n(e.begin(), e.size(),
55-
range.begin());
56-
});
52+
auto ret = worker.try_produce(e);
5753
UT_ASSERT(ret);
5854
}
5955

@@ -77,13 +73,7 @@ basic_test(pmem::obj::pool<root> pop, bool create)
7773

7874
/* Insert new data, which may be recovered in next run of
7975
* application */
80-
ret = worker.try_produce(
81-
store_to_next_run.size(),
82-
[&](pmem::obj::slice<char *> range) {
83-
std::copy_n(store_to_next_run.begin(),
84-
store_to_next_run.size(),
85-
range.begin());
86-
});
76+
ret = worker.try_produce(store_to_next_run);
8777
UT_ASSERT(ret);
8878
} else {
8979
std::vector<std::string> values_on_pmem;

tests/mpsc_queue/consume_interrupt.cpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,7 @@ consume_interrupt(pmem::obj::pool<root> pop, bool create)
6363

6464
/* Insert the data */
6565
for (const auto &e : values) {
66-
ret = worker.try_produce(
67-
e.size(), [&](pmem::obj::slice<char *> range) {
68-
std::copy_n(e.begin(), e.size(),
69-
range.begin());
70-
});
66+
ret = worker.try_produce(e);
7167
UT_ASSERT(ret);
7268
}
7369

tests/mpsc_queue/consume_multipass.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@ consume_multipass(pmem::obj::pool<root> pop, size_t n_iters)
4545

4646
/* Insert the data */
4747
for (const auto &e : values) {
48-
auto ret = worker.try_produce(
49-
e.size(), [&](pmem::obj::slice<char *> range) {
50-
std::copy_n(e.begin(), e.size(), range.begin());
51-
});
48+
auto ret = worker.try_produce(e);
5249
UT_ASSERT(ret);
5350
}
5451

tests/mpsc_queue/empty.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,14 @@ consume_empty_after_insertion(pmem::obj::pool<root> pop)
5151
[&](queue_type::batch_type rd_acc) { ASSERT_UNREACHABLE; });
5252
UT_ASSERTeq(consumed, false);
5353

54-
std::vector<std::string> values = {"xxx", "aaaaaaa", "bbbbb"};
54+
std::vector<std::string> values = {
55+
"xxx", "aaaaaaa", "bbbbb",
56+
std::string(QUEUE_SIZE / 2 - 1, 'a')};
5557

5658
auto worker = queue.register_worker();
5759
/* Insert some data */
5860
for (const auto &e : values) {
59-
auto ret = worker.try_produce(
60-
e.size(), [&](pmem::obj::slice<char *> range) {
61-
std::copy_n(e.begin(), e.size(), range.begin());
62-
});
61+
auto ret = worker.try_produce(e);
6362
UT_ASSERT(ret);
6463
}
6564
/* Consume all of it */

tests/mpsc_queue/mt.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,14 @@ mt_test(pmem::obj::pool<root> pop, size_t concurrency)
7373
bool insert_succeed = false;
7474
while (!insert_succeed) {
7575
insert_succeed = worker.try_produce(
76-
e.size(),
77-
[&](pmem::obj::slice<char *>
78-
range) {
76+
e,
77+
[&](pmem::obj::string_view
78+
target) {
79+
UT_ASSERT(
80+
pmem::obj::string_view(
81+
e) ==
82+
target);
7983
x++;
80-
std::copy_n(
81-
e.begin(),
82-
e.size(),
83-
range.begin());
8484
});
8585
};
8686
}

tests/mpsc_queue/pmreorder/recovery.cpp

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ struct root {
3131
static constexpr size_t QUEUE_SIZE = 10000;
3232
static const auto produce_size = 128ULL;
3333
static const size_t concurrency = 4;
34-
static const auto fill_pattern = char(1);
34+
static const auto fill_pattern = std::string(produce_size, 'z');
3535

3636
/* Break application during produce. */
3737
static void
@@ -54,8 +54,8 @@ run_consistent(pmem::obj::pool<root> pop, bool break_produce, bool synchronized)
5454
if (id == 0 && break_produce)
5555
VALGRIND_PMC_EMIT_LOG("PMREORDER_MARKER.BEGIN");
5656

57-
worker.try_produce(produce_size,
58-
[&](pmem::obj::slice<char *> range) {
57+
worker.try_produce(fill_pattern,
58+
[&](pmem::obj::string_view v) {
5959
if (synchronized) {
6060
/* Make sure that all
6161
* other threads
@@ -64,10 +64,6 @@ run_consistent(pmem::obj::pool<root> pop, bool break_produce, bool synchronized)
6464
*/
6565
syncthreads();
6666
}
67-
68-
std::fill_n(range.begin(),
69-
produce_size,
70-
fill_pattern);
7167
});
7268

7369
if (id == 0 && break_produce)
@@ -102,7 +98,7 @@ check_consistency(pmem::obj::pool<root> pop, bool already_consumed)
10298
UT_ASSERT(values_on_pmem.size() >= expected);
10399

104100
for (auto &str : values_on_pmem) {
105-
UT_ASSERT(str == std::string(produce_size, fill_pattern));
101+
UT_ASSERT(str == fill_pattern);
106102
}
107103
}
108104

@@ -131,7 +127,7 @@ run_break_recovery(pmem::obj::pool<root> pop)
131127
UT_ASSERT(values_on_pmem.size() >= expected);
132128

133129
for (auto &str : values_on_pmem) {
134-
UT_ASSERT(str == std::string(produce_size, fill_pattern));
130+
UT_ASSERT(str == fill_pattern);
135131
}
136132
}
137133

tests/mpsc_queue/pmreorder/recovery_after_consume.cpp

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,11 @@ struct root {
3131
static constexpr size_t QUEUE_SIZE = 3000;
3232
static const auto produce_size = 128ULL;
3333
static const size_t concurrency = 4;
34-
static const auto fill_pattern = char(1);
34+
const auto fill_pattern = std::string(produce_size, 'x');
3535

3636
void
3737
run_consistent(pmem::obj::pool<root> pop)
3838
{
39-
const auto produce_size = 128ULL;
40-
const size_t concurrency = 4;
41-
const auto fill_pattern = char(1);
42-
4339
auto proot = pop.root();
4440
auto queue = queue_type(*proot->log, concurrency);
4541

@@ -56,12 +52,7 @@ run_consistent(pmem::obj::pool<root> pop)
5652
proot->capacity = capacity;
5753
pop.persist(proot->capacity);
5854

59-
make_queue_with_first_half_empty(queue, capacity, produce_size,
60-
[&](pmem::obj::slice<char *> range) {
61-
std::fill_n(range.begin(),
62-
produce_size, 1);
63-
});
64-
55+
make_queue_with_first_half_empty(queue, capacity, produce_size);
6556
/* Run this under pmreorder. After crash state of the queue should be
6657
* something like this: | produced | crashed | produced | empty |
6758
* produced |
@@ -73,12 +64,7 @@ run_consistent(pmem::obj::pool<root> pop)
7364
if (id == 0)
7465
VALGRIND_PMC_EMIT_LOG("PMREORDER_MARKER.BEGIN");
7566

76-
auto ret = worker.try_produce(
77-
produce_size,
78-
[&](pmem::obj::slice<char *> range) {
79-
std::fill_n(range.begin(), produce_size,
80-
fill_pattern);
81-
});
67+
auto ret = worker.try_produce(fill_pattern);
8268

8369
if (id == 0)
8470
VALGRIND_PMC_EMIT_LOG("PMREORDER_MARKER.END");
@@ -111,20 +97,16 @@ check_consistency(pmem::obj::pool<root> pop)
11197
UT_ASSERT(values_on_pmem.size() >= expected);
11298

11399
for (auto &str : values_on_pmem) {
114-
UT_ASSERT(str == std::string(produce_size, fill_pattern));
100+
UT_ASSERT(str == fill_pattern);
115101
}
116102

117103
auto worker = queue.register_worker();
118104

119-
static const auto overwrite_pattern = char(2);
120105
static const auto overwrite_size = 64;
106+
static const auto overwrite_pattern = std::string(overwrite_size, 'y');
121107

122108
while (true) {
123-
auto ret = worker.try_produce(
124-
overwrite_size, [&](pmem::obj::slice<char *> range) {
125-
std::fill_n(range.begin(), overwrite_size,
126-
overwrite_pattern);
127-
});
109+
auto ret = worker.try_produce(overwrite_pattern);
128110

129111
if (!ret)
130112
break;
@@ -138,8 +120,7 @@ check_consistency(pmem::obj::pool<root> pop)
138120
UT_ASSERT(ret);
139121

140122
for (auto &str : values_on_pmem) {
141-
UT_ASSERT(str ==
142-
std::string(overwrite_size, overwrite_pattern));
123+
UT_ASSERT(str == overwrite_pattern);
143124
}
144125
}
145126

tests/mpsc_queue/queue.hpp

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,17 @@
77

88
using queue_type = pmem::obj::experimental::mpsc_queue;
99

10+
/* Returns capacity in bytes */
1011
size_t
11-
get_queue_capacity(queue_type &q, size_t element_size)
12+
get_queue_capacity(queue_type &q, size_t element_size = 1)
1213
{
1314
auto worker = q.register_worker();
14-
15+
auto element = std::string(element_size, 'b');
1516
size_t capacity = 0;
16-
1717
/* Check how many elements fit in the log. */
18-
while (worker.try_produce(
19-
element_size,
20-
[&](pmem::obj::slice<char *> range) { capacity++; }))
21-
;
18+
while (worker.try_produce(element)) {
19+
capacity++;
20+
}
2221

2322
/* Clear the queue */
2423
auto ret = q.try_consume_batch([](queue_type::batch_type acc) {
@@ -30,26 +29,20 @@ get_queue_capacity(queue_type &q, size_t element_size)
3029
return capacity;
3130
}
3231

33-
template <typename F>
3432
void
3533
make_queue_with_first_half_empty(queue_type &q, size_t capacity,
36-
size_t element_size, F &&f)
34+
size_t element_size = 1)
3735
{
3836
auto worker = q.register_worker();
39-
37+
auto element = std::string(element_size, 'x');
4038
size_t produced = 0;
41-
4239
while (produced < capacity) {
4340
/* Produce half of the elements, call consume and
4441
* produce the rest. This should result in log being
4542
* consumed at the
4643
* beginning and unconsumed at the end. */
47-
auto ret = worker.try_produce(
48-
element_size, [&](pmem::obj::slice<char *> range) {
49-
f(range);
50-
produced++;
51-
});
52-
44+
auto ret = worker.try_produce(element);
45+
produced++;
5346
UT_ASSERT(ret);
5447

5548
if (produced == capacity / 2) {

0 commit comments

Comments
 (0)