Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions example/http_c++/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include "bthread/countdown_event.h"

DEFINE_string(d, "", "POST this data to the http server");
DEFINE_bool(progressive, false, "whether or not progressive read data from server");
DEFINE_int32(progressive_read_timeout_ms, 5000, "progressive read data idle timeout in milliseconds");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 2000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
Expand All @@ -36,6 +39,25 @@ namespace brpc {
DECLARE_bool(http_verbose);
}

class PartDataReader: public brpc::ProgressiveReader {
public:
explicit PartDataReader(bthread::CountdownEvent* done): _done(done){}

butil::Status OnReadOnePart(const void* data, size_t length) {
memcpy(_buffer, data, length);
LOG(INFO) << "data : " << _buffer << " size : " << length;
return butil::Status::OK();
}

void OnEndOfMessage(const butil::Status& status) {
_done->signal();
LOG(INFO) << "progressive read data final status : " << status;
}
private:
char _buffer[1024];
bthread::CountdownEvent* _done;
};

int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
Expand Down Expand Up @@ -71,13 +93,25 @@ int main(int argc, char* argv[]) {
cntl.request_attachment().append(FLAGS_d);
}

if (FLAGS_progressive) {
cntl.set_progressive_read_timeout_ms(FLAGS_progressive_read_timeout_ms);
cntl.response_will_be_read_progressively();
}

// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
std::cerr << cntl.ErrorText() << std::endl;
return -1;
}

