Skip to content
This repository was archived by the owner on Mar 22, 2023. It is now read-only.

Commit f5552be

Browse files
Merge pull request #1078 from JanDorniak99/radix_concurrent_erase
Radix: concurrent erase
2 parents fd6e426 + d49db42 commit f5552be

12 files changed

Lines changed: 923 additions & 37 deletions

File tree

LICENSE

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,7 @@ following exceptions:
3939
* include/detail/ringbuf.hpp and tests/mpsc_queue/ringbuf.cpp contain
4040
ring buffer implementation and tests adapted from: https://github.com/rmind/ringbuf
4141
which is covered by licence included in those files.
42+
43+
* include/detail/ebr.hpp contain EBR implementation adapted
44+
from: https://github.com/rmind/libqsbr which is covered by
45+
licence included in this file.
Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
/*-
2+
* Copyright (c) 2015-2018 Mindaugas Rasiukevicius <rmind at noxt eu>
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions
7+
* are met:
8+
* 1. Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* 2. Redistributions in binary form must reproduce the above copyright
11+
* notice, this list of conditions and the following disclaimer in the
12+
* documentation and/or other materials provided with the distribution.
13+
*
14+
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15+
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17+
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20+
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21+
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22+
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23+
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24+
* SUCH DAMAGE.
25+
*/
26+
27+
// SPDX-License-Identifier: BSD-3-Clause
28+
/* Copyright 2021, Intel Corporation */
29+
30+
/**
31+
* @file
32+
* C++ EBR API.
33+
*/
34+
35+
#ifndef LIBPMEMOBJ_EBR_HPP
36+
#define LIBPMEMOBJ_EBR_HPP
37+
38+
#include <atomic>
39+
#include <cassert>
40+
#include <functional>
41+
#include <mutex>
42+
#include <thread>
43+
#include <unordered_map>
44+
45+
#include <libpmemobj++/detail/common.hpp>
46+
47+
namespace pmem
48+
{
49+
50+
namespace detail
51+
{
52+
53+
/**
54+
* Epoch-based reclamation (EBR). Reference:
55+
*
56+
* K. Fraser, Practical lock-freedom,
57+
* Technical Report UCAM-CL-TR-579, February 2004
58+
* https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-579.pdf
59+
*
60+
* Summary:
61+
*
62+
* Any workers (threads or processes) actively referencing (accessing)
63+
* the globally visible objects must do that in the critical path covered
64+
* using the dedicated function. The grace period is determined using "epochs"
65+
* implemented as a global counter (and, for example, a dedicated G/C list for
66+
* each epoch). Objects in the current global epoch can be staged for
67+
* reclamation (garbage collection). Then, the objects in the target epoch can
68+
* be reclaimed after two successful increments of the global epoch. Only three
69+
* epochs are needed (e, e-1 and e-2), therefore we use clock arithmetics.
70+
*/
71+
class ebr {
72+
using atomic = std::atomic<size_t>;
73+
using reference = std::reference_wrapper<atomic>;
74+
75+
public:
76+
class worker;
77+
78+
ebr();
79+
80+
worker register_worker();
81+
bool sync();
82+
void full_sync();
83+
size_t staging_epoch();
84+
size_t gc_epoch();
85+
86+
class worker {
87+
public:
88+
worker(const worker &w) = delete;
89+
worker(worker &&w) = default;
90+
~worker();
91+
92+
template <typename F>
93+
void critical(F &&f);
94+
95+
private:
96+
worker(ebr *e_, reference ref);
97+
98+
reference local_epoch;
99+
ebr *e;
100+
101+
friend ebr;
102+
};
103+
104+
private:
105+
static const size_t ACTIVE_FLAG = static_cast<size_t>(1)
106+
<< (sizeof(size_t) * 8 - 1);
107+
static const size_t EPOCHS_NUMBER = 3;
108+
109+
atomic global_epoch;
110+
111+
std::unordered_map<std::thread::id, atomic> workers;
112+
std::mutex mtx;
113+
};
114+
115+
/**
116+
* Default and only ebr constructor.
117+
*/
118+
ebr::ebr() : global_epoch(0)
119+
{
120+
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
121+
VALGRIND_HG_DISABLE_CHECKING(&global_epoch, sizeof(global_epoch));
122+
#endif
123+
}
124+
125+
/**
126+
* Registers and returns a new worker, which can perform critical operations
127+
* (accessing some shared data that can be removed in other threads). There can
128+
* be only one worker per thread. The worker will be automatically unregistered
129+
* in the destructor.
130+
*
131+
* @throw runtime_error if there is already a registered worker for the current
132+
* thread.
133+
*
134+
* @return new registered worker.
135+
*/
136+
ebr::worker
137+
ebr::register_worker()
138+
{
139+
std::lock_guard<std::mutex> lock(mtx);
140+
auto res = workers.emplace(std::this_thread::get_id(), 0);
141+
if (!res.second) {
142+
throw std::runtime_error(
143+
"There can be only one worker per thread");
144+
}
145+
146+
return worker{this, reference{res.first->second}};
147+
}
148+
149+
/**
150+
* Attempts to synchronise and announce a new epoch.
151+
*
152+
* The synchronisation points must be serialized (e.g. if there are multiple G/C
153+
* workers or other writers). Generally, calls to ebr::staging_epoch() and
154+
* ebr::gc_epoch() would be a part of the same serialized path (calling sync()
155+
* and gc_epoch()/staging_epoch() concurrently in two other threads will cause
156+
* an undefined behavior).
157+
*
158+
* @return true if a new epoch is announced and false if it wasn't possible in
159+
* the current state.
160+
*/
161+
bool
162+
ebr::sync()
163+
{
164+
auto current_epoch = global_epoch.load();
165+
166+
std::lock_guard<std::mutex> lock(mtx);
167+
for (auto &w : workers) {
168+
LIBPMEMOBJ_CPP_ANNOTATE_HAPPENS_BEFORE(
169+
std::memory_order_seq_cst, &w.second);
170+
auto local_e = w.second.load();
171+
bool active = local_e & ACTIVE_FLAG;
172+
if (active && (local_e != (current_epoch | ACTIVE_FLAG))) {
173+
return false;
174+
}
175+
}
176+
177+
LIBPMEMOBJ_CPP_ANNOTATE_HAPPENS_BEFORE(std::memory_order_seq_cst,
178+
&global_epoch);
179+
global_epoch.store((current_epoch + 1) % EPOCHS_NUMBER);
180+
181+
return true;
182+
}
183+
184+
/**
185+
* Perform full synchronisation ensuring that all objects which are no longer
186+
* globally visible (and potentially staged for reclamation) at the time of
187+
* calling this routine will be safe to reclaim/destroy after this
188+
* synchronisation routine completes and returns. Note: the synchronisation may
189+
* take across multiple epochs.
190+
*/
191+
void
192+
ebr::full_sync()
193+
{
194+
size_t syncs_cnt = 0;
195+
while (true) {
196+
if (sync() && ++syncs_cnt == EPOCHS_NUMBER) {
197+
break;
198+
}
199+
}
200+
}
201+
202+
/**
203+
* Returns the epoch where objects can be staged for reclamation. This can be
204+
* used as a reference value for the pending queue/tag, used to postpone the
205+
* reclamation until this epoch becomes available for G/C. Note that this
206+
* function would normally be serialized together with the ebr::sync() calls.
207+
*
208+
* @return the epoch where objects can be staged for reclamation.
209+
*/
210+
size_t
211+
ebr::staging_epoch()
212+
{
213+
auto res = global_epoch.load();
214+
LIBPMEMOBJ_CPP_ANNOTATE_HAPPENS_AFTER(std::memory_order_seq_cst,
215+
&global_epoch);
216+
return res;
217+
}
218+
219+
/**
220+
* Returns the epoch available for reclamation, i.e. the epoch where it is
221+
* guaranteed that the objects are safe to be reclaimed/destroyed. Note that
222+
* this function would normally be serialized together with the ebr::sync()
223+
* calls.
224+
*
225+
* @return the epoch available for reclamation.
226+
*/
227+
size_t
228+
ebr::gc_epoch()
229+
{
230+
auto res = (global_epoch.load() + 1) % EPOCHS_NUMBER;
231+
LIBPMEMOBJ_CPP_ANNOTATE_HAPPENS_AFTER(std::memory_order_seq_cst,
232+
&global_epoch);
233+
return res;
234+
}
235+
236+
ebr::worker::worker(ebr *e_, reference ref) : local_epoch(ref), e(e_)
237+
{
238+
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
239+
VALGRIND_HG_DISABLE_CHECKING(&ref.get(), sizeof(ref.get()));
240+
#endif
241+
}
242+
243+
/**
244+
* Unregisters the worker from the list of the workers in the ebr. All workers
245+
* should be destroyed before the destruction of ebr object.
246+
*/
247+
ebr::worker::~worker()
248+
{
249+
std::lock_guard<std::mutex> lock(e->mtx);
250+
e->workers.erase(std::this_thread::get_id());
251+
}
252+
253+
/**
254+
* Performs critical operations. Typically, this would be used by the readers
255+
* when accessing some shared data. Reclamation of objects is guaranteed not to
256+
* occur in the critical path.
257+
*
258+
* @param[in] f the function which will be executed as a critical operation.
259+
* This function's signature should be void().
260+
*/
261+
template <typename F>
262+
void
263+
ebr::worker::critical(F &&f)
264+
{
265+
auto new_epoch = e->global_epoch.load() | ACTIVE_FLAG;
266+
LIBPMEMOBJ_CPP_ANNOTATE_HAPPENS_AFTER(std::memory_order_seq_cst,
267+
&(e->global_epoch));
268+
269+
local_epoch.get().store(new_epoch);
270+
LIBPMEMOBJ_CPP_ANNOTATE_HAPPENS_AFTER(std::memory_order_seq_cst,
271+
&local_epoch);
272+
273+
f();
274+
275+
local_epoch.get().store(0);
276+
}
277+
278+
} /* namespace detail */
279+
280+
} /* namespace pmem */
281+
282+
#endif /* LIBPMEMOBJ_EBR_HPP */

0 commit comments

Comments
 (0)