Skip to content

Commit 886cc48

Browse files
author
yannan.wyn
committed
fix(bthread): refactor sharded priority queue with per-ED shard
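Each EventDispatcher gets its own WorkStealingQueue, so every queue has exactly one producer; concurrent pushes from multiple EDs therefore stay within the queue's SPMC (single-producer, multi-consumer) contract without needing spinlocks.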
1 parent 0565d8d commit 886cc48

9 files changed

Lines changed: 299 additions & 12 deletions

src/brpc/event_dispatcher.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
2424
#include "bvar/latency_recorder.h" // bvar::LatencyRecorder
2525
#include "bthread/bthread.h" // bthread_start_background
26+
#include "bthread/task_group.h" // TaskGroup::address_meta
2627
#include "brpc/event_dispatcher.h"
2728

2829
DECLARE_int32(task_group_ntags);
@@ -68,6 +69,7 @@ void InitializeGlobalDispatchers() {
6869
bthread_attr_t attr =
6970
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
7071
attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;
72+
g_edisp[i * FLAGS_event_dispatcher_num + j].set_priority_index(j);
7173
CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr));
7274
}
7375
}

src/brpc/event_dispatcher.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ template <typename T> friend class IOEvent;
114114
// Stop bthread of this dispatcher.
115115
void Stop();
116116

117+
void set_priority_index(int idx) { _priority_index = idx; }
118+
117119
// Suspend calling thread until bthread of this dispatcher stops.
118120
void Join();
119121

@@ -188,6 +190,8 @@ template <typename T> friend class IOEvent;
188190

189191
// Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
190192
int _wakeup_fds[2];
193+
194+
int _priority_index{-1};
191195
};
192196

193197
EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);

src/brpc/event_dispatcher_epoll.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,13 @@ int EventDispatcher::RemoveConsumer(int fd) {
190190
}
191191

192192
void* EventDispatcher::RunThis(void* arg) {
193-
((EventDispatcher*)arg)->Run();
193+
EventDispatcher* ed = (EventDispatcher*)arg;
194+
if (ed->_priority_index >= 0) {
195+
bthread::TaskMeta* meta =
196+
bthread::TaskGroup::address_meta(bthread_self());
197+
meta->priority_index = ed->_priority_index;
198+
}
199+
ed->Run();
194200
return NULL;
195201
}
196202

src/bthread/task_control.cpp

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ DEFINE_string(cpu_set, "",
5151
"Set of CPUs to which cores are bound. "
5252
"for example, 0-3,5,7; default: disable");
5353

54+
namespace brpc { DECLARE_int32(event_dispatcher_num); }
55+
5456
namespace bthread {
5557

5658
DEFINE_bool(parking_lot_no_signal_when_no_waiter, false,
@@ -205,11 +207,28 @@ TaskControl::TaskControl()
205207
, _status(print_rq_sizes_in_the_tc, this)
206208
, _nbthreads("bthread_count")
207209
, _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
208-
, _priority_queues(FLAGS_task_group_ntags)
210+
, _pq_num_of_each_tag(brpc::FLAGS_event_dispatcher_num)
211+
, _priority_queues(FLAGS_task_group_ntags * brpc::FLAGS_event_dispatcher_num)
209212
, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
210213
, _tagged_pl(FLAGS_task_group_ntags)
211214
{}
212215

216+
int TaskControl::init_priority_queues() {
217+
if (!_enable_priority_queue) {
218+
return 0;
219+
}
220+
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
221+
for (int j = 0; j < _pq_num_of_each_tag; ++j) {
222+
if (priority_queue(i, j).init(BTHREAD_MAX_CONCURRENCY) != 0) {
223+
LOG(ERROR) << "Fail to init priority queue for tag=" << i
224+
<< " ed=" << j;
225+
return -1;
226+
}
227+
}
228+
}
229+
return 0;
230+
}
231+
213232
int TaskControl::init(int concurrency) {
214233
if (_concurrency != 0) {
215234
LOG(ERROR) << "Already initialized";
@@ -238,10 +257,10 @@ int TaskControl::init(int concurrency) {
238257
_tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>(
239258
"bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1));
240259
_tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str));
241-
if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
242-
LOG(ERROR) << "Fail to init _priority_q";
243-
return -1;
244-
}
260+
}
261+
262+
if (init_priority_queues() != 0) {
263+
return -1;
245264
}
246265

247266
// Make sure TimerThread is ready.
@@ -445,7 +464,7 @@ TaskControl::~TaskControl() {
445464
_switch_per_second.hide();
446465
_signal_per_second.hide();
447466
_status.hide();
448-
467+
449468
stop_and_join();
450469
}
451470

@@ -528,8 +547,12 @@ int TaskControl::_destroy_group(TaskGroup* g) {
528547
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
529548
auto tag = tls_task_group->tag();
530549

531-
if (_priority_queues[tag].steal(tid)) {
532-
return true;
550+
if (_enable_priority_queue) {
551+
for (int i = 0; i < _pq_num_of_each_tag; ++i) {
552+
if (priority_queue(tag, i).steal(tid)) {
553+
return true;
554+
}
555+
}
533556
}
534557

535558
// 1: Acquiring fence is paired with releasing fence in _add_group to
@@ -689,4 +712,5 @@ std::vector<bthread_t> TaskControl::get_living_bthreads() {
689712
return living_bthread_ids;
690713
}
691714

715+
692716
} // namespace bthread

src/bthread/task_control.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,12 @@ friend bthread_t init_for_pthread_stack_trace();
101101
std::string stack_trace(bthread_t tid);
102102
#endif // BRPC_BTHREAD_TRACER
103103

104-
void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
105-
_priority_queues[tag].push(tid);
104+
void push_priority_queue(bthread_tag_t tag, int priority_index, bthread_t tid) {
105+
priority_queue(tag, priority_index).push(tid);
106106
}
107107

108108
std::vector<bthread_t> get_living_bthreads();
109+
109110
private:
110111
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
111112
typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
@@ -123,6 +124,13 @@ friend bthread_t init_for_pthread_stack_trace();
123124
// Tag parking slot
124125
TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; }
125126

127+
// Priority queue for a specific ED within a tag; `index` must be in [0, _pq_num_of_each_tag) and is not range-checked here.
128+
WorkStealingQueue<bthread_t>& priority_queue(bthread_tag_t tag, int index) {
129+
return _priority_queues[tag * _pq_num_of_each_tag + index];
130+
}
131+
132+
int init_priority_queues();
133+
126134
static void delete_task_group(void* arg);
127135

128136
static void* worker_thread(void* task_control);
@@ -164,6 +172,7 @@ friend bthread_t init_for_pthread_stack_trace();
164172
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
165173

166174
bool _enable_priority_queue;
175+
int _pq_num_of_each_tag;
167176
std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
168177

169178
size_t _pl_num_of_each_tag;

src/bthread/task_group.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) {
640640
TaskGroup* g = *pg;
641641
bthread_t next_tid = 0;
642642
// Find next task to run, if none, switch to idle thread of the group.
643+
643644
#ifndef BTHREAD_FAIR_WSQ
644645
// When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
645646
// WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
@@ -908,7 +909,8 @@ void TaskGroup::priority_to_run(void* args_in) {
908909
tls_task_group->_control->_task_tracer.set_status(
909910
TASK_STATUS_READY, args->meta);
910911
#endif // BRPC_BTHREAD_TRACER
911-
return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid);
912+
return tls_task_group->control()->push_priority_queue(
913+
args->tag, args->meta->priority_index, args->meta->tid);
912914
}
913915

914916
struct SleepArgs {

src/bthread/task_meta.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ struct TaskMeta {
8888
// simplified if they can get tid from TaskMeta.
8989
bthread_t tid{INVALID_BTHREAD};
9090

91+
int priority_index{-1};
92+
9193
// User function and argument
9294
void* (*fn)(void*){NULL};
9395
void* arg{NULL};

test/BUILD.bazel

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ cc_test(
240240
"bthread_butex_multi_tag_unittest.cpp",
241241
"bthread_rwlock_unittest.cpp",
242242
"bthread_semaphore_unittest.cpp",
243+
# Has a custom main() that conflicts with gtest_main
244+
"bthread_priority_queue_unittest.cpp",
243245
],
244246
),
245247
copts = COPTS,
@@ -252,6 +254,17 @@ cc_test(
252254
],
253255
)
254256

257+
cc_test(
258+
name = "bthread_priority_queue_test",
259+
srcs = ["bthread_priority_queue_unittest.cpp"],
260+
copts = COPTS,
261+
deps = [
262+
":sstream_workaround",
263+
"//:brpc",
264+
"@com_google_googletest//:gtest",
265+
],
266+
)
267+
255268
cc_test(
256269
name = "brpc_prometheus_test",
257270
srcs = glob(

0 commit comments

Comments
 (0)