Skip to content

Commit

Permalink
Receiver now sends periodic heart-beats to keep the sender waiting
Browse files Browse the repository at this point in the history
Summary: WdtOptions.h has details

Reviewed By: nedelchev

Differential Revision: D4660004

fbshipit-source-id: e374e22235e1866a57d02d3944f63842bd05bfae
  • Loading branch information
uddipta authored and facebook-github-bot committed Mar 8, 2017
1 parent 84fd93c commit 62be7c8
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 7 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2)
# There is no C per se in WDT but if you use CXX only here many checks fail
# Version is Major.Minor.YYMMDDX for up to 10 releases per day (X from 0 to 9)
# Minor currently is also the protocol version - has to match with Protocol.cpp
project("WDT" LANGUAGES C CXX VERSION 1.28.1702210)
project("WDT" LANGUAGES C CXX VERSION 1.29.1703040)

# On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2)
# WDT itself works fine with C++11 (gcc 4.8 for instance) but more recent folly
Expand Down
7 changes: 5 additions & 2 deletions Protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
#include <wdt/WdtOptions.h>
#include <wdt/util/SerializationUtil.h>

#include <algorithm>

namespace facebook {
namespace wdt {

Expand All @@ -34,6 +32,7 @@ const int Protocol::ENCRYPTION_V1_VERSION = 23;
const int Protocol::INCREMENTAL_TAG_VERIFICATION_VERSION = 25;
const int Protocol::DELETE_CMD_VERSION = 26;
const int Protocol::VARINT_CHANGE = 27;
const int Protocol::HEART_BEAT_VERSION = 29;

/* All methods of Protocol class are static (functions) */

Expand Down Expand Up @@ -472,6 +471,9 @@ bool Protocol::encodeSettings(int senderProtocolVersion, char *dest,
if (settings.blockModeDisabled) {
flags |= (1 << 2);
}
if (settings.enableHeartBeat) {
flags |= (1 << 3);
}
if (off >= max) {
return false;
}
Expand Down Expand Up @@ -513,6 +515,7 @@ bool Protocol::decodeSettings(int protocolVersion, char *src, int64_t &off,
settings.enableChecksum = flags & 1;
settings.sendFileChunks = flags & (1 << 1);
settings.blockModeDisabled = flags & (1 << 2);
settings.enableHeartBeat = flags & (1 << 3);
br.pop_front();
}
off += offset(br, obr);
Expand Down
5 changes: 5 additions & 0 deletions Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ struct Settings {
bool sendFileChunks{0};
/// whether block mode is disabled
bool blockModeDisabled{false};
/// whether heart-beat is enabled
bool enableHeartBeat{false};
};

class Protocol {
Expand Down Expand Up @@ -266,6 +268,8 @@ class Protocol {
static const int DELETE_CMD_VERSION;
/// version from which we switched varint to better one
static const int VARINT_CHANGE;
/// version from which heart-beat was introduced
static const int HEART_BEAT_VERSION;

/// Both version, magic number and command byte
enum CMD_MAGIC {
Expand All @@ -286,6 +290,7 @@ class Protocol {
// number of checkpoints for local checkpoint is 1, we can treat
// 0x01 to be a separate cmd
ENCRYPTION_CMD = 0x65, // (e)ncryption
HEART_BEAT_CMD = 0x48, // (H)eart-beat
};

// TODO: move the rest of those definitions closer to where they need to be
Expand Down
37 changes: 37 additions & 0 deletions ReceiverThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ ReceiverState ReceiverThread::processSettingsCmd() {
senderReadTimeout_ = settings.readTimeoutMillis;
senderWriteTimeout_ = settings.writeTimeoutMillis;
isBlockMode_ = !settings.blockModeDisabled;
enableHeartBeat_ = settings.enableHeartBeat;
if (!enableHeartBeat_) {
WTLOG(INFO) << "Disabling heart-beat as sender does not support it";
}
curConnectionVerified_ = true;

// determine footer type
Expand All @@ -359,6 +363,25 @@ ReceiverState ReceiverThread::processSettingsCmd() {
return READ_NEXT_CMD;
}

/// Sends a single heart-beat command byte over the socket when heart-beats are
/// enabled and more than (senderReadTimeout_ / kWaitTimeoutFactor) milliseconds
/// have passed since the last heart-beat time-stamp. Does nothing otherwise.
/// A short or failed write is only logged; no error is propagated.
void ReceiverThread::sendHeartBeat() {
  if (enableHeartBeat_) {
    const auto currentTime = Clock::now();
    const int elapsedMs = durationMillis(currentTime - lastHeartBeatTime_);
    const int intervalMs = (senderReadTimeout_ / kWaitTimeoutFactor);
    if (elapsedMs > intervalMs) {
      // the time-stamp is refreshed before the write, so a failed write is not
      // retried until the next interval has elapsed
      lastHeartBeatTime_ = currentTime;
      char heartBeatCmd = Protocol::HEART_BEAT_CMD;
      const int numWritten = socket_->write(&heartBeatCmd, 1);
      if (numWritten != 1) {
        WTLOG(WARNING) << "Failed to send heart-beat " << numWritten;
      }
    }
  }
}

/***PROCESS_FILE_CMD***/
ReceiverState ReceiverThread::processFileCmd() {
WTVLOG(1) << "entered PROCESS_FILE_CMD state";
Expand Down Expand Up @@ -393,6 +416,8 @@ ReceiverState ReceiverThread::processFileCmd() {
headerLen = folly::Endian::little(headerLen);
WVLOG(2) << "Processing FILE_CMD, header len " << headerLen;

sendHeartBeat();

if (headerLen > numRead_) {
int64_t end = oldOffset_ + numRead_;
numRead_ =
Expand Down Expand Up @@ -451,6 +476,8 @@ ReceiverState ReceiverThread::processFileCmd() {
}
});

sendHeartBeat();

// writer.open() deletes files if status == TO_BE_DELETED
// therefore if !(!delete_extra_files && status == TO_BE_DELETED)
// we should skip writer.open() call altogether
Expand Down Expand Up @@ -480,6 +507,9 @@ ReceiverState ReceiverThread::processFileCmd() {
// on the network
throttler->limit(*threadCtx_, toWrite + headerBytes);
}

sendHeartBeat();

ErrorCode code = ERROR;
if (toWrite > 0) {
code = writer.write(buf_ + off_, toWrite);
Expand All @@ -499,6 +529,9 @@ ReceiverState ReceiverThread::processFileCmd() {
threadStats_.setLocalErrorCode(ABORT);
return FINISH_WITH_ERROR;
}

sendHeartBeat();

int64_t nres = readAtMost(*socket_, buf_, bufSize_,
blockDetails.dataSize - writer.getTotalWritten());
if (nres <= 0) {
Expand All @@ -513,6 +546,9 @@ ReceiverState ReceiverThread::processFileCmd() {
if (footerType_ == CHECKSUM_FOOTER) {
checksum = folly::crc32c((const uint8_t *)buf_, nres, checksum);
}

sendHeartBeat();

code = writer.write(buf_, nres);
if (code != OK) {
threadStats_.setLocalErrorCode(code);
Expand Down Expand Up @@ -552,6 +588,7 @@ ReceiverState ReceiverThread::processFileCmd() {
numRead_ = off_ = 0;
}
if (footerType_ == CHECKSUM_FOOTER) {
sendHeartBeat();
// have to read footer cmd
oldOffset_ = off_;
numRead_ = readAtLeast(*socket_, buf_ + off_, bufSize_ - off_,
Expand Down
4 changes: 4 additions & 0 deletions ReceiverThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ class ReceiverThread : public WdtThread {
/// verifies received blocks which are not already verified
void markReceivedBlocksVerified();

/// checks whether heart-beat is enabled, and whether it is time to send
/// another heart-beat, and if yes, sends a heart-beat
void sendHeartBeat();

/// Mapping from receiver states to state functions
static const StateFunction stateMap_[];

Expand Down
58 changes: 57 additions & 1 deletion SenderThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <folly/Checksum.h>
#include <folly/Conv.h>
#include <folly/Memory.h>
#include <folly/ScopeGuard.h>
#include <folly/String.h>
#include <sys/stat.h>
#include <wdt/Sender.h>
Expand Down Expand Up @@ -195,13 +194,25 @@ SenderState SenderThread::sendSettings() {
int64_t off = 0;
buf_[off++] = Protocol::SETTINGS_CMD;
bool sendFileChunks = wdtParent_->isSendFileChunks();
enableHeartBeat_ = false;
if (options_.enable_heart_beat) {
if (threadProtocolVersion_ < Protocol::HEART_BEAT_VERSION) {
WTLOG(INFO) << "Disabling heart beat because of the receiver version is "
<< threadProtocolVersion_;
} else {
enableHeartBeat_ = true;
}
}
enableHeartBeat_ = (threadProtocolVersion_ >= Protocol::HEART_BEAT_VERSION &&
options_.enable_heart_beat);
Settings settings;
settings.readTimeoutMillis = readTimeoutMillis;
settings.writeTimeoutMillis = writeTimeoutMillis;
settings.transferId = wdtParent_->getTransferId();
settings.enableChecksum = (footerType_ == CHECKSUM_FOOTER);
settings.sendFileChunks = sendFileChunks;
settings.blockModeDisabled = (options_.block_size_mbytes <= 0);
settings.enableHeartBeat = enableHeartBeat_;
Protocol::encodeSettings(threadProtocolVersion_, buf_, off,
Protocol::kMaxSettings, settings);
int64_t toWrite = sendFileChunks ? Protocol::kMinBufLength : off;
Expand All @@ -215,6 +226,41 @@ SenderState SenderThread::sendSettings() {
return (sendFileChunks ? READ_FILE_CHUNKS : SEND_BLOCKS);
}

/// Multiplier applied to the sender read timeout to obtain the minimum gap
/// (in milliseconds) between two attempts at reading heart-beats
const int kHeartBeatReadTimeFactor = 10;

/// Reads heart-beat bytes from the socket, if heart-beats are enabled and more
/// than (read_timeout_millis * kHeartBeatReadTimeFactor) milliseconds have
/// passed since the last heart-beat time-stamp.
///
/// @return OK when heart-beats are disabled, when it is not yet time to read,
///         or when every byte read is a heart-beat cmd;
///         SOCKET_READ_ERROR when the read returns <= 0;
///         PROTOCOL_ERROR when any byte read is not a heart-beat cmd
ErrorCode SenderThread::readHeartBeats() {
  if (!enableHeartBeat_) {
    return OK;
  }
  const auto currentTime = Clock::now();
  const int elapsedMs = durationMillis(currentTime - lastHeartBeatTime_);
  const int minGapMs =
      (options_.read_timeout_millis * kHeartBeatReadTimeFactor);
  if (elapsedMs <= minGapMs) {
    return OK;
  }
  // the time-stamp is refreshed before the read, regardless of its outcome
  lastHeartBeatTime_ = currentTime;
  // a partial read is acceptable here, so do not ask for the full buffer
  const int bytesRead = socket_->read(buf_, bufSize_, false);
  if (bytesRead <= 0) {
    WTLOG(ERROR) << "Failed to read heart-beat " << bytesRead;
    return SOCKET_READ_ERROR;
  }
  // every byte received must be a heart-beat cmd
  int idx = 0;
  while (idx < bytesRead) {
    const char receivedCmd = buf_[idx];
    if (receivedCmd != Protocol::HEART_BEAT_CMD) {
      WTLOG(ERROR) << "Received " << receivedCmd
                   << " instead of heart-beat cmd";
      return PROTOCOL_ERROR;
    }
    ++idx;
  }
  // the summary is logged only when stderr is not a tty
  if (!isTty_) {
    WTLOG(INFO) << "Received " << bytesRead << " heart-beats";
  }
  return OK;
}

SenderState SenderThread::sendBlocks() {
WTVLOG(1) << "entered SEND_BLOCKS state";
ThreadTransferHistory &transferHistory = getTransferHistory();
Expand All @@ -226,6 +272,9 @@ SenderState SenderThread::sendBlocks() {
std::unique_ptr<ByteSource> source =
dirQueue_->getNextSource(threadCtx_.get(), transferStatus);
if (!source) {
// try to read any buffered heart-beats
readHeartBeats();

return SEND_DONE_CMD;
}
WDT_CHECK(!source->hasError());
Expand Down Expand Up @@ -280,6 +329,7 @@ TransferStats SenderThread::sendOneByteSource(
stats.incrFailedAttempts();
return stats;
}

stats.addHeaderBytes(written);
int64_t byteSourceHeaderBytes = written;
int64_t throttlerInstanceBytes = byteSourceHeaderBytes;
Expand All @@ -288,6 +338,9 @@ TransferStats SenderThread::sendOneByteSource(
<< folly::humanify(std::string(headerBuf, off));
int32_t checksum = 0;
while (!source->finished()) {
// TODO: handle protocol errors from readHeartBeats
readHeartBeats();

int64_t size;
char *buffer = source->read(size);
if (source->hasError()) {
Expand Down Expand Up @@ -643,6 +696,9 @@ SenderState SenderThread::readReceiverCmd() {
WDT_CHECK_EQ(OK, errCode);
return READ_RECEIVER_CMD;
}
if (cmd == Protocol::HEART_BEAT_CMD) {
return READ_RECEIVER_CMD;
}
WTLOG(ERROR) << "Read unexpected receiver cmd " << cmd << " port " << port_;
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
return END;
Expand Down
8 changes: 8 additions & 0 deletions SenderThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class SenderThread : public WdtThread {
threadAbortChecker_ = std::make_unique<SocketAbortChecker>(this);
threadCtx_->setAbortChecker(threadAbortChecker_.get());
threadStats_.setId(folly::to<std::string>(threadIndex_));
isTty_ = isatty(STDERR_FILENO);
}

typedef SenderState (SenderThread::*StateFunction)();
Expand Down Expand Up @@ -276,9 +277,16 @@ class SenderThread : public WdtThread {
TransferStats sendOneByteSource(const std::unique_ptr<ByteSource> &source,
ErrorCode transferStatus);

/// checks to see if heart-beat is enabled, and if it is time to read
/// heart-beats, and if yes, reads heart-beats
ErrorCode readHeartBeats();

/// mapping from sender states to state functions
static const StateFunction stateMap_[];

/// whether stderr is tty
bool isTty_{false};

/// Negotiated protocol of the sender thread
int negotiatedProtocol_{-1};

Expand Down
6 changes: 3 additions & 3 deletions WdtConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
#include <fcntl.h>

#define WDT_VERSION_MAJOR 1
#define WDT_VERSION_MINOR 28
#define WDT_VERSION_BUILD 1702210
#define WDT_VERSION_MINOR 29
#define WDT_VERSION_BUILD 1703040
// Add -fbcode to version str
#define WDT_VERSION_STR "1.28.1702210-fbcode"
#define WDT_VERSION_STR "1.29.1703040-fbcode"
// Tie minor and proto version
#define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR

Expand Down
16 changes: 16 additions & 0 deletions WdtOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,22 @@ class WdtOptions {
*/
bool skip_fadvise{false};

/**
* If true, periodic heart-beats from receiver to sender are enabled.
* The heart-beat interval is determined by the socket read timeout of the
* sender.
* WDT senders stream data and only wait for a receiver response after
* all the blocks are sent. Because of the high socket buffer sizes, it might
* take a long time for the receiver to process all the bytes sent. So, for
* slower threads, there is a significant difference between the time the
* receiver processes all the bytes and the time the sender finishes sending
* all the bytes.
* So, the sender might time out while waiting for the response from the
* receiver.
* This happens a lot more for disks because of the lower io throughput.
* To solve this, the receiver sends heart-beats to signal that it is still
* processing data, and the sender waits while it is still getting heart-beats.
*/
bool enable_heart_beat{true};

/**
* @return whether files should be pre-allocated or not
*/
Expand Down
6 changes: 6 additions & 0 deletions WdtThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class WdtThread {
buf_ = buffer->getData();
bufSize_ = buffer->getSize();
threadIndex_ = threadCtx_->getThreadIndex();
lastHeartBeatTime_ = Clock::now();
}
/// Starts a thread which runs the wdt functionality
void startThread();
Expand Down Expand Up @@ -94,6 +95,11 @@ class WdtThread {
/// Copy of the protocol version that might be changed
int threadProtocolVersion_;

/// whether heart-beat is enabled
bool enableHeartBeat_{false};

Clock::time_point lastHeartBeatTime_;

/// possible footer types
enum FooterType {
NO_FOOTER,
Expand Down
2 changes: 2 additions & 0 deletions util/WdtFlags.cpp.inc
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,5 @@ WDT_OPT(
WDT_OPT(skip_fadvise, bool, "If true, fadvise is skipped after block write");
WDT_OPT(fsync, bool,
"If true, each file is fsync'ed after its last block is received");
WDT_OPT(enable_heart_beat, bool,
"If true, periodic heart-beat from receiver to sender is enabled.");

0 comments on commit 62be7c8

Please sign in to comment.