Skip to content

Commit

Permalink
eye rth client updated
Browse files Browse the repository at this point in the history
  • Loading branch information
ig-or committed Mar 2, 2024
1 parent b2f9280 commit f35ba42
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 25 deletions.
67 changes: 43 additions & 24 deletions nano/eye/eth_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ bool EthClient::startClient(data_1 cb, vf ping) { // , int msTimeout
readingTheLogFile = 0;
theFile = 0;
lastReportedProgress = 0;
outbox_.clear();

cb1 = cb; // remember the callbacks
ping1 = ping;

xmprintf(9, "EthClient::startClient() starting \n");
xmprintf(5, "EthClient::startClient() starting \n");

try {
//tcp::resolver resolver(io_service);
Expand Down Expand Up @@ -116,6 +117,7 @@ void EthClient::handle_connect(const boost::system::error_code& error, const bo
} else { // Otherwise we have successfully established a connection.
xmprintf(7, "\tEthClient::handle_connect() Connected to %s \n", ep.address().to_string().c_str());
eState = esConnected;
outbox_.clear();
do_read();// Start the input actor.
start_write();// Start the heartbeat actor.
}
Expand Down Expand Up @@ -247,18 +249,17 @@ int EthClient::do_write(const char* s) {
while ((currentFileName.length() > 1) && ((currentFileName.back() == '\n') || (currentFileName.back() == '\r'))) {
currentFileName.pop_back();
}
xmprintf(0, "do_write: reading file %s [%s] \r\n", currentFileName.c_str(), s);
xmprintf(1, "do_write: reading file %s [%s] \r\n", currentFileName.c_str(), s);
}

int ret = socket_.write_some(boost::asio::buffer(s, n), error);
if (ret == 0) {
readingTheLogFile = 0;
xmprintf(0, "EthClient::do_write(%s) write error (%s)\r\n", s, error.message());
std::cout << "write error: " << error.message() << std::endl;
} else {
//xmprintf(0, "socket.write_some = %d\n", ret);
}
return ret;
std::lock_guard<std::mutex> lk(muOutbox);
outbox_.push_back(s);
boost::asio::post(io_context, [this]() { heartbeat_timer_.cancel(); });


//int ret = socket_.write_some(boost::asio::buffer(s, n), error);

return 0;
}

void EthClient::makeReconnect() {
Expand Down Expand Up @@ -515,8 +516,16 @@ void EthClient::start_write() {
return;
}

xmprintf(9, "EthClient::start_write() Start an asynchronous operation to send a heartbeat message. \n");
boost::asio::async_write(socket_, boost::asio::buffer("ping\n", 5),
xmprintf(9, "EthClient::start_write() Start an asynchronous operation to send a message. \n");
std::string s; // a string to write
std::lock_guard<std::mutex> lk(muOutbox);
if (outbox_.empty()) {
s = "ping";
} else {
s = outbox_.front();
outbox_.pop_front();
}
boost::asio::async_write(socket_, boost::asio::buffer(s),
std::bind(&EthClient::handle_write, this, _1));
}

Expand All @@ -531,17 +540,27 @@ void EthClient::handle_write(const boost::system::error_code& error) {
}

if (!error) {
xmprintf(9, "EthClient::handle_write; Wait 10 seconds before sending the next heartbeat. \n");
heartbeat_timer_.expires_after(std::chrono::seconds(10));
heartbeat_timer_.async_wait(std::bind(&EthClient::start_write, this));
} else {
xmprintf(4, "EthClient::handle_write(Error on heartbeat: error = %s) \n", error.message().c_str());
socket_.close();
xmprintf(4, "\tdisconnected ? \n");

xmprintf(4, "\t(re)connecting to teensy.... \n");
tcp::endpoint ep(boost::asio::ip::address::from_string("192.168.0.177"), 8889);
start_connect(ep);
xmprintf(9, "EthClient::handle_write; \n");
muOutbox.lock();
bool oEmpty = outbox_.empty();
muOutbox.unlock();
if (oEmpty) { // nothing to send right now.
heartbeat_timer_.expires_after(std::chrono::milliseconds(380));
heartbeat_timer_.async_wait(std::bind(&EthClient::start_write, this));
} else {
start_write(); // there is something in the queue
}
} else {
if (error.value() == boost::asio::error::operation_aborted) { // we just add something to write maybe
xmprintf(9, "EthClient::handle_write: error = operation_aborted, so queue is not empty\n");
start_write();
} else { // serious error like disconnect
xmprintf(4, "EthClient::handle_write(Error on heartbeat: error = %s) \n", error.message().c_str());
socket_.close();
xmprintf(4, "\tdisconnected ? \n");

makeReconnect();
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion nano/eye/eth_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "boost/asio.hpp"
#include <mutex>
#include <deque>
#include <condition_variable>
//#include <chrono>

Expand Down Expand Up @@ -76,10 +77,11 @@ enum EState {
data_1 cb1 = 0;
vf ping1 = 0;

std::mutex mu;
std::mutex mu, muOutbox;
std::condition_variable cv;
boost::asio::steady_timer deadline_;
boost::asio::steady_timer heartbeat_timer_;
std::deque<std::string> outbox_; /// messages to write

};

Expand Down
3 changes: 3 additions & 0 deletions nano/eye/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ static char sbuf[sbSize];
static int currentLogLevel = 7;

int xmprintf(int q, const char * s, ...) {
if (q > currentLogLevel) {
return 0;
}
std::lock_guard<std::mutex> lk(xmpMutex);
va_list args;
va_start(args, s);
Expand Down

0 comments on commit f35ba42

Please sign in to comment.