forked from rachoA/txchain
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.cpp
258 lines (212 loc) · 5.61 KB
/
main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
//
// Usage: tx [-cPORT] maxnode publish_port peer1:port1 [peer2:port2 ...]
// -cPORT : Client input mode (auto data generation mode disabled)
// ex:
// tx 4 7000 192.168.1.10:7000 192.168.1.10:7001 192.168.1.10:7002 192.168.1.10:7003
// tx 4 7001 192.168.1.10:7000 192.168.1.10:7001 192.168.1.10:7002 192.168.1.10:7003
// tx 4 7002 192.168.1.10:7000 192.168.1.10:7001 192.168.1.10:7002 192.168.1.10:7003
// tx 4 7003 192.168.1.10:7000 192.168.1.10:7001 192.168.1.10:7002 192.168.1.10:7003
//
#include "txcommon.h"
int _nverifier = MAX_VERIFIER;
int _automode = 1; // auto data generation mode (client input disabled)
int _clientport = DEFAULT_CLIENT_PORT;
int _maxnode = 1; //
int _chainport = DEFAULT_CHAIN_PORT;
int _npeer = 0;
char _peerlist[MAX_NODE + 1][40] = {0};
Params_type_t _netparams;
safe_queue<txdata_t> _sendq; // send queue for publisher
safe_queue<txdata_t> _verifyq; // stores received tx for verifier
safe_queue<txdata_t> _mempoolq; // stores verifier result
safe_queue<txdata_t> _consensusq; // verification result queue
vector<txdata_t> _mempool; // mempool
map<string, txdata_t> _mempoolmap; // mempool index (key=txid)
void parse_command_line(int ac, char *av[]);
void create_main_threads();
void create_subscriber_threads();
void create_verifier_threads(int nverifiers);
int main(int ac, char *av[])
{
parse_command_line(ac, av);
printf("Start txchain main: pid=%d\n", getpid());
_sendq.setmax(10000);
_verifyq.setmax(10000);
_mempoolq.setmax(10000);
_consensusq.setmax(10000);
_mempool.resize(10000);
// load params set
_netparams = load_params("../lib/params.dat");
create_main_threads();
create_subscriber_threads(); // SUBSCRIBER
create_verifier_threads(_nverifier); // VERIFIER
while (1)
{
sleep(1);
fflush(stdout);
fflush(stderr);
}
return 0;
}
//
// Parsing command line parameters
//
// Usage: tx maxnode publish_port peer1:port1 [peer2:port2 ...]
//
void parse_command_line(int ac, char *av[])
{
int ii = 0;
if (ac >= 2 && strncmp(av[1], "-c", 2) == 0)
{
_automode = 0;
_clientport = atoi(&av[1][2]);
if (_clientport <= 0)
_clientport = DEFAULT_CLIENT_PORT;
ac--, av++;
}
printf("Auto mode = %d\n", _automode);
printf("Client port = %d\n", _clientport);
// get maxnode
if (ac >= 2 && atoi(av[1]) > 0)
{
_maxnode = atoi(av[1]);
if (_maxnode > 100)
_maxnode = 100; // ?ִ? node?? 100????..
ac--, av++;
}
printf("Max node = %d\n", _maxnode);
// get chain port
if (ac >= 2 && atoi(av[1]) > 0)
{
_chainport = atoi(av[1]);
if (_chainport <= 10 || _chainport > 65535)
{
fprintf(stderr, "ERROR: port range error (1 ~ 65535)\n");
exit(-1);
}
ac--, av++;
}
printf("Chain port = %d\n", _chainport);
// get peer list
for (ii = 1; ii < ac && _npeer < MAX_NODE; ii++)
{
if (_npeer > _maxnode)
{
fprintf(stderr, "WARNING: Number of peer %d > Max node %d\n", _npeer, _maxnode);
continue;
}
strcpy(_peerlist[_npeer], av[ii]);
printf("peer[%d] = %s\n", _npeer, _peerlist[_npeer]);
_npeer++;
}
printf("\n\n");
}
void create_main_threads()
{
pthread_t thrid;
int ret = 0;
// Send thread
printf("Create [publisher] thread: sendport=%d\n", _chainport);
ret = pthread_create(&thrid, NULL, thread_publisher, (void *)&_chainport);
if (ret < 0)
{
perror("thread_publisher() thread creation error");
exit(-1);
}
pthread_detach(thrid);
sleepms(1);
// Send test thread
if (_automode)
{
printf("Create [send test] thread\n");
ret = pthread_create(&thrid, NULL, thread_send_test, (void *)&_chainport);
if (ret < 0)
{
perror("thread_send_test() thread creation error");
exit(-1);
}
pthread_detach(thrid);
sleepms(1);
}
// level db thread
printf("Create [leveldb] thread\n");
ret = pthread_create(&thrid, NULL, thread_levledb, (void *)&_chainport);
if (ret < 0)
{
perror("thread_levledb() thread creation error");
exit(-1);
}
pthread_detach(thrid);
sleepms(1);
}
void create_subscriber_threads()
{
pthread_t cthrid, thrid[100];
int ret = 0;
// Client thread
if (_automode == 0)
{
printf("Create [client] thread\n");
ret = pthread_create(&cthrid, NULL, thread_client, (void *)&_clientport);
if (ret < 0)
{
perror("thread_client() thread creation error");
exit(-1);
}
pthread_detach(cthrid);
sleepms(1);
}
// Subscriber threads
for (int idx = 0; idx < _npeer; idx ++)
{
char *peer = _peerlist[idx];
char *tp = strchr(peer, ':');
if (tp == NULL)
{
fprintf(stderr, "ERROR: peer format is IP:PORT. %s skipped\n", peer);
exit(-1);
}
// skip subscriber on self node
if (_maxnode > 1 && atoi(tp+1) == _chainport)
{
printf("Peer %s skipped.\n", peer);
printf("\n");
continue;
}
printf("Create subscriber thread [%d]=%s\n", idx, peer);
ret = pthread_create(&thrid[idx], NULL, thread_subscriber, (void *)_peerlist[idx]);
if (ret < 0)
{
perror("thread_subscriber() thread creation error");
exit(-1);
}
pthread_detach(thrid[idx]);
sleepms(1);
}
}
void create_verifier_threads(int nverifiers)
{
pthread_t thrid[100], cthrid;
// consensus thread
printf("Create [consensus] thread\n");
int ret = pthread_create(&cthrid, NULL, thread_consensus, (void *)&_chainport);
if (ret < 0)
{
perror("thread_consensus() thread creation error");
exit(-1);
}
pthread_detach(cthrid);
sleepms(1);
// Verifier thread creation
for (int idx = 0; idx < nverifiers; idx++)
{
int id = idx;
int ret = pthread_create(&thrid[idx], NULL, thread_verifier, (void *)&id);
if (ret < 0) {
perror("thread_verifier() thread creation error");
exit(-1);
}
pthread_detach(thrid[idx]);
sleepms(1);
}
}