Skip to content

Commit

Permalink
Patch svn r35199
Browse files Browse the repository at this point in the history
  • Loading branch information
gejun committed Sep 1, 2017
1 parent 1c83875 commit 02ed49d
Show file tree
Hide file tree
Showing 22 changed files with 79 additions and 64 deletions.
2 changes: 1 addition & 1 deletion example/selective_echo_c++/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with reque
DEFINE_int32(request_size, 16, "Bytes of each request");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in protocol/brpc/options.proto");
DEFINE_string(starting_server, "0.0.0.0:8014", "IP Address of the first server, port of i-th server is `first-port + i'");
DEFINE_string(starting_server, "0.0.0.0:8114", "IP Address of the first server, port of i-th server is `first-port + i'");
DEFINE_string(load_balancer, "rr", "Name of load balancer");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(backup_ms, -1, "backup timeout in milliseconds");
Expand Down
2 changes: 1 addition & 1 deletion example/selective_echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "echo.pb.h"

DEFINE_bool(echo_attachment, true, "Echo attachment as well");
DEFINE_int32(port, 8014, "TCP Port of this server");
DEFINE_int32(port, 8114, "TCP Port of this server");
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state "
Expand Down
53 changes: 34 additions & 19 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,

END_OF_RPC:
if (new_bthread) {
// [Essential for -usercode_in_pthread=true, nice to have otherwise]
// [ Essential for -usercode_in_pthread=true ]
// When -usercode_in_pthread is on, the reserved threads (set by
// -usercode_backup_threads) may all block on bthread_id_lock in
// ProcessXXXResponse(), until the id is unlocked or destroyed which
Expand All @@ -591,8 +591,24 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
// Make the id unlockable before creating the bthread fixes the issue.
// When -usercode_in_pthread is false, this also removes some useless
// waiting of the bthreads processing responses.
bthread_id_about_to_destroy(info.id);

// Note[_done]: callid is destroyed after _done which possibly takes
// a lot of time, stop useless locking

// Note[cid]: When the callid needs to be destroyed in done->Run(),
// it does not mean that it will be destroyed directly in done->Run(),
// conversely the callid may still be locked/unlocked for many times
// before destroying. E.g. in slective channel, the callid is referenced
// by multiple sub-done and only destroyed by the last one. Calling
// bthread_id_about_to_destroy right here which makes the id unlockable
// anymore, is wrong. On the other hand, the combo channles setting
// FLAGS_DESTROY_CID_IN_DONE to true must be aware of
// -usercode_in_pthread and avoid deadlock by their own (TBR)

if ((FLAGS_usercode_in_pthread || _done != NULL/*Note[_done]*/) &&
!has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) {
bthread_id_about_to_destroy(info.id);
}
// No need to join this bthread since RPC caller won't wake up
// (or user's done won't be called) until this bthread finishes
bthread_t bt;
Expand All @@ -605,9 +621,8 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
LOG(FATAL) << "Fail to start bthread";
EndRPC(info);
} else {
if (_done != NULL) {
// [Optional] _done possibly takes a lot of time, stop
// useless locking.
if (_done != NULL/*Note[_done]*/ &&
!has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) {
bthread_id_about_to_destroy(info.id);
}
EndRPC(info);
Expand Down Expand Up @@ -809,14 +824,14 @@ void Controller::EndRPC(const CompletionInfo& info) {
// Join is not signalled when the done does not Run() and the done
// can't Run() because all backup threads are blocked by Join().

// Call OnRPCEnd for async RPC. The one for sync RPC is called in
// Channel::CallMethod to count in latency of the context-switch.
OnRPCEnd(base::gettimeofday_us());
const bool destroy_cid_in_done = has_flag(FLAGS_DESTROY_CID_IN_DONE);
_done->Run();
// NOTE: Don't touch this Controller anymore, because it's likely to be
// deleted by done.
if (!destroy_cid_in_done) {
// Make this thread not scheduling itself when launching new
// bthreads, saving signalings.
// FIXME: We're assuming the calling thread is about to quit.
bthread_about_to_quit();
CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid));
Expand All @@ -825,8 +840,10 @@ void Controller::EndRPC(const CompletionInfo& info) {
RunUserCode(RunDoneInBackupThread, this);
}
} else {
// OnRPCEnd() of sync RPC is called in the caller's thread.
// FIXME: We're assuming the calling thread is about to quit.
// OnRPCEnd for sync RPC is called in Channel::CallMethod to count in
// latency of the context-switch.

// Check comments in above branch on bthread_about_to_quit.
bthread_about_to_quit();
add_flag(FLAGS_DESTROYED_CID);
CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid));
Expand All @@ -837,14 +854,13 @@ void Controller::RunDoneInBackupThread(void* arg) {
}

