Skip to content

Commit

Permalink
Stratum: added TLS support
Browse files Browse the repository at this point in the history
  • Loading branch information
SChernykh committed Aug 5, 2024
1 parent cb8ea37 commit 127dcc0
Show file tree
Hide file tree
Showing 8 changed files with 480 additions and 46 deletions.
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ option(WITH_RANDOMX "Include the RandomX library in the build. If this is turned
option(WITH_LTO "Use link-time compiler optimization (if linking fails for you, run cmake with -DWITH_LTO=OFF)" ON)
option(WITH_UPNP "Include UPnP support. If this is turned off, p2pool will not be able to configure port forwarding on UPnP-enabled routers." ON)
option(WITH_GRPC "Include gRPC support. If this is turned off, p2pool will not be able to merge mine with Tari." ON)
option(WITH_TLS "Include TLS support. If this is turned off, p2pool will not support Stratum TLS connections." ON)

option(DEV_TEST_SYNC "[Developer only] Sync test, stop p2pool after sync is complete" OFF)
option(DEV_WITH_TSAN "[Developer only] Compile with thread sanitizer" OFF)
Expand Down Expand Up @@ -166,6 +167,13 @@ if (WITH_GRPC)
set(SOURCES ${SOURCES} src/merge_mining_client_tari.cpp)
endif()

if (WITH_GRPC AND WITH_TLS)
add_definitions(-DWITH_TLS)

set(HEADERS ${HEADERS} src/tls.h)
set(SOURCES ${SOURCES} src/tls.cpp)
endif()

source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}" PREFIX "Header Files" FILES ${HEADERS})
source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}" PREFIX "Source Files" FILES ${SOURCES})

Expand Down
1 change: 1 addition & 0 deletions cmake/grpc.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ add_definitions(-DPROTOBUF_ENABLE_DEBUG_LOGGING_MAY_LEAK_PII=0)
add_subdirectory(external/src/grpc)

