forked from freddiecoleman/chia-network-scanner
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPeerConnection.ts
180 lines (148 loc) · 5.78 KB
/
PeerConnection.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import { log } from './log';
import { MessageChannel } from './MessageChannel';
import { Peer } from './peer';
import { decodeMessage, encodeMessage, ProtocolMessageTypes } from './encoder';
interface PeerConnectionOptions {
networkId: string;
protocolVersion: string;
softwareVersion: string;
nodeType: number;
hostname: string;
port: number;
connectionTimeout: number;
cert: Buffer;
key: Buffer;
}
class PeerConnection {
private readonly messageChannel: MessageChannel;
private readonly messageHandlers: Map<number, Function> = new Map();
public constructor({
networkId,
protocolVersion,
softwareVersion,
nodeType,
hostname,
port,
connectionTimeout,
cert,
key
}: PeerConnectionOptions) {
this.messageChannel = new MessageChannel({
networkId,
protocolVersion,
softwareVersion,
nodeType,
hostname,
port,
connectionTimeout,
onMessage: data => this.onMessage(data),
cert,
key
});
}
public async connect(): Promise<void> {
return new Promise(async(resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Failed to connect to peer within 2 seconds')), 2000);
try {
await this.messageChannel.connect();
// Connected within timeout
clearTimeout(timeout);
resolve();
} catch (err) {
clearTimeout(timeout);
reject(err);
}
});
}
/**
* Chia application level handshake required before using the peer protocol.
*/
public async handshake(): Promise<this> {
const { hostname, port, connectionTimeout, networkId, protocolVersion, softwareVersion, nodeType } = this.messageChannel;
return new Promise(async(resolve, reject) => {
const timeout = setTimeout(() => reject(new Error(`${hostname}:${port} did not respond to handshake within ${(connectionTimeout / 1000).toFixed(2)} seconds. Bailing.`)), connectionTimeout);
// Handle handshake response messages
const handshakeResponse = [
this.expectMessage(ProtocolMessageTypes.handshake),
this.expectMessage(ProtocolMessageTypes.handshake_ack)
];
// Initiate handshake
this.sendMessage(ProtocolMessageTypes.handshake, {
network_id: networkId,
protocol_version: protocolVersion,
software_version: softwareVersion,
server_port: port,
node_type: nodeType
});
try {
// Wait for handshake response
await Promise.race(handshakeResponse);
} catch (err) {
return reject(err);
}
// Handshake completed within timeout
clearTimeout(timeout);
// We can now use the peer protocol
resolve(this);
});
}
public sendMessage(messageType: number, data: any) {
const message = encodeMessage(messageType, data);
log.info(`Sending ${messageType} message`);
this.messageChannel.sendMessage(message);
}
/**
* Get the peers of this peer.
*/
public getPeers(): Promise<Peer[]> {
const { hostname, port, connectionTimeout } = this.messageChannel;
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error(`${hostname}:${port} did not respond after ${(connectionTimeout / 1000).toFixed(2)} seconds. Bailing.`)), connectionTimeout);
this.addMessageHandler(ProtocolMessageTypes.respond_peers, (respondPeers: any) => {
clearTimeout(timeout);
resolve(respondPeers.peer_list);
});
this.sendMessage(ProtocolMessageTypes.request_peers, {});
});
}
public close(): void {
return this.messageChannel.close();
}
private addMessageHandler(messageType: number, handler: Function) {
log.debug(`Adding message handler for ${messageType} messages`);
this.messageHandlers.set(messageType, handler);
}
private onMessage(data: Buffer): void {
try {
const messageType = data[0];
log.info(`Received message of type ${messageType}`);
const message = decodeMessage(data);
log.info(`Decoded message to ${JSON.stringify(message)}`)
const handler = this.messageHandlers.get(messageType);
if (handler) {
return handler(message);
}
log.warn(`No handler for ${messageType} message. Discarding it.`);
} catch (err) {
// Anybody could send any old rubbish down the wire and we don't want that to crash our process
log.error(err, 'Error handling inbound message');
log.warn(`Hex of message that could not be decoded: ${data.toString('hex')}`);
}
}
/**
* Expects a message of a messageType to be received within a timeout.
*
* @param messageType expected
*/
private expectMessage(messageType: number): Promise<void> {
const { hostname, port, connectionTimeout } = this.messageChannel;
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error(`${hostname}:${port} did not receive ${messageType} message within ${(connectionTimeout / 1000).toFixed(2)} seconds. Bailing.`)), connectionTimeout);
this.addMessageHandler(messageType, () => {
clearTimeout(timeout);
resolve();
});
});
}
}
export { PeerConnection };