if (FLAGS_progressive) {
bthread::CountdownEvent done(1);
cntl.ReadProgressiveAttachmentBy(new PartDataReader(&done));
done.wait();
LOG(INFO) << "wait client progressive read done safely";
}
// If -http_verbose is on, brpc already prints the response to stderr.
if (!brpc::FLAGS_http_verbose) {
std::cout << cntl.response_attachment() << std::endl;
Expand Down
7 changes: 7 additions & 0 deletions example/http_c++/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
DEFINE_string(certificate, "cert.pem", "Certificate file path to enable SSL");
DEFINE_string(private_key, "key.pem", "Private key file path to enable SSL");
DEFINE_string(ciphers, "", "Cipher suite used for SSL connections");
DEFINE_bool(enable_progressive_timeout, false, "whether or not trigger progressive write attachement data timeout");

namespace example {

Expand Down Expand Up @@ -104,6 +105,9 @@ class FileServiceImpl : public FileService {

// sleep a while to send another part.
bthread_usleep(10000);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down Expand Up @@ -194,6 +198,9 @@ class HttpSSEServiceImpl : public HttpSSEService {

// sleep a while to send another part.
bthread_usleep(10000 * 10);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down
123 changes: 122 additions & 1 deletion src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ DEFINE_bool(graceful_quit_on_sigterm, false,
"Register SIGTERM handle func to quit graceful");
DEFINE_bool(graceful_quit_on_sighup, false,
"Register SIGHUP handle func to quit graceful");

DEFINE_bool(log_idle_progressive_read_close, false,
"Print log when an idle progressive read is closed");
const IdlNames idl_single_req_single_res = { "req", "res" };
const IdlNames idl_single_req_multi_res = { "req", "" };
const IdlNames idl_multi_req_single_res = { "", "res" };
Expand Down Expand Up @@ -174,6 +175,61 @@ class IgnoreAllRead : public ProgressiveReader {
void OnEndOfMessage(const butil::Status&) {}
};

class ProgressiveTimeoutRead : public ProgressiveReader {
Comment thread
zchuango marked this conversation as resolved.
Outdated
public:
explicit ProgressiveTimeoutRead(Controller* cntl, ProgressiveReader* reader):
_cntl(cntl), _reader(reader), _timeout_id(0), _latest_add_timer_ms(0), _add_timer_delay_ms(1000) {
AddIdleReadTimeoutMonitor();
}
butil::Status OnReadOnePart(const void* data, size_t length) {
auto status = _reader->OnReadOnePart(data, length);
AddIdleReadTimeoutMonitor();
return status;
}
void OnEndOfMessage(const butil::Status& status) {
_reader->OnEndOfMessage(status);
if(_timeout_id > 0) {
bthread_timer_del(_timeout_id);
}
}
private:
void AddToTimer() {
if (_timeout_id > 0) {
bthread_timer_del(_timeout_id);
}
bthread_timer_add(&_timeout_id,
butil::milliseconds_from_now(_cntl->progressive_read_timeout_ms()),
Controller::HandleIdleProgressiveReader,
Comment thread
zchuango marked this conversation as resolved.
Outdated
_cntl
);
}

void AddIdleReadTimeoutMonitor() {
if (_cntl->progressive_read_timeout_ms() <= 0) {
return;
}
if(_cntl->progressive_read_timeout_ms() < _add_timer_delay_ms) {
AddToTimer();
return;
}
auto current_ms = butil::cpuwide_time_ms();
if (current_ms - _latest_add_timer_ms < _add_timer_delay_ms) {
return;
}
_latest_add_timer_ms = current_ms;
AddToTimer();
Comment thread
zchuango marked this conversation as resolved.
Outdated
}

private:
Controller* _cntl;
ProgressiveReader* _reader;
// Timer registered to trigger progressive timeout event
bthread_timer_t _timeout_id;
int64_t _latest_add_timer_ms;
// avoid frequently add timer for idle handler
int32_t _add_timer_delay_ms;
};

static IgnoreAllRead* s_ignore_all_read = NULL;
static pthread_once_t s_ignore_all_read_once = PTHREAD_ONCE_INIT;
static void CreateIgnoreAllRead() { s_ignore_all_read = new IgnoreAllRead; }
Expand Down Expand Up @@ -331,6 +387,15 @@ void Controller::Call::Reset() {
stream_user_data = NULL;
}

void Controller::set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms){
if(progressive_read_timeout_ms <= 0x7fffffff){
_progressive_read_timeout_ms = progressive_read_timeout_ms;
} else {
_progressive_read_timeout_ms = 0x7fffffff;
LOG(WARNING) << "progressive_read_timeout_seconds is limited to 0x7fffffff";
}
}

void Controller::set_timeout_ms(int64_t timeout_ms) {
if (timeout_ms <= 0x7fffffff) {
_timeout_ms = timeout_ms;
Expand Down Expand Up @@ -1027,6 +1092,54 @@ void Controller::SubmitSpan() {
_span = NULL;
}

void Controller::HandleIdleProgressiveReader(void* arg) {
Comment thread
zchuango marked this conversation as resolved.
Outdated
if(arg == nullptr){
LOG(ERROR) << "Controller::HandleIdleProgressiveReader arg is null.";
return;
}
auto* cntl = static_cast<Controller*>(arg);
auto log_idle = FLAGS_log_idle_progressive_read_close;
int64_t progressive_read_timeout_us = cntl->_progressive_read_timeout_ms * 1000;
std::vector<SocketId> remove_socket_ids;
butil::AutoLock guard(cntl->_progressive_read_lock);
auto socketIds = cntl->_checking_progressive_read_fds;
for (auto socket_id : socketIds){
SocketUniquePtr s;
if (Socket::Address(socket_id, &s) == 0) {
int64_t pre_idle_duration_us = 0;
int64_t idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us();
while (progressive_read_timeout_us > idle_duration_us && idle_duration_us > pre_idle_duration_us) {
auto sleep_ms = (progressive_read_timeout_us - idle_duration_us) / 1000;
bthread_usleep(sleep_ms > 0 ? sleep_ms : 1);
Comment thread
zchuango marked this conversation as resolved.
Outdated
pre_idle_duration_us = idle_duration_us;
idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us();
}
if (idle_duration_us <= pre_idle_duration_us) {
LOG_IF(INFO, log_idle) << "stop pgressive read timeout checking process!"
<< " progressive_read_timeout_us : " << progressive_read_timeout_us
<< " idle_duration_us : " << idle_duration_us
<< " pre_idle_duration_us : " << pre_idle_duration_us;
return;
}
LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << socket_id
<< " progressive read timeout us : " << progressive_read_timeout_us
<< " progressive read idle duration : " << idle_duration_us;
if (s->parsing_context() != NULL) {
s->parsing_context()->Destroy();
}
s->ReleaseReferenceIfIdle(0);
Comment thread
zchuango marked this conversation as resolved.
Outdated
cntl->CloseConnection("progressive read timeout");
remove_socket_ids.push_back(socket_id);
} else {
LOG(ERROR) << "not found the socket id : " << socket_id;
remove_socket_ids.push_back(socket_id);
}
}
for (auto remove_socket_id : remove_socket_ids) {
socketIds.erase(remove_socket_id);
}
}

void Controller::HandleSendFailed() {
if (!FailedInline()) {
SetFailed("Must be SetFailed() before calling HandleSendFailed()");
Expand Down Expand Up @@ -1179,6 +1292,11 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// Tag the socket so that when the response comes back, the parser will
// stop before reading all body.
_current_call.sending_sock->read_will_be_progressive(_connection_type);
auto socket_id = _current_call.sending_sock->id();
butil::AutoLock guard(_progressive_read_lock);
if (_progressive_read_timeout_ms > 0 && _checking_progressive_read_fds.seek(socket_id) == NULL) {
_checking_progressive_read_fds.insert(socket_id);
}
}

// Handle authentication
Expand Down Expand Up @@ -1542,6 +1660,9 @@ void Controller::ReadProgressiveAttachmentBy(ProgressiveReader* r) {
__FUNCTION__));
}
add_flag(FLAGS_PROGRESSIVE_READER);
if (progressive_read_timeout_ms() > 0) {
return _rpa->ReadProgressiveAttachmentBy(new ProgressiveTimeoutRead(this, r));
Comment thread
zchuango marked this conversation as resolved.
Outdated
}
return _rpa->ReadProgressiveAttachmentBy(r);
}

Expand Down
13 changes: 10 additions & 3 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
#include "brpc/grpc.h"
#include "brpc/kvmap.h"
#include "brpc/rpc_dump.h"

// EAUTH is defined in MAC
#ifndef EAUTH
#define EAUTH ERPCAUTH
Expand Down Expand Up @@ -163,7 +162,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
uint64_t log_id;
std::string request_id;
};

static void HandleIdleProgressiveReader(void* arg);
public:
Controller();
Controller(const Inheritable& parent_ctx);
Expand All @@ -177,6 +176,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Set/get timeout in milliseconds for the RPC call. Use
// ChannelOptions.timeout_ms on unset.
void set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms);
int32_t progressive_read_timeout_ms() const { return _progressive_read_timeout_ms; }

void set_timeout_ms(int64_t timeout_ms);
int64_t timeout_ms() const { return _timeout_ms; }

Expand Down Expand Up @@ -323,7 +325,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Make the RPC end when the HTTP response has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
void response_will_be_read_progressively() {
add_flag(FLAGS_READ_PROGRESSIVELY);
}
// Make the RPC end when the HTTP request has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void request_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
Expand Down Expand Up @@ -837,6 +841,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
int32_t _timeout_ms;
int32_t _connect_timeout_ms;
int32_t _backup_request_ms;
int32_t _progressive_read_timeout_ms;
butil::FlatSet<SocketId> _checking_progressive_read_fds;
Comment thread
zchuango marked this conversation as resolved.
Outdated
mutable butil::Lock _progressive_read_lock;
// Priority: `_backup_request_policy' > `_backup_request_ms'.
BackupRequestPolicy* _backup_request_policy;
// If this rpc call has retry/backup request,this var save the real timeout for current call
Expand Down
Loading