Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,15 @@ void Controller::HandleStreamConnection(Socket *host_socket) {
if(!ptrs[i]) continue;
Stream* extra_stream = (Stream *) ptrs[i]->conn();
_remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]);
s->SetHostSocket(host_socket);
if (s->SetHostSocket(host_socket) != 0) {
const int ec = errno;
Stream::SetFailed(_request_streams, ec,
"Fail to bind response stream to %s",
host_socket->description().c_str());
SetFailed(ec, "Fail to bind response stream to %s",
host_socket->description().c_str());
return;
}
extra_stream->SetConnected(_remote_stream_settings);
}
}
Expand Down
45 changes: 37 additions & 8 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,19 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
Stream* s = (Stream *) stream_ptr->conn();
StreamSettings *stream_settings = meta.mutable_stream_settings();
s->FillSettings(stream_settings);
s->SetHostSocket(sock);
if (s->SetHostSocket(sock) != 0) {
const int errcode = errno;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use errno as the reason for SetHostSocket failure? SetHostSocket failure does not set errno.

LOG_IF(WARNING, errcode != EPIPE)
<< "Fail to bind response stream=" << response_stream_id
<< " to " << sock->description() << ": "
<< berror(errcode);
cntl->SetFailed(errcode, "Fail to bind response stream to %s",
sock->description().c_str());
Comment thread
chenBright marked this conversation as resolved.
Stream::SetFailed(response_stream_ids, errcode,
"Fail to bind response stream to %s",
sock->description().c_str());
Comment thread
chenBright marked this conversation as resolved.
return;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that if binding the response stream fails, we shouldn't just return directly; we should still try writing a response.

}
for (size_t i = 1; i < response_stream_ids.size(); ++i) {
stream_settings->mutable_extra_stream_ids()->Add(response_stream_ids[i]);
}
Expand Down Expand Up @@ -390,6 +402,15 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,

ResponseWriteInfo args;
bthread_id_t response_id = INVALID_BTHREAD_ID;
auto response_write_guard = butil::MakeScopeGuard([&response_id, &args, span] {
if (response_id == INVALID_BTHREAD_ID) {
return;
}
bthread_id_join(response_id);
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
});
if (span) {
span->set_response_size(res_buf.size());
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
Expand Down Expand Up @@ -426,7 +447,21 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
SocketUniquePtr extra_stream_ptr;
if (Socket::Address(extra_stream_id, &extra_stream_ptr) == 0) {
Stream* extra_stream = (Stream *) extra_stream_ptr->conn();
extra_stream->SetHostSocket(sock);
if (extra_stream->SetHostSocket(sock) != 0) {
const int errcode = errno;
LOG_IF(WARNING, errcode != EPIPE)
<< "Fail to bind response stream=" << extra_stream_id
<< " to " << sock->description() << ": "
<< berror(errcode);
cntl->SetFailed(errcode, "Fail to bind response stream to %s",
sock->description().c_str());
Comment on lines +456 to +457
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just call cntl->SetFailed once.

StreamIds remaining_stream_ids(response_stream_ids.begin() + i,
response_stream_ids.end());
Stream::SetFailed(remaining_stream_ids, errcode,
"Fail to bind response stream to %s",
sock->description().c_str());
Comment thread
chenBright marked this conversation as resolved.
return;
}
extra_stream->SetConnected();
} else {
LOG(WARNING) << "Stream=" << extra_stream_id
Expand All @@ -451,12 +486,6 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
}
}

if (span) {
bthread_id_join(response_id);
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
}
}

namespace {
Expand Down
31 changes: 22 additions & 9 deletions src/brpc/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "brpc/stream.h"

#include <gflags/gflags.h>
#include "butil/string_printf.h"
#include "butil/time.h"
#include "butil/object_pool.h"
#include "butil/unique_ptr.h"
Expand Down Expand Up @@ -57,6 +58,7 @@ Stream::Stream()
, _pending_buf(NULL)
, _start_idle_timer_us(0)
, _idle_timer(0)
, _set_host_socket_ec(0)
{
_connect_meta.on_connect = NULL;
CHECK_EQ(0, bthread_mutex_init(&_connect_mutex, NULL));
Expand Down Expand Up @@ -665,13 +667,16 @@ int Stream::SetHostSocket(Socket *host_socket) {
std::call_once(_set_host_socket_flag, [this, host_socket]() {
SocketUniquePtr ptr;
host_socket->ReAddress(&ptr);
// TODO add *this to host socke
if (ptr->AddStream(id()) != 0) {
CHECK(false) << id() << " fail to add stream to host socket";
_set_host_socket_ec = errno ? errno : ptr->non_zero_error_code();
return;
}
_host_socket = ptr.release();
});
if (_host_socket == NULL) {
errno = _set_host_socket_ec ? _set_host_socket_ec : EFAILEDSOCKET;
return -1;
Comment thread
chenBright marked this conversation as resolved.
}
return 0;
}

Expand Down Expand Up @@ -731,27 +736,35 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) {
return TriggerOnConnectIfNeed();
}

int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) {
int Stream::SetFailedWithReason(StreamId id, int error_code,
const std::string& reason) {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(id, &ptr) == -1) {
// Don't care recycled stream
return 0;
}
Stream* s = (Stream*)ptr->conn();
s->Close(error_code, "%s", reason.c_str());
return 0;
}

