forked from SChernykh/p2pool
-
Notifications
You must be signed in to change notification settings - Fork 1
/
tcp_server.h
226 lines (164 loc) · 6.2 KB
/
tcp_server.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
/*
* This file is part of the Monero P2Pool <https://github.com/SChernykh/p2pool>
* Copyright (c) 2021-2024 SChernykh <https://github.com/SChernykh>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "uv_util.h"
#ifdef WITH_TLS
#include "tls.h"
#endif
#include <map>
namespace p2pool {
/**
 * Base class for a TCP server built around uv_* handles (event loop, listen sockets,
 * async/timer/prepare handles) and a pool of Client objects.
 *
 * Only declarations are visible in this header; everything that is not defined inline
 * is implemented elsewhere. Derived classes must implement on_shutdown().
 */
class TCPServer : public nocopy_nomove
{
public:
	struct Client;

	// Factory used to create new Client instances (stored in m_allocateNewClient).
	using allocate_client_callback = Client* (*)();

	TCPServer(int default_backlog, allocate_client_callback allocate_new_client, const std::string& socks5Proxy);
	virtual ~TCPServer();

	[[nodiscard]] bool connect_to_peer(bool is_v6, const char* ip, int port);

	// Signals m_dropConnectionsAsync, whose handler (on_drop_connections) calls close_sockets(false).
	// Does nothing once m_finished is non-zero.
	void drop_connections_async()
	{
		if (m_finished.load() != 0) {
			return;
		}
		uv_async_send(&m_dropConnectionsAsync);
	}

	void shutdown_tcp();

	virtual void print_status();

	[[nodiscard]] uv_loop_t* get_loop() { return &m_loop; }

	// The base implementation simply reports m_listenPort.
	[[nodiscard]] virtual int external_listen_port() const { return m_listenPort; }

	[[nodiscard]] bool connect_to_peer(bool is_v6, const raw_ip& ip, int port);
	[[nodiscard]] bool connect_to_peer(Client* client);

	// Hook with an empty default body; presumably invoked when an outgoing connection fails.
	virtual void on_connect_failed(bool /*is_v6*/, const raw_ip& /*ip*/, int /*port*/) {}

	void ban(bool is_v6, raw_ip ip, uint64_t seconds);
	virtual void print_bans();

	/**
	 * State of a single connection. Concrete client types must implement
	 * size(), on_connect() and on_read(char*, uint32_t).
	 */
	struct Client
	{
		Client(char* read_buf, size_t size);
		virtual ~Client() = default;

		virtual size_t size() const = 0;

		virtual void reset();

		[[nodiscard]] virtual bool on_connect() = 0;
		[[nodiscard]] virtual bool on_read(char* data, uint32_t size) = 0;
		[[nodiscard]] bool on_proxy_handshake(char* data, uint32_t size);

		// Optional notifications; the defaults do nothing.
		virtual void on_read_failed(int /*err*/) {}
		virtual void on_disconnected() {}

		// Static entry points whose signatures take uv handle/request pointers.
		static void on_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
		static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
		static void on_write(uv_write_t* req, int status);

		void close();
		void ban(uint64_t seconds);

		void init_addr_string();

		void asan_poison_this() const;

		// Read buffer and its size (presumably the values handed to the constructor).
		char* m_readBuf;
		uint32_t m_readBufSize;

		TCPServer* m_owner;

		// Links for the list of connected clients
		Client* m_prev;
		Client* m_next;

		uv_tcp_t m_socket;

		bool m_isV6;
		bool m_isIncoming;
		bool m_readBufInUse;
		bool m_isClosing;
		uint32_t m_numRead;

		enum {
			ADDR_STRING_SIZE = 72,
		};

		raw_ip m_addr;
		int m_port;
		// Fixed-size text buffer; presumably filled in by init_addr_string().
		char m_addrString[ADDR_STRING_SIZE];

		// Progress of the SOCKS5 proxy handshake for this connection.
		enum class Socks5ProxyState {
			Default,
			MethodSelectionSent,
			ConnectRequestSent,
		} m_socks5ProxyState;

		std::atomic<uint32_t> m_resetCounter;

#ifdef WITH_TLS
		ServerTls m_tls;
		bool m_tlsChecked;
#endif
	};

	// A write request bundled with the client it belongs to and its data buffer.
	struct WriteBuf
	{
		uv_write_t m_write = {};
		Client* m_client = nullptr;
		void* m_data = nullptr;
		size_t m_dataCapacity = 0;
	};

	// Write buffers indexed by a size_t key (presumably their capacity - TODO confirm).
	std::multimap<size_t, WriteBuf*> m_writeBuffers;

