Skip to content

Commit

Permalink
Merge branch 'main' of github.com:ig-or/donkey
Browse files Browse the repository at this point in the history
  • Loading branch information
ig-or committed Mar 11, 2024
2 parents caa294d + 7f0f353 commit 0d06d33
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 75 deletions.
54 changes: 45 additions & 9 deletions nano/eye/eth_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,33 @@ bool EthClient::checkThePacket(char* buf, int& len) {
return true;
}

/* static const int smBufSize = 512;
unsigned char smBuf[smBufSize];
int smBufIndex = 0;
*/
int EthClient::do_write(const unsigned char* buf, int size) {
if ((buf == 0) || (size < 1)) {
return 0;
}
if (!connectedToTeensy) {
return 0;
}
std::lock_guard<std::mutex> lk(muOutbox);
int u = smBufSize - smBufIndex - 1; // the space that we have
if (u < 10) {
xmprintf(1, "EthClient::do_write overflow 1; u = %d \n", u);
return 0;
}
if (u < size) { // overflow
xmprintf(1, "EthClient::do_write overflow 2; u = %d \n", u);
return 0;
} else { // OK
u = size;
}
memcpy(smBuf + smBufIndex, buf, u);
smBufIndex += u;
boost::asio::post(io_context, [this]() { heartbeat_timer_.cancel(); });
}

