Skip to content
This repository was archived by the owner on Mar 22, 2023. It is now read-only.

Commit 322aedd

Browse files
committed
mpsc_queue: clear dirty cachelines on recovery
1 parent 3dabc13 commit 322aedd

2 files changed

Lines changed: 55 additions & 4 deletions

File tree

include/libpmemobj++/experimental/mpsc_queue.hpp

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,22 @@ mpsc_queue::worker::store_to_log(pmem::obj::string_view data, char *log_data)
404404
pmem::detail::CACHELINE_SIZE ==
405405
0);
406406

407+
/* Invariant: producer can only produce data to cachelines which have
408+
* first 8 bytes zeroed.
409+
*/
410+
#ifndef NDEBUG
411+
auto b = reinterpret_cast<first_block *>(log_data);
412+
auto s = pmem::detail::align_up(data.size() + sizeof(first_block::size),
413+
pmem::detail::CACHELINE_SIZE);
414+
auto e = b + s / pmem::detail::CACHELINE_SIZE;
415+
while (b < e) {
416+
assert(b->size == 0);
417+
b++;
418+
}
419+
#endif
420+
421+
assert(reinterpret_cast<first_block *>(log_data)->size == 0);
422+
407423
first_block fblock;
408424
fblock.size = data.size() | size_t(first_block::DIRTY_FLAG);
409425

@@ -591,21 +607,28 @@ mpsc_queue::read_accessor::iterator::skip_consumed(mpsc_queue::first_block *b)
591607
* size bytes are junk.
592608
* 3. First 8 bytes (size) are non-zero and have dirty flag unset - next
593609
* size bytes are ready to be consumed (they represent consistent data).
610+
*
611+
* Invariant: producer can only produce data to cachelines which have
612+
* first 8 bytes zeroed. If we detect that there was a crash during
613+
* producing data (DIRTY_FLAG is set) we must clear those cachline in
614+
* consume.
594615
*/
595616
while (b < e) {
596617
if (b->size == 0) {
597618
b++;
598619
} else if (b->size & size_t(first_block::DIRTY_FLAG)) {
599-
// XXX - we should clear the cachelines here!!!! (add
600-
// test for this)
601-
602620
auto size =
603621
b->size & (~size_t(first_block::DIRTY_FLAG));
604622
auto aligned_size = pmem::detail::align_up(
605623
size + sizeof(b->size),
606624
pmem::detail::CACHELINE_SIZE);
625+
auto e =
626+
b + aligned_size / pmem::detail::CACHELINE_SIZE;
607627

608-
b += aligned_size / pmem::detail::CACHELINE_SIZE;
628+
while (b < e) {
629+
b->size = 0;
630+
b++;
631+
}
609632
} else {
610633
break;
611634
}

tests/mpsc_queue/recovery_after_consume.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,34 @@ check_consistency(pmem::obj::pool<root> pop)
113113
for (auto &str : values_on_pmem) {
114114
UT_ASSERT(str == std::string(produce_size, fill_pattern));
115115
}
116+
117+
auto worker = queue.register_worker();
118+
119+
static const auto overwrite_pattern = char(2);
120+
static const auto overwrite_size = 64;
121+
122+
while (true) {
123+
auto ret = worker.try_produce(
124+
overwrite_size, [&](pmem::obj::slice<char *> range) {
125+
std::fill_n(range.begin(), overwrite_size,
126+
overwrite_pattern);
127+
});
128+
129+
if (!ret)
130+
break;
131+
}
132+
133+
values_on_pmem.clear();
134+
auto ret = queue.try_consume([&](queue_type::read_accessor rd_acc) {
135+
for (auto entry : rd_acc)
136+
values_on_pmem.emplace_back(entry.data(), entry.size());
137+
});
138+
UT_ASSERT(ret);
139+
140+
for (auto &str : values_on_pmem) {
141+
UT_ASSERT(str ==
142+
std::string(overwrite_size, overwrite_pattern));
143+
}
116144
}
117145

118146
void

0 commit comments

Comments
 (0)