int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) {
va_list ap;
va_start(ap, reason_fmt);
s->Close(error_code, reason_fmt, ap);
std::string reason;
butil::string_vprintf(&reason, reason_fmt, ap);
va_end(ap);
return 0;
return SetFailedWithReason(id, error_code, reason);
}

int Stream::SetFailed(const StreamIds& ids, int error_code, const char* reason_fmt, ...) {
va_list ap;
va_start(ap, reason_fmt);
for(size_t i = 0; i< ids.size(); ++i) {
Stream::SetFailed(ids[i], error_code, reason_fmt, ap);
}
std::string reason;
butil::string_vprintf(&reason, reason_fmt, ap);
va_end(ap);
for (size_t i = 0; i < ids.size(); ++i) {
Stream::SetFailedWithReason(ids[i], error_code, reason);
}
return 0;
}

Expand Down
4 changes: 4 additions & 0 deletions src/brpc/stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define BRPC_STREAM_IMPL_H

#include <mutex>
#include <string>
#include "bthread/bthread.h"
#include "bthread/execution_queue.h"
#include "brpc/socket.h"
Expand Down Expand Up @@ -91,6 +92,8 @@ friend struct butil::DefaultDeleter<Stream>;
static int TriggerOnWritable(bthread_id_t id, void *data, int error_code);
static void *RunOnWritable(void* arg);
static void* RunOnConnect(void* arg);
static int SetFailedWithReason(StreamId id, int error_code,
const std::string& reason);

struct ConnectMeta {
int (*on_connect)(int, int, void*);
Expand Down Expand Up @@ -136,6 +139,7 @@ friend struct butil::DefaultDeleter<Stream>;
int64_t _start_idle_timer_us;
bthread_timer_t _idle_timer;
std::once_flag _set_host_socket_flag;
int _set_host_socket_ec;
};

} // namespace brpc
Expand Down
100 changes: 92 additions & 8 deletions test/brpc_streaming_rpc_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

#include <gtest/gtest.h>
#include <atomic>
#include <errno.h>
#include "brpc/server.h"

