Skip to content

Commit

Permalink
Update coverage. Add more tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
brendandburns committed Feb 13, 2020
1 parent ba0dc1a commit 77bd5e6
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 27 deletions.
64 changes: 38 additions & 26 deletions src/web-socket-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,41 @@ export class WebSocketHandler implements WebSocketInterface {
return true;
}

public static async processData(
data: string | Buffer,
ws: WebSocket | null,
createWS: () => Promise<WebSocket>,
streamNum: number = 0,
retryCount: number = 3,
): Promise<WebSocket | null> {
const buff = Buffer.alloc(data.length + 1);

buff.writeInt8(streamNum, 0);
if (data instanceof Buffer) {
data.copy(buff, 1);
} else {
buff.write(data, 1);
}

let i = 0;
for (; i < retryCount; ++i) {
if (ws !== null && ws.readyState === WebSocket.OPEN) {
ws.send(buff);
break;
} else {
ws = await createWS();
}
}

// This throw doesn't go anywhere.
// TODO: Figure out the right way to return an error.
if (i >= retryCount) {
throw new Error("can't send data to ws");
}

return ws;
}

public static restartableHandleStandardInput(
createWS: () => Promise<WebSocket>,
stdin: stream.Readable | any,
Expand All @@ -85,33 +120,10 @@ export class WebSocketHandler implements WebSocketInterface {
let queue: Promise<void> = Promise.resolve();
let ws: WebSocket | null = null;

async function processData(data): Promise<void> {
const buff = Buffer.alloc(data.length + 1);

buff.writeInt8(streamNum, 0);
if (data instanceof Buffer) {
data.copy(buff, 1);
} else {
buff.write(data, 1);
}

let i = 0;
for (; i < retryCount; ++i) {
if (ws !== null && ws.readyState === WebSocket.OPEN) {
ws.send(buff);
break;
} else {
ws = await createWS();
}
}

if (i >= retryCount) {
throw new Error("can't send data to ws");
}
}

stdin.on('data', (data) => {
queue = queue.then(() => processData(data));
queue = queue.then(async () => {
ws = await WebSocketHandler.processData(data, ws, createWS, streamNum, retryCount);
});
});

stdin.on('end', () => {
Expand Down
32 changes: 31 additions & 1 deletion src/web-socket-handler_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { promisify } from 'util';
import { expect } from 'chai';
import WebSocket = require('isomorphic-ws');
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
import { anyFunction, capture, instance, mock, reset, verify } from 'ts-mockito';

import { V1Status } from './api';
import { KubeConfig } from './config';
Expand Down Expand Up @@ -301,3 +300,34 @@ describe('WebSocket', () => {
}
});
});

describe('Restartable Handle Standard Input', () => {
it('should throw on negative retry', () => {
const p = new Promise<WebSocket>(() => {});
expect(() => WebSocketHandler.restartableHandleStandardInput(() => p, null, 0, -1)).to.throw(
"retryCount can't be lower than 0.",
);
});

it('should retry n times', () => {
const retryTimes = 5;
let count = 0;
const ws = {
readyState: false,
} as unknown;
WebSocketHandler.processData(
'some test data',
null,
(): Promise<WebSocket> => {
return new Promise<WebSocket>((resolve) => {
count++;
resolve(ws as WebSocket);
});
},
0,
retryTimes,
).catch(() => {
expect(count).to.equal(retryTimes);
});
});
});

0 comments on commit 77bd5e6

Please sign in to comment.