int EthClient::do_write(const char* s) {
if (s == 0) {
Expand Down Expand Up @@ -507,6 +534,7 @@ void EthClient::do_read() {

void EthClient::start_write() {
if (pleaseStop) {
xmprintf(5, "\tEthClient::start_write : pleaseStop!\n");
return;
}
if (eState != esConnected) {
Expand All @@ -517,19 +545,25 @@ void EthClient::start_write() {
xmprintf(9, "EthClient::start_write() Start an asynchronous operation to send a message. \n");
std::string s; // a string to write
std::lock_guard<std::mutex> lk(muOutbox);
if (outbox_.empty()) {
if (outbox_.empty() && (smBufIndex == 0)) {
s = "ping";
boost::asio::async_write(socket_, boost::asio::buffer(s), std::bind(&EthClient::handle_write, this, _1));
} else {
s = outbox_.front();
outbox_.pop_front();
if (smBufIndex != 0) {
boost::asio::async_write(socket_, boost::asio::buffer(smBuf, smBufIndex), std::bind(&EthClient::handle_write, this, _1));
smBufIndex = 0;
} else if (!outbox_.empty()) {
s = outbox_.front();
outbox_.pop_front();
boost::asio::async_write(socket_, boost::asio::buffer(s), std::bind(&EthClient::handle_write, this, _1));
}
}
boost::asio::async_write(socket_, boost::asio::buffer(s),
std::bind(&EthClient::handle_write, this, _1));
}

void EthClient::handle_write(const boost::system::error_code& error) {
xmprintf(9, "EthClient::handle_write(error = %s) \n", error.message().c_str());
if (pleaseStop) {
xmprintf(9, "\tEthClient::handle_write pleaseStop! \n");
return;
}
if (eState != esConnected) {
Expand All @@ -538,24 +572,26 @@ void EthClient::handle_write(const boost::system::error_code& error) {
}

if (!error) {
xmprintf(9, "EthClient::handle_write; \n");

muOutbox.lock();
bool oEmpty = outbox_.empty();
bool oEmpty = outbox_.empty() && (smBufIndex == 0);
muOutbox.unlock();
if (oEmpty) { // nothing to send right now.
xmprintf(9, "\tEthClient::handle_write: no error, waiting.. \n");
heartbeat_timer_.expires_after(std::chrono::milliseconds(380));
heartbeat_timer_.async_wait(std::bind(&EthClient::start_write, this));
} else {
xmprintf(9, "\tEthClient::handle_write: no error, calling start_write() \n");
start_write(); // there is something in the queue
}
} else {
if (error.value() == boost::asio::error::operation_aborted) { // we just add something to write maybe
xmprintf(9, "EthClient::handle_write: error = operation_aborted, so queue is not empty\n");
xmprintf(9, "\tEthClient::handle_write: error = operation_aborted, so queue is not empty\n");
start_write();
} else { // serious error like disconnect
xmprintf(4, "EthClient::handle_write(Error on heartbeat: error = %s) \n", error.message().c_str());
socket_.close();
xmprintf(4, "\tdisconnected ? \n");
xmprintf(4, "\tdisconnected ? makeReconnect!\n");

makeReconnect();
}
Expand Down
6 changes: 6 additions & 0 deletions nano/eye/eth_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class EthClient {
* \return
*/
int do_write(const char* s);
int do_write(const unsigned char* buf, int size);
bool isConnected() {
return connectedToTeensy;
}
Expand Down Expand Up @@ -86,6 +87,11 @@ enum EState {
boost::asio::steady_timer deadline_;
boost::asio::steady_timer heartbeat_timer_;
std::deque<std::string> outbox_; /// messages to write

// for writing binary info
static const int smBufSize = 512;
unsigned char smBuf[smBufSize];
int smBufIndex = 0;

};

Expand Down
2 changes: 1 addition & 1 deletion nano/eye/eye.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ void Eye::startEye() {
st = std::thread(&Eye::see, this);
}
void Eye::stopEye() {
pleaseStop = true;
if (!st.joinable()) { // already stopped?
return;
}
pleaseStop = true;
st.join();
}
void Eye::see() {
Expand Down
2 changes: 0 additions & 2 deletions nano/eye/eye.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,4 @@ class Eye : public SLidarConsumer, public EthConsumer {
void ethPing(unsigned char, unsigned int);
void see();



};
13 changes: 11 additions & 2 deletions nano/eye/lidar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ double SLidar::updateFrontDistance(double d) {
//}
double tmp[fdSize];
memcpy(tmp, frontDistances.buf, fdSize * sizeof(double));
double a = opt_med5(tmp);
double a = 0.0;
switch (fdSize) {
case 3: a = opt_med3(tmp); break;
case 5: a = opt_med5(tmp); break;
default: break;
};
return a;
}

Expand All @@ -107,19 +112,23 @@ void SLidar::slRun() {

while (!pleaseStop) {
std::this_thread::yield();
//continue;
test = getScan(p, np);
if (!test) {
continue;
}
//xmprintf(12, "SLidar::slRun() got %d points \n", np);
lc.slScan(p, np);
//continue;
d = checkFrontDistance(p, np, calibration);
//continue;
d1 = updateFrontDistance(d);
//continue;
lc.slFrontObstacle(d1);

rCounter += 1;
}
xmprintf(5, "SLidar::slRun() stoppint the lidar \n");
xmprintf(5, "SLidar::slRun() stopping the lidar \n");
test = slStop();


Expand Down
119 changes: 71 additions & 48 deletions nano/eye/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
#include "eth_client.h"
#include "lidar.h"
#include <xmroundbuf.h>
#include "xmessagesend.h"
#include "xmessage.h"
#ifdef G4LIDAR
#include "g4.h"
#endif
#include "eye.h"

static EthClient* cli = 0;
static SLidar* lidar = 0;
static Eye e;
static int currentLogLevel = 7; // 7

std::chrono::time_point<std::chrono::steady_clock> pingTime;
int xmprintf(int q, const char * s, ...);
Expand Down Expand Up @@ -62,6 +66,7 @@ void exitHandler(int s){
cli->StopClient();
delete cli; cli = 0;
}
e.stopEye();
xmprintf(6, "exitHandler() filished\n");
exit(0);
}
Expand All @@ -72,7 +77,7 @@ int main(int argc, char *argv[]) {
eStartTime = std::chrono::steady_clock::now();
pingInfo.clear();
pingTime = std::chrono::steady_clock::now();
xmprintf(0, "eye starting .. \n");
xmprintf(0, "starting .. currentLogLevel = %d\n", currentLogLevel);

// setup Ctrl^C
struct sigaction sigIntHandler;
Expand All @@ -81,61 +86,29 @@ int main(int argc, char *argv[]) {
sigIntHandler.sa_flags = 0;
sigaction(SIGINT, &sigIntHandler, NULL);

//cli.startClient(teeData);
Eye eye;

cli = new EthClient(eye);
cli = new EthClient(e);
std::thread tcp([&] { cli->startClient(); } );

#ifdef G4LIDAR
lidar = new G4Lidar(eye);
lidar = new G4Lidar(e);
#else
lidar = new SLidar(eye); //this will do nothing
lidar = new SLidar(e); //this will do nothing
#endif
eye.setEthClient(cli);
e.setEthClient(cli);
result = lidar->startLidar();

std::this_thread::sleep_for(200ms);
eye.startEye();
tcp.join();
return 0;
e.startEye();


//xmprintf(0, "main thread started \n");
while (1) {
std::this_thread::sleep_for(100ms);

std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
long long dt = std::chrono::duration_cast<std::chrono::milliseconds>(now - pingTime).count(); // how long ago was the last ping
if (dt > 1500) {
xmprintf(0, "ping timeout; dt = %llu milliseconds \r\n", dt);

//xmprintf(0, "111 \r\n");
xmprintf(0, "ping times: \r\n");
for (int i = 0; i < pingInfo.num; i++) {
long long dx = std::chrono::duration_cast<std::chrono::milliseconds>(now - pingInfo[i].pcTime).count();
xmprintf(0, "%d\t(%u, %u, %llu) \r\n", i, pingInfo[i].id, pingInfo[i].teensyTime, dx);
}

break;
}
if (!cli->isConnected()) {
xmprintf(0, "server disconnected \r\n");
break;
}
//xmprintf(0, "stopping tcp .. \r\n");
//cli->StopClient();
//tcp.join();
//xmprintf(0, "tcp thread finished \r\n");

while(1) {
std::this_thread::sleep_for(200ms);
}
//xmprintf(0, "222 \r\n");

//ungetc('q', stdin);
//xmprintf(0, "exiting .. ");
//std::cout << "exiting .. ";
//for (int i = 0; i < 250; i++) { ungetc('q', stdin); ungetc('\r', stdin); ungetc('\n', stdin); }

xmprintf(0, "stopping tcp .. \r\n");
cli->StopClient();
tcp.join();
xmprintf(0, "tcp thread finished \r\n");

std::this_thread::sleep_for(200ms);

xmprintf(0, "eye stop\r\n");
return 0;
Expand All @@ -145,17 +118,67 @@ void assert_failed(const char* file, unsigned int line, const char* str) {
xmprintf(0, "AF: file %s, line %d, (%s)\n", file, line, str);
}

// -----------------------------------------------------------------------------------------------------------------------

static std::mutex msgSendMutex;
const int smBufSize = maxxMessageSize*2;
unsigned char smBuf[smBufSize];
unsigned int smBufIndex = 0;
void sendMsg(const unsigned char* data, unsigned char type, unsigned short int size) {
if (cli == 0) {
return;
}
std::lock_guard<std::mutex> lk(msgSendMutex); // this might be called from different threads
smBufIndex = 0;
sendMessage(data, type, size);
//if (cli != 0) {
cli->do_write(smBuf, smBufIndex);
//}
}

/**
* this is called from inside 'sendMessage' several times.
* This function puts data into smBuf
*/
int XQSendInfo(const unsigned char* data, unsigned short int size) {

int u = smBufSize - smBufIndex - 1;
if (u < size) { // space available smaller than we need
xmprintf(1, "XQSendInfo overflow \n");
} else {
u = size;
}
memcpy(smBuf + smBufIndex, data, u);
smBufIndex += size;
return 0;
}

/**
* @brief make a message with several 'float' numbers
*
* @param type message type
* @param ... all the float numbers
* @return message size in bytes
*/
int sendFmessage(unsigned char type, int n, ...) {
va_list args;
va_start(args, n);
float* ftmp = new float[n];
for (int i = 0; i < n; i++) {
ftmp[i] = va_arg (args, double);
}
int u = n * sizeof(float);
sendMsg((const unsigned char*)ftmp, type, u);
va_end(args);
delete[] ftmp;
return u;
}

// -----------------------------------------------------------------------------------------------------------------------

static std::mutex xmpMutex;
static const int sbSize = 1024;
static char sbuf[sbSize];
static int currentLogLevel = 7;


int xmprintf(int q, const char * s, ...) {

Expand Down
Loading

0 comments on commit 0d06d33

Please sign in to comment.