-
Notifications
You must be signed in to change notification settings - Fork 4.1k
Support progressive read timeout bthread handler #3163
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 6 commits
a5cdfc1
ae7bc0f
e41c37b
f08f266
f373b45
9bd8016
8830eab
6f301da
f46c3b3
cbdbb52
c3b095f
6c06fd4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -95,7 +95,8 @@ DEFINE_bool(graceful_quit_on_sigterm, false, | |
| "Register SIGTERM handle func to quit graceful"); | ||
| DEFINE_bool(graceful_quit_on_sighup, false, | ||
| "Register SIGHUP handle func to quit graceful"); | ||
|
|
||
| DEFINE_bool(log_idle_progressive_read_close, false, | ||
| "Print log when an idle progressive read is closed"); | ||
| const IdlNames idl_single_req_single_res = { "req", "res" }; | ||
| const IdlNames idl_single_req_multi_res = { "req", "" }; | ||
| const IdlNames idl_multi_req_single_res = { "", "res" }; | ||
|
|
@@ -174,6 +175,86 @@ class IgnoreAllRead : public ProgressiveReader { | |
| void OnEndOfMessage(const butil::Status&) {} | ||
| }; | ||
|
|
||
| class ProgressiveTimeoutReader : public ProgressiveReader { | ||
| public: | ||
| explicit ProgressiveTimeoutReader(SocketId id, int32_t read_timeout_ms, ProgressiveReader* reader): | ||
| _socket_id(id), | ||
| _read_timeout_ms(read_timeout_ms), | ||
| _reader(reader), | ||
| _timeout_id(0), | ||
| _latest_add_timer_ms(0), | ||
| _add_timer_delay_ms(1000), | ||
| _is_read_timeout(false) { | ||
| AddIdleReadTimeoutMonitor(); | ||
| } | ||
| butil::Status OnReadOnePart(const void* data, size_t length) { | ||
| auto status = _reader->OnReadOnePart(data, length); | ||
| AddIdleReadTimeoutMonitor(); | ||
| return status; | ||
| } | ||
| void OnEndOfMessage(const butil::Status& status) { | ||
| if (_is_read_timeout) { | ||
| _reader->OnEndOfMessage(butil::Status(ECONNRESET, "The progressive read timeout")); | ||
| } else { | ||
| _reader->OnEndOfMessage(status); | ||
| } | ||
| if(_timeout_id > 0) { | ||
| bthread_timer_del(_timeout_id); | ||
| } | ||
| } | ||
|
|
||
| SocketId GetSocketId() { | ||
| return _socket_id; | ||
| } | ||
|
|
||
| int32_t read_timeout_ms() { | ||
| return _read_timeout_ms; | ||
| } | ||
|
|
||
| void set_read_timeout(bool read_timeout = true) { | ||
| _is_read_timeout = read_timeout; | ||
| } | ||
|
|
||
| private: | ||
| void AddToTimer() { | ||
| if (_timeout_id > 0) { | ||
| bthread_timer_del(_timeout_id); | ||
| } | ||
| bthread_timer_add(&_timeout_id, | ||
| butil::milliseconds_from_now(_read_timeout_ms), | ||
| Controller::HandleIdleProgressiveReader, | ||
|
zchuango marked this conversation as resolved.
Outdated
|
||
| this | ||
| ); | ||
| } | ||
|
|
||
| void AddIdleReadTimeoutMonitor() { | ||
| if (_read_timeout_ms <= 0) { | ||
| return; | ||
| } | ||
| if(_read_timeout_ms < _add_timer_delay_ms) { | ||
| AddToTimer(); | ||
| return; | ||
| } | ||
| auto current_ms = butil::cpuwide_time_ms(); | ||
| if (current_ms - _latest_add_timer_ms < _add_timer_delay_ms) { | ||
| return; | ||
| } | ||
| _latest_add_timer_ms = current_ms; | ||
| AddToTimer(); | ||
|
zchuango marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| private: | ||
| SocketId _socket_id; | ||
| int32_t _read_timeout_ms; | ||
| ProgressiveReader* _reader; | ||
| // Timer registered to trigger progressive timeout event | ||
| bthread_timer_t _timeout_id; | ||
| int64_t _latest_add_timer_ms; | ||
| // avoid frequently add timer for idle handler | ||
| int32_t _add_timer_delay_ms; | ||
| bool _is_read_timeout; | ||
| }; | ||
|
|
||
| static IgnoreAllRead* s_ignore_all_read = NULL; | ||
| static pthread_once_t s_ignore_all_read_once = PTHREAD_ONCE_INIT; | ||
| static void CreateIgnoreAllRead() { s_ignore_all_read = new IgnoreAllRead; } | ||
|
|
@@ -331,6 +412,15 @@ void Controller::Call::Reset() { | |
| stream_user_data = NULL; | ||
| } | ||
|
|
||
| void Controller::set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms){ | ||
| if(progressive_read_timeout_ms <= 0x7fffffff){ | ||
| _progressive_read_timeout_ms = progressive_read_timeout_ms; | ||
| } else { | ||
| _progressive_read_timeout_ms = 0x7fffffff; | ||
| LOG(WARNING) << "progressive_read_timeout_seconds is limited to 0x7fffffff"; | ||
| } | ||
| } | ||
|
|
||
| void Controller::set_timeout_ms(int64_t timeout_ms) { | ||
| if (timeout_ms <= 0x7fffffff) { | ||
| _timeout_ms = timeout_ms; | ||
|
|
@@ -1028,6 +1118,44 @@ void Controller::SubmitSpan() { | |
| _span = NULL; | ||
| } | ||
|
|
||
| void Controller::HandleIdleProgressiveReader(void* arg) { | ||
|
zchuango marked this conversation as resolved.
Outdated
|
||
| if(arg == nullptr){ | ||
| LOG(ERROR) << "Controller::HandleIdleProgressiveReader arg is null."; | ||
| return; | ||
| } | ||
| ProgressiveTimeoutReader* reader = static_cast<ProgressiveTimeoutReader*>(arg); | ||
| SocketUniquePtr s; | ||
| if (Socket::Address(reader->GetSocketId(), &s) != 0) { | ||
| LOG(ERROR) << "not found the socket id : " << reader->GetSocketId(); | ||
| return; | ||
| } | ||
| auto log_idle = FLAGS_log_idle_progressive_read_close; | ||
| int64_t progressive_read_timeout_us = reader->read_timeout_ms() * 1000; | ||
| int64_t pre_idle_duration_us = 0; | ||
| int64_t idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us(); | ||
| while (progressive_read_timeout_us > idle_duration_us && idle_duration_us > pre_idle_duration_us) { | ||
| auto sleep_ms = (progressive_read_timeout_us - idle_duration_us) / 1000; | ||
| bthread_usleep(sleep_ms > 0 ? sleep_ms : 1); | ||
| pre_idle_duration_us = idle_duration_us; | ||
| idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us(); | ||
| } | ||
| if (idle_duration_us <= pre_idle_duration_us) { | ||
| LOG_IF(INFO, log_idle) << "stop progressive read timeout checking process!" | ||
| << " progressive_read_timeout_us : " << progressive_read_timeout_us | ||
| << " idle_duration_us : " << idle_duration_us | ||
| << " pre_idle_duration_us : " << pre_idle_duration_us; | ||
| return; | ||
| } | ||
| reader->set_read_timeout(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a thread safety issue here. The process will crash if the reader has already been destructed.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This issue deserves attention.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This problem doesn't seem to be resolved yet. |
||
| LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << reader->GetSocketId() | ||
| << " progressive read timeout us : " << progressive_read_timeout_us | ||
| << " progressive read idle duration : " << idle_duration_us; | ||
| if (s->parsing_context() != NULL) { | ||
| s->parsing_context()->Destroy(); | ||
| } | ||
| s->ReleaseReferenceIfIdle(0); | ||
| } | ||
|
|
||
| void Controller::HandleSendFailed() { | ||
| if (!FailedInline()) { | ||
| SetFailed("Must be SetFailed() before calling HandleSendFailed()"); | ||
|
|
@@ -1543,6 +1671,10 @@ void Controller::ReadProgressiveAttachmentBy(ProgressiveReader* r) { | |
| __FUNCTION__)); | ||
| } | ||
| add_flag(FLAGS_PROGRESSIVE_READER); | ||
| if (progressive_read_timeout_ms() > 0) { | ||
| auto reader = new ProgressiveTimeoutReader(_rpa->GetSocketId(), _progressive_read_timeout_ms, r); | ||
|
zchuango marked this conversation as resolved.
|
||
| return _rpa->ReadProgressiveAttachmentBy(reader); | ||
| } | ||
| return _rpa->ReadProgressiveAttachmentBy(r); | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.