#include "brpc/controller.h"
#include "brpc/channel.h"
#include "brpc/callback.h"
#include "brpc/socket.h"
#include "brpc/details/controller_private_accessor.h"
#include "brpc/stream_impl.h"
#include "brpc/policy/streaming_rpc_protocol.h"
#include "echo.pb.h"
Expand All @@ -38,12 +40,12 @@ class AfterAcceptStream {

class MyServiceWithStream : public test::EchoService {
public:
MyServiceWithStream(const brpc::StreamOptions& options)
MyServiceWithStream(const brpc::StreamOptions& options)
: _options(options)
, _after_accept_stream(NULL)
{}
MyServiceWithStream(const brpc::StreamOptions& options,
AfterAcceptStream* after_accept_stream)
AfterAcceptStream* after_accept_stream)
: _options(options)
, _after_accept_stream(after_accept_stream)
{}
Expand All @@ -53,9 +55,9 @@ class MyServiceWithStream : public test::EchoService {
{}

void Echo(::google::protobuf::RpcController* controller,
const ::test::EchoRequest* request,
::test::EchoResponse* response,
::google::protobuf::Closure* done) {
const ::test::EchoRequest* request,
::test::EchoResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
response->set_message(request->message());
brpc::Controller* cntl = (brpc::Controller*)controller;
Expand Down Expand Up @@ -125,7 +127,8 @@ class BatchStreamClientHandler : public brpc::StreamInputHandler {

void on_closed(brpc::StreamId /*id*/) override {}

void on_failed(brpc::StreamId /*id*/, int /*error_code*/, const std::string& /*error_text*/) override {}
void on_failed(brpc::StreamId /*id*/, int /*error_code*/,
const std::string& /*error_text*/) override {}

private:
BatchStreamFeedbackRaceState* _state;
Expand Down Expand Up @@ -162,7 +165,8 @@ static void* SendTwoMessagesOnServerExtraStream(void* arg) {
std::string payload(64, 'a');
butil::IOBuf out;
out.append(payload);
state->server_first_write_rc.store(brpc::StreamWrite(sid, out), std::memory_order_relaxed);
state->server_first_write_rc.store(brpc::StreamWrite(sid, out),
std::memory_order_relaxed);
}

// 2) Then send another byte. This write should become writable only after
Expand Down Expand Up @@ -226,12 +230,59 @@ static void SetAtomicTrue(std::atomic<bool>* f) {

static bool WaitForTrue(const std::atomic<bool>& f, int timeout_ms) {
const int64_t deadline_us = butil::gettimeofday_us() + (int64_t)timeout_ms * 1000L;
while (!f.load(std::memory_order_acquire) && butil::gettimeofday_us() < deadline_us) {
while (!f.load(std::memory_order_acquire) &&
butil::gettimeofday_us() < deadline_us) {
usleep(1000);
}
return f.load(std::memory_order_acquire);
}

class MyServiceWithStreamAndFailedSocket : public test::EchoService {
public:
explicit MyServiceWithStreamAndFailedSocket(const brpc::StreamOptions& options)
: _options(options) {}

void Echo(::google::protobuf::RpcController* controller,
const ::test::EchoRequest* request,
::test::EchoResponse* response,
::google::protobuf::Closure* done) override {
brpc::ClosureGuard done_guard(done);
response->set_message(request->message());
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
brpc::StreamId response_stream;
ASSERT_EQ(0, StreamAccept(&response_stream, *cntl, &_options));
brpc::ControllerPrivateAccessor accessor(cntl);
ASSERT_TRUE(accessor.get_sending_socket() != NULL);
accessor.get_sending_socket()->SetFailed();
}

private:
brpc::StreamOptions _options;
};

TEST_F(StreamingRpcTest, set_host_socket_returns_error_when_socket_is_failed) {
brpc::SocketOptions socket_options;
brpc::SocketId host_socket_id;
ASSERT_EQ(0, brpc::Socket::Create(socket_options, &host_socket_id));
brpc::SocketUniquePtr host_socket;
ASSERT_EQ(0, brpc::Socket::Address(host_socket_id, &host_socket));
ASSERT_EQ(0, host_socket->SetFailed());

brpc::StreamId stream_id;
brpc::StreamOptions stream_options;
ASSERT_EQ(0, brpc::Stream::Create(stream_options, NULL, &stream_id, false));
brpc::ScopedStream stream_guard(stream_id);

brpc::SocketUniquePtr stream_socket;
ASSERT_EQ(0, brpc::Socket::Address(stream_id, &stream_socket));
brpc::Stream* stream = static_cast<brpc::Stream*>(stream_socket->conn());

errno = 0;
ASSERT_EQ(-1, stream->SetHostSocket(host_socket.get()));
ASSERT_NE(0, errno);
ASSERT_TRUE(stream->_host_socket == NULL);
}

TEST_F(StreamingRpcTest, sanity) {
brpc::Server server;
MyServiceWithStream service;
Expand Down Expand Up @@ -393,6 +444,39 @@ class OrderedInputHandler : public brpc::StreamInputHandler {
HandlerControl* _cntl;
};

TEST_F(StreamingRpcTest, server_failed_socket_before_response_closes_stream_without_abort) {
OrderedInputHandler handler;
brpc::StreamOptions response_stream_options;
response_stream_options.handler = &handler;
brpc::Server server;
MyServiceWithStreamAndFailedSocket service(response_stream_options);
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(9007, NULL));

brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
brpc::Controller cntl;
brpc::StreamId request_stream;
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, NULL));
brpc::ScopedStream stream_guard(request_stream);

test::EchoService_Stub stub(&channel);
stub.Echo(&cntl, &request, &response, NULL);
ASSERT_TRUE(cntl.Failed());

for (int i = 0; i < 10000 && !handler.stopped(); ++i) {
usleep(100);
}

server.Stop(0);
server.Join();

ASSERT_TRUE(handler.stopped());
ASSERT_TRUE(handler.failed());
ASSERT_EQ(0, handler.idle_times());
ASSERT_EQ(0, handler._expected_next_value);
}

TEST_F(StreamingRpcTest, received_in_order) {
OrderedInputHandler handler;
brpc::StreamOptions opt;
Expand Down
Loading