Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions example/http_c++/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include "bthread/countdown_event.h"

DEFINE_string(d, "", "POST this data to the http server");
DEFINE_bool(progressive, false, "whether or not progressive read data from server");
DEFINE_int32(progressive_read_timeout_ms, 5000, "progressive read data idle timeout in milliseconds");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 2000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
Expand All @@ -36,6 +39,25 @@ namespace brpc {
DECLARE_bool(http_verbose);
}

class PartDataReader: public brpc::ProgressiveReader {
public:
explicit PartDataReader(bthread::CountdownEvent* done): _done(done){}

butil::Status OnReadOnePart(const void* data, size_t length) {
memcpy(_buffer, data, length);
LOG(INFO) << "data : " << _buffer << " size : " << length;
return butil::Status::OK();
}

void OnEndOfMessage(const butil::Status& status) {
_done->signal();
LOG(INFO) << "progressive read data final status : " << status;
}
private:
char _buffer[1024];
bthread::CountdownEvent* _done;
};

int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
Expand Down Expand Up @@ -71,13 +93,25 @@ int main(int argc, char* argv[]) {
cntl.request_attachment().append(FLAGS_d);
}

if (FLAGS_progressive) {
cntl.set_progressive_read_timeout_ms(FLAGS_progressive_read_timeout_ms);
cntl.response_will_be_read_progressively();
}

// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
std::cerr << cntl.ErrorText() << std::endl;
return -1;
}