	[[nodiscard]] WriteBuf* get_write_buffer(size_t size_hint);
	void return_write_buffer(WriteBuf* buf);

	// Wraps `callback` into a Callback<...>::Derived<T> and hands it to parse_address_list_internal().
	// NOTE(review): `callback` is a forwarding reference but is passed through std::move;
	// this looks safe only when callers pass temporaries - confirm against Callback::Derived.
	template<typename T>
	FORCEINLINE static void parse_address_list(const std::string& address_list, T&& callback)
	{
		parse_address_list_internal(
			address_list,
			Callback<void, bool, const std::string&, const std::string&, int>::Derived<T>(std::move(callback))
		);
	}

	// Wraps `callback` into a Callback<...>::Derived<T> and forwards everything to send_internal().
	// NOTE(review): same std::move-on-forwarding-reference pattern as parse_address_list().
	template<typename T>
	[[nodiscard]] FORCEINLINE bool send(Client* client, T&& callback, bool raw = false)
	{
		return send_internal(
			client,
			Callback<size_t, uint8_t*, size_t>::Derived<T>(std::move(callback)),
			raw
		);
	}

private:
	// Static entry points whose signatures take uv handle/request pointers.
	static void on_new_connection(uv_stream_t* server, int status);
	static void on_connection_close(uv_handle_t* handle);
	static void on_connection_error(uv_handle_t* handle);
	static void on_connect(uv_connect_t* req, int status);

	void on_new_client(uv_stream_t* server);
	void on_new_client(uv_stream_t* server, Client* client);

	[[nodiscard]] bool send_internal(Client* client, Callback<size_t, uint8_t*, size_t>::Base&& callback, bool raw);

	allocate_client_callback m_allocateNewClient;

	void close_sockets(bool listen_sockets);

	static void error_invalid_ip(const std::string& address);

	// Listening sockets; the "6" suffix presumably denotes the IPv6 ones.
	std::vector<uv_tcp_t*> m_listenSockets6;
	std::vector<uv_tcp_t*> m_listenSockets;

protected:
	virtual const char* get_log_category() const;

	std::vector<uint8_t> m_callbackBuf;
	int m_defaultBacklog;

	uv_thread_t m_loopThread;
	std::atomic<bool> m_loopThreadCreated;

	static void loop(void* data);

	static void parse_address_list_internal(const std::string& address_list, Callback<void, bool, const std::string&, const std::string&, int>::Base&& callback);

	void start_listening(const std::string& listen_addresses, bool upnp);
	bool start_listening(bool is_v6, const std::string& ip, int port, std::string address = std::string());

#ifdef WITH_UPNP
	int m_portMapping;
#endif

	// SOCKS5 proxy settings
	std::string m_socks5Proxy;
	bool m_socks5ProxyV6;
	raw_ip m_socks5ProxyIP;
	int m_socks5ProxyPort;

	// drop_connections_async() becomes a no-op once this is non-zero.
	std::atomic<int> m_finished;
	int m_listenPort;

	uv_loop_t m_loop;

	// Only a real member function when P2POOL_DEBUGGING is defined; otherwise an inline no-op.
#ifdef P2POOL_DEBUGGING
	void check_event_loop_thread(const char *func) const;
#else
	static FORCEINLINE void check_event_loop_thread(const char*) {}
#endif

	std::vector<Client*> m_preallocatedClients;

	Client* get_client();
	void return_client(Client* c);

	Client* m_connectedClientsList;
	std::atomic<uint32_t> m_numConnections;
	std::atomic<uint32_t> m_numIncomingConnections;

	uv_mutex_t m_bansLock;
	unordered_map<raw_ip, std::chrono::steady_clock::time_point> m_bans;

	[[nodiscard]] bool is_banned(bool is_v6, raw_ip ip);

	unordered_set<raw_ip> m_pendingConnections;

	uv_async_t m_dropConnectionsAsync;

	// Async handler: expects async->data to point at the owning TCPServer and closes
	// its sockets with listen_sockets == false.
	static void on_drop_connections(uv_async_t* async)
	{
		static_cast<TCPServer*>(async->data)->close_sockets(false);
	}

	// Must be provided by derived classes.
	virtual void on_shutdown() = 0;

	uv_async_t m_shutdownAsync;
	uv_prepare_t m_shutdownPrepare;
	uv_timer_t m_shutdownTimer;
	uint32_t m_shutdownCountdown;

	uint32_t m_numHandles;

	static void on_shutdown(uv_async_t* async);
};
} // namespace p2pool