// watch.ts — forked from kubernetes-client/javascript.
import byline = require('byline');
import request = require('request');
import { Duplex } from 'stream';
import { KubeConfig } from './config';
/**
 * Shape of a single watch event: an event `type` string paired with the
 * `object` the event refers to.
 *
 * NOTE(review): nothing in the visible code references this interface; it
 * presumably describes the JSON lines parsed in Watch.watch (which reads
 * `type` and `object` from each parsed line) — confirm against callers.
 */
export interface WatchUpdate {
// Event kind as a free-form string.
type: string;
// The object the event refers to.
object: object;
}
// Minimal view of the request handle that the watcher relies on.
// We cannot use the type ReadableStream because the Request returned by the
// request library is not a true ReadableStream and it has an extra abort method.
export interface RequestResult {
// Pipe this request into the given duplex stream.
pipe(stream: Duplex): void;
// Register a callback for the named event (e.g. 'error', 'socket').
on(ev: string, cb: (arg: any) => void): void;
// Abort the request.
abort(): void;
}
/**
 * Minimal HTTP response shape: the numeric status code and its status message.
 * These are the same two fields DefaultRequest.webRequest reads from the
 * 'response' event.
 *
 * NOTE(review): not referenced by the visible code — confirm it is still used
 * by importers before changing it.
 */
export interface Response {
// HTTP status code of the response.
statusCode: number;
// Status message that accompanies the status code.
statusMessage: string;
}
// Abstraction over the HTTP client used by Watch, so that it can be replaced
// (e.g. by a mock). The contract is that the provided request implementation
// returns a readable stream that also has an abort method (see RequestResult).
export interface RequestInterface {
// Start the request described by opts and return its handle.
webRequest(opts: request.OptionsWithUri): RequestResult;
}
/**
 * Default RequestInterface implementation backed by the `request` library.
 *
 * It converts any non-200 HTTP response into an 'error' event on the returned
 * request, so that callers only have to deal with a single error channel.
 */
export class DefaultRequest implements RequestInterface {
    // requestImpl can be overridden in case we need to test mocked DefaultRequest
    private requestImpl: (opts: request.OptionsWithUri) => request.Request;

    /**
     * @param requestImpl Optional function used to create the request; when
     *     omitted the `request` library itself is used.
     */
    constructor(requestImpl?: (opts: request.OptionsWithUri) => request.Request) {
        this.requestImpl = requestImpl ? requestImpl : request;
    }

    /**
     * Start the request described by `opts`.
     *
     * Using request lib can be confusing when combining Stream- with Callback-
     * style API. We avoid the callback and handle HTTP response errors, that
     * would otherwise require a different error handling, in a transparent way
     * to the user (see github issue request/request#647 for more info).
     *
     * On a non-200 response an 'error' event is emitted on the request. The
     * emitted Error uses the response status message as its message (falling
     * back to `HTTP <code>` when the status message is empty) and carries the
     * HTTP status in a `statusCode` property so callers can tell failures apart.
     *
     * @param opts Request options, passed to the request implementation as-is.
     * @returns The request object created by the request implementation.
     */
    public webRequest(opts: request.OptionsWithUri): RequestResult {
        const req = this.requestImpl(opts);
        // pause the stream until we get a response not to miss any bytes
        req.pause();
        req.on('response', (resp) => {
            if (resp.statusCode === 200) {
                req.resume();
                return;
            }
            // `||` is deliberate: an empty status message must also fall back,
            // otherwise the error would have an empty message.
            const message = resp.statusMessage || `HTTP ${resp.statusCode}`;
            const error: Error & { statusCode?: number } = Object.assign(new Error(message), {
                statusCode: resp.statusCode,
            });
            req.emit('error', error);
        });
        return req;
    }
}
/**
 * Opens watch connections against the current cluster of a KubeConfig and
 * feeds every received JSON line to a user callback.
 */
export class Watch {
    // Sentinel object describing a server-side close. It is not referenced by
    // the code in this class.
    public static SERVER_SIDE_CLOSE: object = { error: 'Connection closed on server' };
    public config: KubeConfig;
    private readonly requestImpl: RequestInterface;

    /**
     * @param config Configuration that supplies the current cluster and is
     *     applied to the request options before the request is made.
     * @param requestImpl Optional request implementation (e.g. a mock);
     *     defaults to a new DefaultRequest.
     */
    public constructor(config: KubeConfig, requestImpl?: RequestInterface) {
        this.config = config;
        this.requestImpl = requestImpl ? requestImpl : new DefaultRequest();
    }

    /**
     * Watch the resource and call the provided callback with the parsed json
     * object upon each event received over the watcher connection.
     *
     * "done" is called at most once, either when the connection is closed
     * (with null) or when there is an error (with the error). In either case
     * the watcher aborts the underlying request so that it doesn't leak any
     * resources.
     *
     * @param path API path appended to the current cluster's server URL.
     * @param queryParams Query parameters for the request. The object is not
     *     modified; a copy with `watch: true` added is sent. May be null or
     *     undefined.
     * @param callback Called for every line that parses as JSON with the
     *     parsed `type`, `object` and the whole parsed value.
     * @param done Called once when the watch ends (see above).
     * @returns A promise resolving to the request object, which exposes abort().
     * @throws Error (as a rejected promise) when the config has no current cluster.
     */
    public async watch(
        path: string,
        queryParams: any,
        callback: (phase: string, apiObj: any, watchObj?: any) => void,
        done: (err: any) => void,
    ): Promise<any> {
        const cluster = this.config.getCurrentCluster();
        if (!cluster) {
            throw new Error('No currently active cluster');
        }
        const url = cluster.server + path;

        // Copy instead of mutating the caller's object; Object.assign skips
        // null/undefined sources, so a missing queryParams is tolerated.
        const qs = Object.assign({}, queryParams, { watch: true });
        const headerParams: any = {};
        const requestOptions: request.OptionsWithUri = {
            method: 'GET',
            qs,
            headers: headerParams,
            uri: url,
            useQuerystring: true,
            json: true,
            pool: false,
        };
        await this.config.applyToRequest(requestOptions);

        const req: RequestResult = this.requestImpl.webRequest(requestOptions);
        const stream = byline.createStream();

        let doneCalled = false;
        const doneCallOnce = (err: any) => {
            if (doneCalled) {
                return;
            }
            // Set the flag BEFORE aborting: abort() may synchronously emit
            // 'error'/'close', which re-enters this handler. With the flag
            // already set the re-entrant call is a no-op, so abort() and
            // done() each run exactly once.
            doneCalled = true;
            req.abort();
            done(err);
        };

        req.on('error', doneCallOnce);
        req.on('socket', (socket) => {
            // 30000 ms socket timeout and keep-alive with a 30000 ms initial
            // delay. Note that this method registers no 'timeout' listener.
            socket.setTimeout(30000);
            socket.setKeepAlive(true, 30000);
        });
        stream.on('error', doneCallOnce);
        stream.on('close', () => doneCallOnce(null));
        stream.on('data', (line) => {
            try {
                const data = JSON.parse(line);
                callback(data.type, data.object, data);
            } catch (ignore) {
                // ignore parse errors
                // NOTE(review): because callback() runs inside this try, any
                // exception thrown by the callback is silently dropped as well.
            }
        });

        req.pipe(stream);
        return req;
    }
}