void Controller::DoneInBackupThread() {
// Call OnRPCEnd for async RPC. The one for sync RPC is called in
// Channel::CallMethod to count in latency of the context-switch.
// OnRPCEnd for sync RPC is called in Channel::CallMethod to count in
// latency of the context-switch.
OnRPCEnd(base::gettimeofday_us());
const CallId saved_cid = _correlation_id;
const bool destroy_cid_in_done = has_flag(FLAGS_DESTROY_CID_IN_DONE);
_done->Run();
// NOTE: Don't touch this Controller anymore, because it's likely to be
// deleted by done.
// NOTE: Don't touch fields of controller anymore, it may be deleted.
if (!destroy_cid_in_done) {
CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid));
}
Expand All @@ -862,12 +878,11 @@ void Controller::SubmitSpan() {

void Controller::HandleSendFailed() {
// NOTE: Must launch new thread to run the callback in an asynchronous
// call. Users are likely to hold a lock before async CallMethod returns
// and grab the same lock inside the callback. If we call the callback
// on top of the same stack of CallMethod, we're deadlocked.
// Sync call is opposite: We cannot run the callback with new thread
// in a sync call otherwise we can't leave CallMethod before the
// callback is done.
// call. Users may hold a lock before asynchronus CallMethod returns and
// grab the same lock inside done->Run(). If we call done->Run() in the
// same stack of CallMethod, we're deadlocked.
// We don't need to run the callback with new thread in a sync call since
// the created thread needs to be joined anyway before CallMethod ends.
if (!FailedInline()) {
SetFailed("Must be SetFailed() before calling HandleSendFailed()");
LOG(FATAL) << ErrorText();
Expand Down
3 changes: 2 additions & 1 deletion src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ friend void policy::ProcessMongoRequest(InputMessageBase*);

// Append server information to `_error_text'
void AppendServerIdentiy();


// Used by ParallelChannel
static const int8_t CALLMETHOD_CANNOT_RUN_DONE = 0;
static const int8_t CALLMETHOD_CAN_RUN_DONE = 1;
static const int8_t CALLMETHOD_DID_RUN_DONE = 2;
Expand Down
18 changes: 9 additions & 9 deletions src/brpc/memcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,8 @@ bool MemcacheResponse::PopGet(
return false;
}
if (header.status != (uint16_t)STATUS_SUCCESS) {
LOG_IF(FATAL, header.extras_length != 0) << "GET response must not have flags";
LOG_IF(FATAL, header.key_length != 0) << "GET response must not have key";
LOG_IF(ERROR, header.extras_length != 0) << "GET response must not have flags";
LOG_IF(ERROR, header.key_length != 0) << "GET response must not have key";
const int value_size = (int)header.total_body_length - (int)header.extras_length
- (int)header.key_length;
if (value_size < 0) {
Expand Down Expand Up @@ -690,8 +690,8 @@ bool MemcacheResponse::PopStore(uint8_t command, uint64_t* cas_value) {
base::string_printf(&_err, "Not enough data");
return false;
}
LOG_IF(FATAL, header.extras_length != 0) << "STORE response must not have flags";
LOG_IF(FATAL, header.key_length != 0) << "STORE response must not have key";
LOG_IF(ERROR, header.extras_length != 0) << "STORE response must not have flags";
LOG_IF(ERROR, header.key_length != 0) << "STORE response must not have key";
int value_size = (int)header.total_body_length - (int)header.extras_length
- (int)header.key_length;
if (header.status != (uint16_t)STATUS_SUCCESS) {
Expand All @@ -700,7 +700,7 @@ bool MemcacheResponse::PopStore(uint8_t command, uint64_t* cas_value) {
_buf.cutn(&_err, value_size);
return false;
}
LOG_IF(FATAL, value_size != 0) << "STORE response must not have value, actually="
LOG_IF(ERROR, value_size != 0) << "STORE response must not have value, actually="
<< value_size;
_buf.pop_front(sizeof(header) + header.total_body_length);
if (cas_value) {
Expand Down Expand Up @@ -855,8 +855,8 @@ bool MemcacheResponse::PopCounter(
(unsigned)n, (unsigned)sizeof(header), header.total_body_length);
return false;
}
LOG_IF(FATAL, header.extras_length != 0) << "INCR/DECR response must not have flags";
LOG_IF(FATAL, header.key_length != 0) << "INCR/DECR response must not have key";
LOG_IF(ERROR, header.extras_length != 0) << "INCR/DECR response must not have flags";
LOG_IF(ERROR, header.key_length != 0) << "INCR/DECR response must not have key";
const int value_size = (int)header.total_body_length - (int)header.extras_length
- (int)header.key_length;
_buf.pop_front(sizeof(header) + header.extras_length + header.key_length);
Expand Down Expand Up @@ -975,8 +975,8 @@ bool MemcacheResponse::PopVersion(std::string* version) {
(unsigned)n, (unsigned)sizeof(header), header.total_body_length);
return false;
}
LOG_IF(FATAL, header.extras_length != 0) << "VERSION response must not have flags";
LOG_IF(FATAL, header.key_length != 0) << "VERSION response must not have key";
LOG_IF(ERROR, header.extras_length != 0) << "VERSION response must not have flags";
LOG_IF(ERROR, header.key_length != 0) << "VERSION response must not have key";
const int value_size = (int)header.total_body_length - (int)header.extras_length
- (int)header.key_length;
_buf.pop_front(sizeof(header) + header.extras_length + header.key_length);
Expand Down
5 changes: 2 additions & 3 deletions src/brpc/parallel_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ class ParallelChannelDone : public google::protobuf::Closure {
for (int i = 0; i < ndone; ++i) {
new (d->sub_done(i)) SubDone;
d->sub_done(i)->cntl.ApplyClientSettings(settings);
d->sub_done(i)->cntl._run_done_state =
Controller::CALLMETHOD_CAN_RUN_DONE;
d->sub_done(i)->cntl._run_done_state = Controller::CALLMETHOD_CAN_RUN_DONE;
}
// Setup the map for finding sub_done of i-th sub_channel
if (ndone != nchan) {
Expand Down Expand Up @@ -271,7 +270,7 @@ class ParallelChannelDone : public google::protobuf::Closure {
// [ Rendezvous point ]
// One and only one thread arrives here.
// all call_id of sub calls are destroyed and call_id of _cntl is
// still locked (because of _destroy_cid_in_done = true);
// still locked (because FLAGS_DESTROY_CID_IN_DONE is true);

// Merge responses of successful calls if fail_limit is not reached.
// nfailed may be increased during the merging.
Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,8 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ": " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
if (meta.has_stream_settings()) {
SendStreamRst(msg->socket(), meta.stream_settings().stream_id());
}
Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/esp_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ void ProcessEspResponse(InputMessageBase* msg_base) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ", " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ", " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ void ProcessHttpResponse(InputMessageBase* msg) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ": " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/hulu_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,8 @@ void ProcessHuluResponse(InputMessageBase* msg_base) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ": " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/memcache_binary_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ void ProcessMemcacheResponse(InputMessageBase* msg_base) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ": " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/nova_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ void ProcessNovaResponse(InputMessageBase* msg_base) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ": " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/nshead_mcpack_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ void ProcessNsheadMcpackResponse(InputMessageBase* msg_base) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ": " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/nshead_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ void ProcessNsheadResponse(InputMessageBase* msg_base) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ", " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/public_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ void ProcessPublicPbrpcResponse(InputMessageBase* msg_base) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ": " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/redis_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ void ProcessRedisResponse(InputMessageBase* msg_base) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ": " << berror(rc);
LOG_IF(FATAL, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/rtmp_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3463,8 +3463,8 @@ void OnServerStreamCreated::Run(bool error,
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ": " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/sofa_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,8 @@ void ProcessSofaResponse(InputMessageBase* msg_base) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ": " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/ubrpc2pb_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ void ProcessUbrpcResponse(InputMessageBase* msg_base) {
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(FATAL, rc != EINVAL) << "Fail to lock correlation_id="
<< cid.value << ": " << berror(rc);
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/brpc/rtmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,7 @@ int RtmpStreamBase::SendAACMessage(const RtmpAACMessage& msg) {
return _rtmpsock->Write(msg2);
}

int RtmpStreamBase::SendUserMessage(void* msg) {
int RtmpStreamBase::SendUserMessage(void*) {
CHECK(false) << "You should implement your own SendUserMessage";
return 0;
}
Expand Down Expand Up @@ -1429,7 +1429,7 @@ void RtmpStreamBase::SignalError() {

void RtmpStreamBase::OnFirstMessage() {}

void RtmpStreamBase::OnUserData(void* data) {
void RtmpStreamBase::OnUserData(void*) {
LOG(INFO) << remote_side() << '[' << stream_id()
<< "] ignored UserData{}";
}
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ void SubDone::Run() {
if (rc != 0) {
// _cid must be valid because schan does not dtor before cancelling
// all sub calls.
LOG(FATAL) << "Fail to lock correlation_id="
LOG(ERROR) << "Fail to lock correlation_id="
<< _cid.value << ": " << berror(rc);
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ int Socket::ResetFileDescriptor(int fd) {

// Make the fd non-blocking.
if (base::make_non_blocking(fd) != 0) {
PLOG(FATAL) << "Fail to set fd=" << fd << " to non-blocking";
PLOG(ERROR) << "Fail to set fd=" << fd << " to non-blocking";
return -1;
}
// turn off nagling.
Expand Down

0 comments on commit 02ed49d

Please sign in to comment.