Skip to content
This repository was archived by the owner on Mar 22, 2023. It is now read-only.

Commit 485353b

Browse files
authored
Merge pull request #1062 from igchor/fix_parallel_xexec
Make syncthreads() in parallel_xexec multi-use
2 parents 2bc7289 + 79ed022 commit 485353b

5 files changed

Lines changed: 96 additions & 16 deletions

File tree

tests/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ else()
149149
skip_test("aggregate_initialization" "SKIPPED_BECAUSE_OF_NO_COMPILER_SUPPORT")
150150
endif()
151151

152+
build_test(helpers_test common/helpers_test.cpp)
153+
add_test_generic(NAME helpers_test TRACERS none memcheck)
154+
152155
build_test(allocator allocator/allocator.cpp)
153156
add_test_generic(NAME allocator TRACERS none memcheck pmemcheck)
154157

tests/common/helpers_test.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// SPDX-License-Identifier: BSD-3-Clause
2+
/* Copyright 2021, Intel Corporation */
3+
4+
/*
5+
* helpers_test - test for all helper classes/functions used in testing
6+
* framework
7+
*/
8+
9+
#include "thread_helpers.hpp"
10+
#include "unittest.hpp"
11+
12+
#include <atomic>
13+
14+
static size_t const concurrency = 4;
15+
16+
/* Verify if syncthreads is working correctly */
17+
static void
18+
test()
19+
{
20+
std::atomic<int> counter;
21+
counter = 0;
22+
23+
parallel_xexec(concurrency,
24+
[&](size_t id, std::function<void()> syncthreads) {
25+
counter++;
26+
27+
syncthreads();
28+
UT_ASSERTeq(counter.load(), concurrency);
29+
syncthreads();
30+
31+
counter++;
32+
33+
syncthreads();
34+
UT_ASSERTeq(counter.load(), concurrency * 2);
35+
syncthreads();
36+
UT_ASSERTeq(counter.load(), concurrency * 2);
37+
});
38+
}
39+
40+
int
41+
main()
42+
{
43+
return run_test(test);
44+
}

tests/common/thread_helpers.hpp

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// SPDX-License-Identifier: BSD-3-Clause
2-
/* Copyright 2020, Intel Corporation */
2+
/* Copyright 2020-2021, Intel Corporation */
33
#ifndef THREAD_HELPERS_COMMON_HPP
44
#define THREAD_HELPERS_COMMON_HPP
55

@@ -25,31 +25,58 @@ parallel_exec(size_t concurrency, Function f)
2525
}
2626
}
2727

28+
class latch {
29+
public:
30+
latch(size_t desired) : counter(desired)
31+
{
32+
}
33+
34+
/* Returns true for the last thread arriving at the latch, false for all
35+
* other threads. */
36+
bool
37+
wait(std::unique_lock<std::mutex> &lock)
38+
{
39+
counter--;
40+
if (counter > 0) {
41+
cv.wait(lock, [&] { return counter == 0; });
42+
return false;
43+
} else {
44+
/*
45+
* notify_call could be called outside of a lock
46+
* (it would perform better) but drd complains
47+
* in that case
48+
*/
49+
cv.notify_all();
50+
return true;
51+
}
52+
}
53+
54+
private:
55+
std::condition_variable cv;
56+
size_t counter = 0;
57+
};
58+
2859
/*
2960
* This function executes 'concurrency' threads and provides
30-
* 'syncthreads' method (synchronization barrier) for f()
61+
* 'syncthreads' method (multi-use synchronization barrier) for f()
3162
*/
3263
template <typename Function>
3364
void
3465
parallel_xexec(size_t concurrency, Function f)
3566
{
36-
std::condition_variable cv;
3767
std::mutex m;
38-
std::unique_ptr<size_t> counter =
39-
std::unique_ptr<size_t>(new size_t(0));
68+
std::shared_ptr<latch> current_latch =
69+
std::shared_ptr<latch>(new latch(concurrency));
4070

71+
/* Implements multi-use barrier (latch). Once all threads arrive at the
72+
* latch, a new latch is allocated and used by all subsequent calls to
73+
* syncthreads. */
4174
auto syncthreads = [&] {
4275
std::unique_lock<std::mutex> lock(m);
43-
(*counter)++;
44-
if (*counter < concurrency)
45-
cv.wait(lock, [&] { return *counter >= concurrency; });
46-
else
47-
/*
48-
* notify_call could be called outside of a lock
49-
* (it would perform better) but drd complains
50-
* in that case
51-
*/
52-
cv.notify_all();
76+
auto l = current_latch;
77+
if (l->wait(lock))
78+
current_latch =
79+
std::shared_ptr<latch>(new latch(concurrency));
5380
};
5481

5582
parallel_exec(concurrency, [&](size_t tid) { f(tid, syncthreads); });

tests/drd.supp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
Conditional variable destruction false-positive
3+
drd:CondErr
4+
fun:pthread_cond_destroy_intercept
5+
fun:pthread_cond_destroy@*
6+
}

tests/helpers.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ function(execute_common expect_success output_file name)
110110
set(TRACE valgrind --error-exitcode=99 --tool=helgrind)
111111
set(ENV{LIBPMEMOBJ_CPP_TRACER_HELGRIND} 1)
112112
elseif(${TRACER} STREQUAL drd)
113-
set(TRACE valgrind --error-exitcode=99 --tool=drd)
113+
set(TRACE valgrind --error-exitcode=99 --tool=drd --suppressions=${TEST_ROOT_DIR}/drd.supp)
114114
set(ENV{LIBPMEMOBJ_CPP_TRACER_DRD} 1)
115115
elseif(${TRACER} STREQUAL gdb)
116116
set(TRACE gdb --batch --command=${GDB_BATCH_FILE} --args)

0 commit comments

Comments
 (0)