Skip to content

Commit

Permalink
Sserver: Add a timing check mechanism for chain switching.
Browse files Browse the repository at this point in the history
  • Loading branch information
SwimmingTiger committed Sep 11, 2019
1 parent ed06e5f commit fad3da1
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 34 deletions.
170 changes: 136 additions & 34 deletions src/UserInfo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ UserInfo::UserInfo(StratumServer *server, const libconfig::Config &config)
config.lookupValue(
"users.auto_reg_max_pending_users", autoRegMaxPendingUsers_);
config.lookupValue("users.zookeeper_auto_reg_watch_dir", zkAutoRegWatchDir_);
config.lookupValue(
"users.namechains_check_interval", nameChainsCheckIntervalSeconds_);

LOG(INFO) << "UserInfo: user name will be case "
<< (caseInsensitive_ ? "insensitive" : "sensitive");
Expand Down Expand Up @@ -119,6 +121,10 @@ UserInfo::~UserInfo() {
if (chain.threadUpdate_.joinable())
chain.threadUpdate_.join();
}

if (nameChainsCheckingThread_.joinable()) {
nameChainsCheckingThread_.join();
}
}

void UserInfo::stop() {
Expand Down Expand Up @@ -158,10 +164,19 @@ bool UserInfo::getChainIdFromZookeeper(
DLOG(INFO) << "zk userchain map: " << userName << " : " << chainName;
for (chainId = 0; chainId < server_->chains_.size(); chainId++) {
if (chainName == server_->chains_[chainId].name_) {
// add to cache
std::unique_lock<std::shared_timed_mutex> l{nameChainlock_};
nameChains_[userName] = chainId;
return true;
ChainVars &chain = chains_[chainId];
std::shared_lock<std::shared_timed_mutex> l{*chain.nameIdlock_};
auto itr = chain.nameIds_.find(userName);
if (itr != chain.nameIds_.end()) {
// add to cache
std::unique_lock<std::shared_timed_mutex> l{nameChainlock_};
nameChains_[userName] = chainId;
return true;
} else {
LOG(ERROR) << "Userlist for chain " << server_->chainName(chainId)
<< " missing user " << userName;
return false;
}
}
}
// cannot find the chain, warning and ignore it
Expand All @@ -184,6 +199,9 @@ bool UserInfo::getChainIdFromZookeeper(

void UserInfo::setZkReconnectHandle() {
zk_->registerReconnectHandle([this]() {
LOG(WARNING)
<< "zookeeper reconnected, trigger SwitchChainEvent for all users";

// Chain switching while holding a lock can result in a deadlock.
// So release the lock immediately after copying.
nameChainlock_.lock_shared();
Expand All @@ -192,8 +210,7 @@ void UserInfo::setZkReconnectHandle() {

// Check the current chain of all users
for (auto item : nameChains) {
string path = zkUserChainMapDir_ + item.first;
handleSwitchChainEvent(nullptr, 0, 0, path.c_str(), this);
handleSwitchChainEvent(item.first);
}
});
}
Expand All @@ -217,59 +234,61 @@ void UserInfo::handleSwitchChainEvent(
}

string userName = nodePath.substr(userInfo->zkUserChainMapDir_.size());
userInfo->handleSwitchChainEvent(userName);
}

void UserInfo::handleSwitchChainEvent(const string &userName) {

// lookup cache
std::shared_lock<std::shared_timed_mutex> l{userInfo->nameChainlock_};
auto itr = userInfo->nameChains_.find(userName);
if (itr == userInfo->nameChains_.end()) {
std::shared_lock<std::shared_timed_mutex> l{nameChainlock_};
auto itr = nameChains_.find(userName);
if (itr == nameChains_.end()) {
LOG(INFO) << "No workers of user " << userName
<< " online, switching request will be ignored";
return;
}
size_t currentChainId = itr->second;
l.unlock();

size_t newChainId;
if (!userInfo->getChainIdFromZookeeper(userName, newChainId)) {
size_t newChainId = 0;
if (!getChainIdFromZookeeper(userName, newChainId)) {
LOG(ERROR) << "UserInfo::handleZookeeperEvent(): cannot get chain id from "
"zookeeper, switching request will be ignored";
return;
}
if (currentChainId == newChainId) {
LOG(INFO) << "Ignore empty switching request for user '" << userName
<< "': " << userInfo->server_->chainName(currentChainId) << " -> "
<< userInfo->server_->chainName(newChainId);
<< "': " << server_->chainName(currentChainId) << " -> "
<< server_->chainName(newChainId);
return;
}

const int32_t newUserId = userInfo->getUserId(newChainId, userName);
const int32_t newUserId = getUserId(newChainId, userName);
if (newUserId <= 0) {
LOG(INFO) << "Ignore switching request: cannot find user id, chainId: "
<< newChainId << ", userName: " << userName;
return;
}

userInfo->server_->dispatch(
[userInfo, userName, currentChainId, newChainId]() {
size_t onlineSessions =
userInfo->server_->switchChain(userName, newChainId);
server_->dispatch([this, userName, currentChainId, newChainId]() {
size_t onlineSessions = server_->switchChain(userName, newChainId);

if (onlineSessions == 0) {
LOG(INFO) << "No workers of user " << userName
<< " online, subsequent switching request will be ignored";
// clear cache
std::unique_lock<std::shared_timed_mutex> l{userInfo->nameChainlock_};
auto itr = userInfo->nameChains_.find(userName);
if (itr != userInfo->nameChains_.end()) {
userInfo->nameChains_.erase(itr);
}
}
if (onlineSessions == 0) {
LOG(INFO) << "No workers of user " << userName
<< " online, subsequent switching request will be ignored";
// clear cache
std::unique_lock<std::shared_timed_mutex> l{nameChainlock_};
auto itr = nameChains_.find(userName);
if (itr != nameChains_.end()) {
nameChains_.erase(itr);
}
}

LOG(INFO) << "User '" << userName << "' (" << onlineSessions
<< " miners) switched chain: "
<< userInfo->server_->chainName(currentChainId) << " -> "
<< userInfo->server_->chainName(newChainId);
});
LOG(INFO) << "User '" << userName << "' (" << onlineSessions
<< " miners) switched chain: "
<< server_->chainName(currentChainId) << " -> "
<< server_->chainName(newChainId);
});
}

bool UserInfo::getChainId(const string &userName, size_t &chainId) {
Expand Down Expand Up @@ -297,12 +316,14 @@ bool UserInfo::getChainId(const string &userName, size_t &chainId) {
// The first chain's id that find the user will be returned.
for (chainId = 0; chainId < chains_.size(); chainId++) {
ChainVars &chain = chains_[chainId];

std::shared_lock<std::shared_timed_mutex> l{*chain.nameIdlock_};
auto itr = chain.nameIds_.find(userName);
if (itr != chain.nameIds_.end()) {
// chainId has been assigned to the correct value
DLOG(INFO) << "userName: " << userName << ", chainId: " << chainId;
// add to cache
std::unique_lock<std::shared_timed_mutex> l{nameChainlock_};
nameChains_[userName] = chainId;
return true;
}
}
Expand Down Expand Up @@ -506,9 +527,90 @@ bool UserInfo::setupThreads() {
std::thread(&UserInfo::runThreadUpdate, this, chainId);
}

if (chains_.size() > 1) {
nameChainsCheckingThread_ =
std::thread(std::bind(&UserInfo::checkNameChains, this));
}

return true;
}

bool /*isInterrupted*/ UserInfo::interruptibleSleep(time_t seconds) {
const time_t sleepEnd = time(nullptr) + seconds;
while (time(nullptr) < sleepEnd) {
if (!running_) {
return true;
}
std::this_thread::sleep_for(1s);
}
return false;
}

void UserInfo::checkNameChains() {
LOG(INFO) << "UserInfo::checkNameChains running...";

if (interruptibleSleep(nameChainsCheckIntervalSeconds_))
return;

while (running_) {
// Chain switching while holding a lock can result in a deadlock.
// So release the lock immediately after copying.
nameChainlock_.lock_shared();
std::unordered_map<string, size_t> nameChains = nameChains_;
nameChainlock_.unlock_shared();

if (nameChains.empty()) {
if (interruptibleSleep(nameChainsCheckIntervalSeconds_))
return;
continue;
}

time_t eachUserSleepMillisecond =
nameChainsCheckIntervalSeconds_ * 1000 / nameChains.size();
if (eachUserSleepMillisecond == 0)
eachUserSleepMillisecond = 1;
LOG(INFO) << "UserInfo::checkNameChains checking, each user sleep "
<< eachUserSleepMillisecond << "ms";

for (auto itr : nameChains) {
if (!running_) {
return;
}

try {
const string &userName = itr.first;
const size_t chainId = itr.second;
const string &chainName = server_->chains_[chainId].name_;

string nodePath = zkUserChainMapDir_ + userName;
string newChainName = zk_->getValue(nodePath, 64);

if (chainName == newChainName) {
DLOG(INFO) << "User does not switch chains, user: " << userName
<< ", chain: " << chainName;
} else {
LOG(INFO) << "User switched the chain, user: " << userName
<< ", chains: " << chainName << " -> " << newChainName;
handleSwitchChainEvent(userName);
}

} catch (const std::exception &ex) {
LOG(ERROR) << "UserInfo::checkNameChains(): zk_->getValue() failed: "
<< ex.what();
} catch (...) {
LOG(ERROR) << "UserInfo::checkNameChains(): unknown exception";
}

if (eachUserSleepMillisecond > 5000) {
if (interruptibleSleep(eachUserSleepMillisecond / 1000))
return;
} else {
std::this_thread::sleep_for(eachUserSleepMillisecond * 1ms);
}
}
}
}

void UserInfo::handleAutoRegEvent(
zhandle_t *zh, int type, int state, const char *path, void *pUserInfo) {
if (path == nullptr || pUserInfo == nullptr) {
Expand Down
6 changes: 6 additions & 0 deletions src/UserInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,17 @@ class UserInfo {
// username -> chainId
std::unordered_map<string, size_t> nameChains_;

int nameChainsCheckIntervalSeconds_ = 300;
std::thread nameChainsCheckingThread_;

void runThreadUpdate(size_t chainId);
int32_t incrementalUpdateUsers(size_t chainId);
void checkNameChains();
bool /*isInterrupted*/ interruptibleSleep(time_t seconds);

bool getChainIdFromZookeeper(const string &userName, size_t &chainId);
void setZkReconnectHandle();
void handleSwitchChainEvent(const string &userName);
static void handleSwitchChainEvent(
zhandle_t *zh, int type, int state, const char *path, void *pUserInfo);
static void handleAutoRegEvent(
Expand Down
5 changes: 5 additions & 0 deletions src/bitcoin/cfg/sserver(multi-chains).cfg
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,9 @@ users = {
# Example, a worker "user2.11x20" will become "user1.user2.11x20".
#single_user_mode = false;
#single_user_name = "user1";

# The interval that checking if a user's chain be switched
# The timing check is just a backup measure to ensure successful chain switching.
# Real-time zookeeper event notification is the main measure.
namechains_check_interval = 300;
};

0 comments on commit fad3da1

Please sign in to comment.