if (FLAGS_progressive) {
bthread::CountdownEvent done(1);
cntl.ReadProgressiveAttachmentBy(new PartDataReader(&done));
done.wait();
LOG(INFO) << "wait client progressive read done safely";
}
// If -http_verbose is on, brpc already prints the response to stderr.
if (!brpc::FLAGS_http_verbose) {
std::cout << cntl.response_attachment() << std::endl;
Expand Down
7 changes: 7 additions & 0 deletions example/http_c++/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
DEFINE_string(certificate, "cert.pem", "Certificate file path to enable SSL");
DEFINE_string(private_key, "key.pem", "Private key file path to enable SSL");
DEFINE_string(ciphers, "", "Cipher suite used for SSL connections");
DEFINE_bool(enable_progressive_timeout, false, "whether or not trigger progressive write attachement data timeout");

namespace example {

Expand Down Expand Up @@ -104,6 +105,9 @@ class FileServiceImpl : public FileService {

// sleep a while to send another part.
bthread_usleep(10000);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down Expand Up @@ -194,6 +198,9 @@ class HttpSSEServiceImpl : public HttpSSEService {

// sleep a while to send another part.
bthread_usleep(10000 * 10);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down
134 changes: 133 additions & 1 deletion src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ DEFINE_bool(graceful_quit_on_sigterm, false,
"Register SIGTERM handle func to quit graceful");
DEFINE_bool(graceful_quit_on_sighup, false,
"Register SIGHUP handle func to quit graceful");

DEFINE_bool(log_idle_progressive_read_close, false,
"Print log when an idle progressive read is closed");
const IdlNames idl_single_req_single_res = { "req", "res" };
const IdlNames idl_single_req_multi_res = { "req", "" };
const IdlNames idl_multi_req_single_res = { "", "res" };
Expand Down Expand Up @@ -174,6 +175,86 @@ class IgnoreAllRead : public ProgressiveReader {
void OnEndOfMessage(const butil::Status&) {}
};

class ProgressiveTimeoutReader : public ProgressiveReader {
public:
explicit ProgressiveTimeoutReader(SocketId id, int32_t read_timeout_ms, ProgressiveReader* reader):
_socket_id(id),
_read_timeout_ms(read_timeout_ms),
_reader(reader),
_timeout_id(0),
_latest_add_timer_ms(0),
_add_timer_delay_ms(1000),
_is_read_timeout(false) {
AddIdleReadTimeoutMonitor();
}
butil::Status OnReadOnePart(const void* data, size_t length) {
auto status = _reader->OnReadOnePart(data, length);
AddIdleReadTimeoutMonitor();
return status;
}
void OnEndOfMessage(const butil::Status& status) {
if (_is_read_timeout) {
_reader->OnEndOfMessage(butil::Status(ECONNRESET, "The progressive read timeout"));
Comment thread
zchuango marked this conversation as resolved.
Outdated
} else {
_reader->OnEndOfMessage(status);
}
if(_timeout_id > 0) {
bthread_timer_del(_timeout_id);
}
}

SocketId GetSocketId() {
return _socket_id;
}

int32_t read_timeout_ms() {
return _read_timeout_ms;
}

void set_read_timeout(bool read_timeout = true) {
_is_read_timeout = read_timeout;
}

private:
void AddToTimer() {
if (_timeout_id > 0) {
bthread_timer_del(_timeout_id);
}
bthread_timer_add(&_timeout_id,
butil::milliseconds_from_now(_read_timeout_ms),
Controller::HandleIdleProgressiveReader,
Comment thread
zchuango marked this conversation as resolved.
Outdated
this
);
}

void AddIdleReadTimeoutMonitor() {
if (_read_timeout_ms <= 0) {
return;
}
if(_read_timeout_ms < _add_timer_delay_ms) {
AddToTimer();
return;
}
auto current_ms = butil::cpuwide_time_ms();
if (current_ms - _latest_add_timer_ms < _add_timer_delay_ms) {
return;
}
_latest_add_timer_ms = current_ms;
AddToTimer();
Comment thread
zchuango marked this conversation as resolved.
Outdated
}

private:
SocketId _socket_id;
int32_t _read_timeout_ms;
ProgressiveReader* _reader;
// Timer registered to trigger progressive timeout event
bthread_timer_t _timeout_id;
int64_t _latest_add_timer_ms;
// avoid frequently add timer for idle handler
int32_t _add_timer_delay_ms;
bool _is_read_timeout;
};

static IgnoreAllRead* s_ignore_all_read = NULL;
static pthread_once_t s_ignore_all_read_once = PTHREAD_ONCE_INIT;
static void CreateIgnoreAllRead() { s_ignore_all_read = new IgnoreAllRead; }
Expand Down Expand Up @@ -331,6 +412,15 @@ void Controller::Call::Reset() {
stream_user_data = NULL;
}

void Controller::set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms){
if(progressive_read_timeout_ms <= 0x7fffffff){
_progressive_read_timeout_ms = progressive_read_timeout_ms;
} else {
_progressive_read_timeout_ms = 0x7fffffff;
LOG(WARNING) << "progressive_read_timeout_seconds is limited to 0x7fffffff";
}
}

void Controller::set_timeout_ms(int64_t timeout_ms) {
if (timeout_ms <= 0x7fffffff) {
_timeout_ms = timeout_ms;
Expand Down Expand Up @@ -1028,6 +1118,44 @@ void Controller::SubmitSpan() {
_span = NULL;
}

void Controller::HandleIdleProgressiveReader(void* arg) {
Comment thread
zchuango marked this conversation as resolved.
Outdated
if(arg == nullptr){
LOG(ERROR) << "Controller::HandleIdleProgressiveReader arg is null.";
return;
}
ProgressiveTimeoutReader* reader = static_cast<ProgressiveTimeoutReader*>(arg);
SocketUniquePtr s;
if (Socket::Address(reader->GetSocketId(), &s) != 0) {
LOG(ERROR) << "not found the socket id : " << reader->GetSocketId();
return;
}
auto log_idle = FLAGS_log_idle_progressive_read_close;
int64_t progressive_read_timeout_us = reader->read_timeout_ms() * 1000;
int64_t pre_idle_duration_us = 0;
int64_t idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us();
while (progressive_read_timeout_us > idle_duration_us && idle_duration_us > pre_idle_duration_us) {
auto sleep_ms = (progressive_read_timeout_us - idle_duration_us) / 1000;
bthread_usleep(sleep_ms > 0 ? sleep_ms : 1);
pre_idle_duration_us = idle_duration_us;
idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us();
}
if (idle_duration_us <= pre_idle_duration_us) {
LOG_IF(INFO, log_idle) << "stop progressive read timeout checking process!"
<< " progressive_read_timeout_us : " << progressive_read_timeout_us
<< " idle_duration_us : " << idle_duration_us
<< " pre_idle_duration_us : " << pre_idle_duration_us;
return;
}
reader->set_read_timeout();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a thread safety issue here. The process will crash if the reader has already been destructed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue deserves attention.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This problem doesn't seem to be resolved yet.

LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << reader->GetSocketId()
<< " progressive read timeout us : " << progressive_read_timeout_us
<< " progressive read idle duration : " << idle_duration_us;
if (s->parsing_context() != NULL) {
s->parsing_context()->Destroy();
}
s->ReleaseReferenceIfIdle(0);
}

void Controller::HandleSendFailed() {
if (!FailedInline()) {
SetFailed("Must be SetFailed() before calling HandleSendFailed()");
Expand Down Expand Up @@ -1543,6 +1671,10 @@ void Controller::ReadProgressiveAttachmentBy(ProgressiveReader* r) {
__FUNCTION__));
}
add_flag(FLAGS_PROGRESSIVE_READER);
if (progressive_read_timeout_ms() > 0) {
auto reader = new ProgressiveTimeoutReader(_rpa->GetSocketId(), _progressive_read_timeout_ms, r);
Comment thread
zchuango marked this conversation as resolved.
return _rpa->ReadProgressiveAttachmentBy(reader);
}
return _rpa->ReadProgressiveAttachmentBy(r);
}

Expand Down
11 changes: 8 additions & 3 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
#include "brpc/grpc.h"
#include "brpc/kvmap.h"
#include "brpc/rpc_dump.h"

// EAUTH is defined in MAC
#ifndef EAUTH
#define EAUTH ERPCAUTH
Expand Down Expand Up @@ -163,7 +162,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
uint64_t log_id;
std::string request_id;
};

static void HandleIdleProgressiveReader(void* arg);
public:
Controller();
Controller(const Inheritable& parent_ctx);
Expand All @@ -177,6 +176,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Set/get timeout in milliseconds for the RPC call. Use
// ChannelOptions.timeout_ms on unset.
void set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms);
int32_t progressive_read_timeout_ms() const { return _progressive_read_timeout_ms; }

void set_timeout_ms(int64_t timeout_ms);
int64_t timeout_ms() const { return _timeout_ms; }

Expand Down Expand Up @@ -323,7 +325,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Make the RPC end when the HTTP response has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
void response_will_be_read_progressively() {
add_flag(FLAGS_READ_PROGRESSIVELY);
}
// Make the RPC end when the HTTP request has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void request_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
Expand Down Expand Up @@ -837,6 +841,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
int32_t _timeout_ms;
int32_t _connect_timeout_ms;
int32_t _backup_request_ms;
int32_t _progressive_read_timeout_ms;
// Priority: `_backup_request_policy' > `_backup_request_ms'.
BackupRequestPolicy* _backup_request_policy;
// If this rpc call has retry/backup request,this var save the real timeout for current call
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,7 @@ ParseResult ParseHttpMessage(butil::IOBuf *source, Socket *socket,
LOG(FATAL) << "Fail to new HttpContext";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
}
http_imsg->SetSocketId(socket->id());
// Parsing http is costly, parsing an incomplete http message from the
// beginning repeatedly should be avoided, otherwise the cost may reach
// O(n^2) in the worst case. Save incomplete http messages in sockets
Expand Down
9 changes: 9 additions & 0 deletions src/brpc/policy/http_rpc_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ class HttpContext : public ReadableProgressiveAttachment
, public InputMessageBase
, public HttpMessage {
public:
SocketId GetSocketId() override {
return _socket_id;
}

void SetSocketId(SocketId id) override {
_socket_id = id;
}

explicit HttpContext(bool read_body_progressively,
HttpMethod request_method = HTTP_METHOD_GET)
: InputMessageBase()
Expand Down Expand Up @@ -122,6 +130,7 @@ class HttpContext : public ReadableProgressiveAttachment

private:
bool _is_stage2;
SocketId _socket_id;
Comment thread
zchuango marked this conversation as resolved.
};

// Implement functions required in protocol.h
Expand Down
3 changes: 3 additions & 0 deletions src/brpc/progressive_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define BRPC_PROGRESSIVE_READER_H

#include "brpc/shared_object.h"
#include "brpc/socket.h"


namespace brpc {
Expand Down Expand Up @@ -84,6 +85,8 @@ class ReadableProgressiveAttachment : public SharedObject {
// Any error occurred should destroy the reader by calling r->Destroy().
// r->Destroy() should be guaranteed to be called once and only once.
virtual void ReadProgressiveAttachmentBy(ProgressiveReader* r) = 0;
virtual SocketId GetSocketId() = 0;
virtual void SetSocketId(SocketId id) = 0;
Comment thread
zchuango marked this conversation as resolved.
Outdated
};

} // namespace brpc
Expand Down
Loading