include_directories(external/src/grpc/third_party/abseil-cpp)
include_directories(external/src/grpc/third_party/boringssl-with-bazel/src/include)
include_directories(external/src/grpc/third_party/protobuf/src)
include_directories(external/src/grpc/include)
94 changes: 71 additions & 23 deletions src/stratum_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ bool StratumServer::on_login(StratumClient* client, uint32_t id, const char* log
target = std::max(target, aux_diff.target());

if (get_custom_diff(login, client->m_customDiff)) {
LOGINFO(5, "client " << log::Gray() << static_cast<char*>(client->m_addrString) << " set custom difficulty " << client->m_customDiff);
LOGINFO(5, "client " << log::Gray() << static_cast<char*>(client->m_addrString) << log::NoColor() << " set custom difficulty " << client->m_customDiff);
target = std::max(target, client->m_customDiff.target());
}
else if (m_autoDiff) {
Expand All @@ -282,7 +282,7 @@ bool StratumServer::on_login(StratumClient* client, uint32_t id, const char* log

if (get_custom_user(login, client->m_customUser)) {
const char* s = client->m_customUser;
LOGINFO(5, "client " << log::Gray() << static_cast<char*>(client->m_addrString) << " set custom user " << s);
LOGINFO(5, "client " << log::Gray() << static_cast<char*>(client->m_addrString) << log::NoColor() << " set custom user " << s);
}

uint32_t job_id;
Expand Down Expand Up @@ -541,6 +541,7 @@ void StratumServer::show_workers()
size_t n = 0;

LOGINFO(0, log::pad_right("IP:port", addr_len + 8)
<< "TLS "
<< log::pad_right("uptime", 20)
<< log::pad_right("difficulty", 20)
<< log::pad_right("hashrate", 15)
Expand All @@ -557,7 +558,15 @@ void StratumServer::show_workers()
++diff.lo;
}
}

#ifdef WITH_TLS
const bool is_tls = c->m_tls.is_empty();
#else
constexpr bool is_tls = false;
#endif

LOGINFO(0, log::pad_right(static_cast<const char*>(c->m_addrString), addr_len + 8)
<< (is_tls ? "no " : "yes ")
<< log::pad_right(log::Duration(cur_time - c->m_connectedTime), 20)
<< log::pad_right(diff, 20)
<< log::pad_right(log::Hashrate(c->m_autoDiff.lo / AUTO_DIFF_TARGET_TIME, m_autoDiff && (c->m_autoDiff != 0)), 15)
Expand Down Expand Up @@ -1080,7 +1089,8 @@ void StratumServer::on_shutdown()
}

StratumServer::StratumClient::StratumClient()
: Client(m_stratumReadBuf, sizeof(m_stratumReadBuf))
: Client(m_rawReadBuf, sizeof(m_rawReadBuf))
, m_stratumReadBufBytes(0)
, m_rpcId(0)
, m_perConnectionJobId(0)
, m_connectedTime(0)
Expand All @@ -1100,6 +1110,10 @@ StratumServer::StratumClient::StratumClient()
void StratumServer::StratumClient::reset()
{
Client::reset();

m_stratumReadBuf[0] = '\0';
m_stratumReadBufBytes = 0;

m_rpcId = 0;
m_perConnectionJobId = 0;
m_connectedTime = 0;
Expand Down Expand Up @@ -1127,35 +1141,69 @@ bool StratumServer::StratumClient::on_connect()

bool StratumServer::StratumClient::on_read(char* data, uint32_t size)
{
if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + m_readBufSize)) {
LOGERR(1, "client: invalid data pointer or size in on_read()");
ban(DEFAULT_BAN_TIME);
return false;
#ifdef WITH_TLS
if (!m_tlsChecked) {
if (data[0] == 0x16) {
if (!m_tls.init()) {
LOGWARN(5, "client " << static_cast<const char*>(m_addrString) << ": TLS init failed");
return false;
}
LOGINFO(5, "client " << log::Gray() << static_cast<const char*>(m_addrString) << log::NoColor() << " is using TLS");
}
m_tlsChecked = true;
}
#endif

m_numRead += size;
auto on_parse = [this](char* data, uint32_t size) {
if (static_cast<size_t>(m_stratumReadBufBytes) + size > STRATUM_BUF_SIZE) {
LOGWARN(4, "client " << static_cast<const char*>(m_addrString) << " sent too long Stratum message");
ban(DEFAULT_BAN_TIME);
return false;
}

char* line_start = m_readBuf;
for (char* c = data; c < m_readBuf + m_numRead; ++c) {
if (*c == '\n') {
*c = '\0';
if (!process_request(line_start, static_cast<uint32_t>(c - line_start))) {
ban(DEFAULT_BAN_TIME);
return false;
memcpy(m_stratumReadBuf + m_stratumReadBufBytes, data, size);
m_stratumReadBufBytes += size;

char* line_start = m_stratumReadBuf;
for (char *e = line_start + m_stratumReadBufBytes, *c = e - size; c < e; ++c) {
if (*c == '\n') {
*c = '\0';
if (!process_request(line_start, static_cast<uint32_t>(c - line_start))) {
ban(DEFAULT_BAN_TIME);
return false;
}
line_start = c + 1;
}
line_start = c + 1;
}
}

// Move the possible unfinished line to the beginning of m_readBuf to free up more space for reading
if (line_start != m_readBuf) {
m_numRead = static_cast<uint32_t>(m_readBuf + m_numRead - line_start);
if (m_numRead > 0) {
memmove(m_readBuf, line_start, m_numRead);
// Move the possible unfinished line to the beginning of m_stratumReadBuf to free up more space for reading
if (line_start != m_stratumReadBuf) {
m_stratumReadBufBytes = static_cast<uint32_t>(m_stratumReadBuf + m_stratumReadBufBytes - line_start);
if (m_stratumReadBufBytes > 0) {
memmove(m_stratumReadBuf, line_start, m_stratumReadBufBytes);
}
}

return true;
};

#ifdef WITH_TLS
if (!m_tls.is_empty()) {
auto on_write = [this](const uint8_t* data, size_t size) {
return m_owner->send(this, [data, size](uint8_t* buf, size_t buf_size) -> size_t {
if (buf_size < size) {
return 0;
}
memcpy(buf, data, size);
return size;
}, true);
};

return m_tls.on_read(data, size, std::move(on_parse), std::move(on_write));
}
#endif

return true;
return on_parse(data, size);
}

bool StratumServer::StratumClient::process_request(char* data, uint32_t size)
Expand Down
3 changes: 3 additions & 0 deletions src/stratum_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ class StratumServer : public TCPServer
[[nodiscard]] bool process_login(rapidjson::Document& doc, uint32_t id);
[[nodiscard]] bool process_submit(rapidjson::Document& doc, uint32_t id);

alignas(8) char m_rawReadBuf[STRATUM_BUF_SIZE];

alignas(8) char m_stratumReadBuf[STRATUM_BUF_SIZE];
uint32_t m_stratumReadBufBytes;

uint32_t m_rpcId;
uint32_t m_perConnectionJobId;
Expand Down
66 changes: 45 additions & 21 deletions src/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ void TCPServer::print_bans()
}
}

bool TCPServer::send_internal(Client* client, Callback<size_t, uint8_t*, size_t>::Base&& callback)
bool TCPServer::send_internal(Client* client, Callback<size_t, uint8_t*, size_t>::Base&& callback, bool raw)
{
check_event_loop_thread(__func__);

Expand All @@ -559,34 +559,50 @@ bool TCPServer::send_internal(Client* client, Callback<size_t, uint8_t*, size_t>
return false;
}

WriteBuf* buf = get_write_buffer(bytes_written);
auto on_write = [this, client](const uint8_t* data, size_t size) {
WriteBuf* buf = get_write_buffer(size);

buf->m_write.data = buf;
buf->m_client = client;
buf->m_write.data = buf;
buf->m_client = client;

if (buf->m_dataCapacity < bytes_written) {
buf->m_dataCapacity = round_up(bytes_written, 64);
buf->m_data = realloc_hook(buf->m_data, buf->m_dataCapacity);
if (!buf->m_data) {
LOGERR(0, "failed to allocate " << buf->m_dataCapacity << " bytes to send data");
PANIC_STOP();
if (buf->m_dataCapacity < size) {
buf->m_dataCapacity = round_up(size, 64);
buf->m_data = realloc_hook(buf->m_data, buf->m_dataCapacity);
if (!buf->m_data) {
LOGERR(0, "failed to allocate " << buf->m_dataCapacity << " bytes to send data");
PANIC_STOP();
}
}
}

memcpy(buf->m_data, m_callbackBuf.data(), bytes_written);
memcpy(buf->m_data, data, size);

uv_buf_t bufs[1];
bufs[0].base = reinterpret_cast<char*>(buf->m_data);
bufs[0].len = static_cast<int>(bytes_written);
uv_buf_t bufs[1];
bufs[0].base = reinterpret_cast<char*>(buf->m_data);
bufs[0].len = static_cast<int>(size);

const int err = uv_write(&buf->m_write, reinterpret_cast<uv_stream_t*>(&client->m_socket), bufs, 1, Client::on_write);
if (err) {
LOGWARN(1, "failed to start writing data to client connection " << static_cast<const char*>(client->m_addrString) << ", error " << uv_err_name(err));
return_write_buffer(buf);
return false;
const int err = uv_write(&buf->m_write, reinterpret_cast<uv_stream_t*>(&client->m_socket), bufs, 1, Client::on_write);
if (err) {
LOGWARN(1, "failed to start writing data to client connection " << static_cast<const char*>(client->m_addrString) << ", error " << uv_err_name(err));
return_write_buffer(buf);
return false;
}

return true;
};

#ifdef WITH_TLS
if (!client->m_tls.is_empty() && !raw) {
if (!client->m_tls.on_write(m_callbackBuf.data(), bytes_written, std::move(on_write))) {
LOGWARN(1, "TLS write failed to client connection " << static_cast<const char*>(client->m_addrString));
return false;
}
return true;
}
#else
(void)raw;
#endif

return true;
return on_write(m_callbackBuf.data(), bytes_written);
}

const char* TCPServer::get_log_category() const
Expand Down Expand Up @@ -999,6 +1015,9 @@ TCPServer::Client::Client(char* read_buf, size_t size)
, m_addrString{}
, m_socks5ProxyState(Socks5ProxyState::Default)
, m_resetCounter{ 0 }
#ifdef WITH_TLS
, m_tlsChecked(false)
#endif
{
m_readBuf[0] = '\0';
m_readBuf[m_readBufSize - 1] = '\0';
Expand All @@ -1023,6 +1042,11 @@ void TCPServer::Client::reset()
m_socks5ProxyState = Socks5ProxyState::Default;
m_readBuf[0] = '\0';
m_readBuf[m_readBufSize - 1] = '\0';

#ifdef WITH_TLS
m_tls.reset();
m_tlsChecked = false;
#endif
}

void TCPServer::Client::on_alloc(uv_handle_t* handle, size_t /*suggested_size*/, uv_buf_t* buf)
Expand Down
14 changes: 12 additions & 2 deletions src/tcp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
#pragma once

#include "uv_util.h"

#ifdef WITH_TLS
#include "tls.h"
#endif

#include <map>

namespace p2pool {
Expand Down Expand Up @@ -106,6 +111,11 @@ class TCPServer : public nocopy_nomove
} m_socks5ProxyState;

std::atomic<uint32_t> m_resetCounter;

#ifdef WITH_TLS
ServerTls m_tls;
bool m_tlsChecked;
#endif
};

struct WriteBuf
Expand All @@ -128,7 +138,7 @@ class TCPServer : public nocopy_nomove
}

template<typename T>
[[nodiscard]] FORCEINLINE bool send(Client* client, T&& callback) { return send_internal(client, Callback<size_t, uint8_t*, size_t>::Derived<T>(std::move(callback))); }
[[nodiscard]] FORCEINLINE bool send(Client* client, T&& callback, bool raw = false) { return send_internal(client, Callback<size_t, uint8_t*, size_t>::Derived<T>(std::move(callback)), raw); }

private:
static void on_new_connection(uv_stream_t* server, int status);
Expand All @@ -138,7 +148,7 @@ class TCPServer : public nocopy_nomove
void on_new_client(uv_stream_t* server);
void on_new_client(uv_stream_t* server, Client* client);

[[nodiscard]] bool send_internal(Client* client, Callback<size_t, uint8_t*, size_t>::Base&& callback);
[[nodiscard]] bool send_internal(Client* client, Callback<size_t, uint8_t*, size_t>::Base&& callback, bool raw);

allocate_client_callback m_allocateNewClient;

Expand Down
Loading

0 comments on commit 127dcc0

Please sign in to comment.