diff --git a/src/web-socket-handler.ts b/src/web-socket-handler.ts index 866c0dd343..b6387c2dd9 100644 --- a/src/web-socket-handler.ts +++ b/src/web-socket-handler.ts @@ -72,6 +72,41 @@ export class WebSocketHandler implements WebSocketInterface { return true; } + public static async processData( + data: string | Buffer, + ws: WebSocket | null, + createWS: () => Promise, + streamNum: number = 0, + retryCount: number = 3, + ): Promise { + const buff = Buffer.alloc(data.length + 1); + + buff.writeInt8(streamNum, 0); + if (data instanceof Buffer) { + data.copy(buff, 1); + } else { + buff.write(data, 1); + } + + let i = 0; + for (; i < retryCount; ++i) { + if (ws !== null && ws.readyState === WebSocket.OPEN) { + ws.send(buff); + break; + } else { + ws = await createWS(); + } + } + + // This throw doesn't go anywhere. + // TODO: Figure out the right way to return an error. + if (i >= retryCount) { + throw new Error("can't send data to ws"); + } + + return ws; + } + public static restartableHandleStandardInput( createWS: () => Promise, stdin: stream.Readable | any, @@ -85,33 +120,10 @@ export class WebSocketHandler implements WebSocketInterface { let queue: Promise = Promise.resolve(); let ws: WebSocket | null = null; - async function processData(data): Promise { - const buff = Buffer.alloc(data.length + 1); - - buff.writeInt8(streamNum, 0); - if (data instanceof Buffer) { - data.copy(buff, 1); - } else { - buff.write(data, 1); - } - - let i = 0; - for (; i < retryCount; ++i) { - if (ws !== null && ws.readyState === WebSocket.OPEN) { - ws.send(buff); - break; - } else { - ws = await createWS(); - } - } - - if (i >= retryCount) { - throw new Error("can't send data to ws"); - } - } - stdin.on('data', (data) => { - queue = queue.then(() => processData(data)); + queue = queue.then(async () => { + ws = await WebSocketHandler.processData(data, ws, createWS, streamNum, retryCount); + }); }); stdin.on('end', () => { diff --git a/src/web-socket-handler_test.ts b/src/web-socket-handler_test.ts index 819363778f..7344f9cf19 100644 --- a/src/web-socket-handler_test.ts +++ b/src/web-socket-handler_test.ts @@ -2,7 +2,6 @@ import { promisify } from 'util'; import { expect } from 'chai'; import WebSocket = require('isomorphic-ws'); import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers'; -import { anyFunction, capture, instance, mock, reset, verify } from 'ts-mockito'; import { V1Status } from './api'; import { KubeConfig } from './config'; @@ -301,3 +300,34 @@ describe('WebSocket', () => { } }); }); + +describe('Restartable Handle Standard Input', () => { + it('should throw on negative retry', () => { + const p = new Promise(() => {}); + expect(() => WebSocketHandler.restartableHandleStandardInput(() => p, null, 0, -1)).to.throw( + "retryCount can't be lower than 0.", + ); + }); + + it('should retry n times', () => { + const retryTimes = 5; + let count = 0; + const ws = { + readyState: false, + } as unknown; + WebSocketHandler.processData( + 'some test data', + null, + (): Promise => { + return new Promise((resolve) => { + count++; + resolve(ws as WebSocket); + }); + }, + 0, + retryTimes, + ).catch(() => { + expect(count).to.equal(retryTimes); + }); + }); +});