Skip to content

Commit

Permalink
fix longlink task timeout when other channel longlink disconnects
Browse files Browse the repository at this point in the history
Signed-off-by: alanzyzhang <[email protected]>
  • Loading branch information
zuyuanzhang committed May 6, 2020
1 parent 9948f75 commit 7f5d3c0
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 5 deletions.
4 changes: 3 additions & 1 deletion mars/comm/socket/complexconnect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ SOCKET ComplexConnect::ConnectImpatient(const std::vector<socket_address>& _veca

xgroup2_define(group);
vecsocketfsm[i]->AfterSelect(sel, group);
xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;
xgroup2_if(!group.Empty(), TSF"index:%_, @%_, status:%_,%_", i, this, vecsocketfsm[i]->Status(), vecsocketfsm[i]->CheckStatus()) << group;

if (TcpClientFSM::EEnd == vecsocketfsm[i]->Status()) {
if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
Expand All @@ -603,6 +603,7 @@ SOCKET ComplexConnect::ConnectImpatient(const std::vector<socket_address>& _veca
delete vecsocketfsm[i];
vecsocketfsm[i] = NULL;
lasterror = -1;
xerror2(TSF"socket error, code:%_", errcode_);
continue;
}

Expand All @@ -615,6 +616,7 @@ SOCKET ComplexConnect::ConnectImpatient(const std::vector<socket_address>& _veca
delete vecsocketfsm[i];
vecsocketfsm[i] = NULL;
lasterror = -1;
xerror2(TSF"socket error, code:%_", errcode_);
continue;
}

Expand Down
2 changes: 2 additions & 0 deletions mars/comm/socket/tcpclient_fsm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ void TcpClientFSM::AfterConnectSelect(const SocketSelect& _sel, XLogger& _log) {

error_ = socket_error(sock_);

xinfo2(TSF"socket error:%_, ", error_) >> _log;

if (0 != error_) {
xwarn2(TSF"close connect error:(%_, %_), ", error_, socket_strerror(error_)) >> _log;
end_connecttime_ = gettickcount();
Expand Down
2 changes: 1 addition & 1 deletion mars/stn/src/longlink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ void LongLink::__Run() {
}

uint64_t cur_time = gettickcount();
xinfo_function(TSF"LongLink Rebuild span:%_, net:%_", conn_profile_.disconn_time != 0 ? cur_time - conn_profile_.disconn_time : 0, getNetInfo());
xinfo_function(TSF"LongLink Rebuild span:%_, net:%_, channel name:%_", conn_profile_.disconn_time != 0 ? cur_time - conn_profile_.disconn_time : 0, getNetInfo(), config_.name);

ConnectProfile conn_profile;
conn_profile.start_time = cur_time;
Expand Down
30 changes: 27 additions & 3 deletions mars/stn/src/longlink_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ void LongLinkTaskManager::__RunLoop() {
}

void LongLinkTaskManager::__RunOnTimeout() {
xinfo_function();
std::list<TaskProfile>::iterator first = lst_cmd_.begin();
std::list<TaskProfile>::iterator last = lst_cmd_.end();

Expand Down Expand Up @@ -315,6 +316,7 @@ void LongLinkTaskManager::__RunOnTimeout() {
}

void LongLinkTaskManager::__RunOnStartTask() {
xdebug_function();
std::list<TaskProfile>::iterator first = lst_cmd_.begin();
std::list<TaskProfile>::iterator last = lst_cmd_.end();

Expand Down Expand Up @@ -584,20 +586,26 @@ void LongLinkTaskManager::__BatchErrorRespHandle(const std::string& _name, ErrCm

if (kTaskFailHandleSessionTimeout == _fail_handle || kTaskFailHandleRetryAllTasks == _fail_handle) {
__Disconnect(_name, LongLink::kDecodeErr);
MessageQueue::CancelMessage(asyncreg_.Get(), 0);
if (!__OtherChannelHasRunningTask(_name)) {
MessageQueue::CancelMessage(asyncreg_.Get(), 0);
}
retry_interval_ = 0;
}

if (kTaskFailHandleDefault == _fail_handle) {
if (kEctDns != _err_type && kEctSocket != _err_type) { // not longlink callback
__Disconnect(_name, LongLink::kDecodeErr);
}
MessageQueue::CancelMessage(asyncreg_.Get(), 0);
if (!__OtherChannelHasRunningTask(_name)) {
MessageQueue::CancelMessage(asyncreg_.Get(), 0);
}
}

if (kEctNetMsgXP == _err_type) {
__Disconnect(_name, LongLink::kTaskTimeout);
MessageQueue::CancelMessage(asyncreg_.Get(), 0);
if (!__OtherChannelHasRunningTask(_name)) {
MessageQueue::CancelMessage(asyncreg_.Get(), 0);
}
}
}

Expand Down Expand Up @@ -883,3 +891,19 @@ void LongLinkTaskManager::__DumpLongLinkChannelInfo() {
xinfo2(TSF"longlink channel name:%_, null:%_", item.first, item.second == nullptr);
}
}

/**
 * Reports whether the task list holds a running task that belongs to a
 * channel other than `_name`.
 *
 * The scan is performed while holding meta_mutex_. An entry counts as a match
 * when its task.channel_name differs from `_name` and its running_id is
 * non-zero; the first match is logged and ends the scan.
 *
 * @param _name  channel name to exclude from the search.
 * @return true if at least one matching entry exists in lst_cmd_,
 *         false if the list is empty or no entry matches.
 */
bool LongLinkTaskManager::__OtherChannelHasRunningTask(const std::string& _name) {
    ScopedLock lock(meta_mutex_);
    if (lst_cmd_.empty()) {
        xinfo2(TSF"there is no task in task list");
        return false;
    }
    // Iterate by const reference: the loop only reads fields, so copying each
    // list element per iteration (as `auto item` did) is pure overhead while
    // the lock is held.
    for (const auto& item : lst_cmd_) {
        if (item.task.channel_name != _name && item.running_id) {
            xinfo2(TSF"find running task, id:%_, channel name:%_", item.task.taskid, item.task.channel_name);
            return true;
        }
    }
    return false;
}

1 change: 1 addition & 0 deletions mars/stn/src/longlink_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class LongLinkTaskManager {
void __Disconnect(const std::string& _name, LongLink::TDisconnectInternalCode code);
void __RedoTasks(const std::string& _name);
void __DumpLongLinkChannelInfo();
bool __OtherChannelHasRunningTask(const std::string& _name);

private:
MessageQueue::ScopeRegister asyncreg_;
Expand Down

0 comments on commit 7f5d3c0

Please sign in to comment.