Skip to content

Commit

Permalink
Merge branch 'ivibe-my-fix-branch'
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilmysliwiec committed Mar 26, 2019
2 parents 48be3ef + 802be5c commit 70fa9f0
Show file tree
Hide file tree
Showing 15 changed files with 935 additions and 26 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
"grpc": "1.19.0",
"http2": "3.3.7",
"iterare": "1.1.2",
"json-socket": "0.3.0",
"merge-graphql-schemas": "1.5.8",
"mqtt": "2.18.8",
"multer": "1.4.1",
Expand Down
4 changes: 2 additions & 2 deletions packages/microservices/client/client-tcp.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Logger } from '@nestjs/common';
import * as JsonSocket from 'json-socket';
import * as net from 'net';
import { share, tap } from 'rxjs/operators';
import {
Expand All @@ -14,6 +13,7 @@ import {
ClientOptions,
TcpClientOptions,
} from '../interfaces/client-metadata.interface';
import { JsonSocket } from '../helpers/json-socket';
import { ClientProxy } from './client-proxy';
import { ECONNREFUSED } from './constants';

Expand Down Expand Up @@ -42,7 +42,7 @@ export class ClientTCP extends ClientProxy {
this.socket = this.createSocket();
this.bindEvents(this.socket);

const source$ = this.connect$(this.socket._socket).pipe(
const source$ = this.connect$(this.socket.netSocket).pipe(
tap(() => {
this.isConnected = true;
this.socket.on(MESSAGE_EVENT, (buffer: WritePacket & PacketId) =>
Expand Down
1 change: 1 addition & 0 deletions packages/microservices/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export const RQM_DEFAULT_URL = 'amqp://localhost';
export const CONNECT_EVENT = 'connect';
export const DISCONNECT_EVENT = 'disconnect';
export const MESSAGE_EVENT = 'message';
export const DATA_EVENT = 'data';
export const ERROR_EVENT = 'error';
export const CLOSE_EVENT = 'close';
export const SUBSCRIBE = 'subscribe';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export class CorruptedPacketLengthException extends Error {
constructor(rawContentLength: string) {
super(`Corrupted length value "${rawContentLength}" supplied in a packet`);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export class InvalidJSONFormatException extends Error {
constructor(err: Error, data: string) {
super(`Could not parse JSON: ${err.message}\nRequest data: ${data}`);
}
}
5 changes: 5 additions & 0 deletions packages/microservices/errors/net-socket-closed.exception.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export class NetSocketClosedException extends Error {
constructor() {
super(`The net socket is closed.`);
}
}
130 changes: 130 additions & 0 deletions packages/microservices/helpers/json-socket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { Socket } from 'net';
import { StringDecoder } from 'string_decoder';
import {
CLOSE_EVENT,
CONNECT_EVENT,
DATA_EVENT,
ERROR_EVENT,
MESSAGE_EVENT,
} from '../constants';
import { CorruptedPacketLengthException } from '../errors/corrupted-packet-length.exception';
import { InvalidJSONFormatException } from '../errors/invalid-json-format.exception';
import { NetSocketClosedException } from '../errors/net-socket-closed.exception';

export class JsonSocket {
private contentLength: number | null = null;
private isClosed = false;
private buffer = '';

private readonly stringDecoder = new StringDecoder();
private readonly delimeter = '#';

public get netSocket() {
return this.socket;
}

constructor(public readonly socket: Socket) {
this.socket.on(DATA_EVENT, this.onData.bind(this));
this.socket.on(CONNECT_EVENT, () => (this.isClosed = false));
this.socket.on(CLOSE_EVENT, () => (this.isClosed = true));
this.socket.on(ERROR_EVENT, () => (this.isClosed = true));
}

public connect(port: number, host: string) {
this.socket.connect(port, host);
return this;
}

public on(event: string, callback: (err?: any) => void) {
this.socket.on(event, callback);
return this;
}

public once(event: string, callback: (err?: any) => void) {
this.socket.once(event, callback);
return this;
}

public end() {
this.socket.end();
return this;
}

public sendMessage(message: any, callback?: (err?: any) => void) {
if (this.isClosed) {
callback && callback(new NetSocketClosedException());
return;
}
this.socket.write(this.formatMessageData(message), 'utf-8', callback);
}

private onData(dataRaw: Buffer | string) {
const data = Buffer.isBuffer(dataRaw)
? this.stringDecoder.write(dataRaw)
: dataRaw;

try {
this.handleData(data);
} catch (e) {
this.socket.emit(ERROR_EVENT, e.message);
this.socket.end();
}
}

private handleData(data: string) {
this.buffer += data;

if (this.contentLength == null) {
const i = this.buffer.indexOf(this.delimeter);
/**
* Check if the buffer has the delimeter (#),
* if not, the end of the buffer string might be in the middle of a content length string
*/
if (i !== -1) {
const rawContentLength = this.buffer.substring(0, i);
this.contentLength = parseInt(rawContentLength, 10);

if (isNaN(this.contentLength)) {
this.contentLength = null;
this.buffer = '';
throw new CorruptedPacketLengthException(rawContentLength);
}
this.buffer = this.buffer.substring(i + 1);
}
}

if (this.contentLength !== null) {
const length = this.buffer.length;

if (length === this.contentLength) {
this.handleMessage(this.buffer);
} else if (length > this.contentLength) {
const message = this.buffer.substring(0, this.contentLength);
const rest = this.buffer.substring(this.contentLength);
this.handleMessage(message);
this.onData(rest);
}
}
}

private handleMessage(data: string) {
this.contentLength = null;
this.buffer = '';

let message: Record<string, unknown>;
try {
message = JSON.parse(data);
} catch (e) {
throw new InvalidJSONFormatException(e, data);
}
message = message || {};
this.socket.emit(MESSAGE_EVENT, message);
}

private formatMessageData(message: any) {
const messageData = JSON.stringify(message);
const length = messageData.length;
const data = length + this.delimeter + messageData;
return data;
}
}
13 changes: 7 additions & 6 deletions packages/microservices/server/server-tcp.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { isString, isUndefined } from '@nestjs/common/utils/shared.utils';
import * as JsonSocket from 'json-socket';
import * as net from 'net';
import { Server as NetSocket } from 'net';
import { Server as NetSocket, Socket } from 'net';
import { Observable } from 'rxjs';
import {
CLOSE_EVENT,
Expand All @@ -15,6 +14,7 @@ import {
MicroserviceOptions,
TcpOptions,
} from '../interfaces/microservice-configuration.interface';
import { JsonSocket } from '../helpers/json-socket';
import { Server } from './server';

export class ServerTCP extends Server implements CustomTransportStrategy {
Expand All @@ -39,15 +39,16 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
this.server.close();
}

public bindHandler<T extends Record<string, any>>(socket: T) {
public bindHandler(socket: Socket) {
const readSocket = this.getSocketInstance(socket);
readSocket.on(MESSAGE_EVENT, async (msg: ReadPacket & PacketId) =>
this.handleMessage(readSocket, msg),
);
readSocket.on(ERROR_EVENT, this.handleError.bind(this));
}

public async handleMessage<T extends Record<string, any>>(
socket: T,
public async handleMessage(
socket: JsonSocket,
packet: ReadPacket & PacketId,
) {
const pattern = !isString(packet.pattern)
Expand Down Expand Up @@ -98,7 +99,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
this.server.on(CLOSE_EVENT, this.handleClose.bind(this));
}

private getSocketInstance<T>(socket: T): JsonSocket {
private getSocketInstance(socket: Socket): JsonSocket {
return new JsonSocket(socket);
}
}
20 changes: 5 additions & 15 deletions packages/microservices/test/client/client-tcp.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,7 @@ import { ERROR_EVENT } from '../../constants';

describe('ClientTCP', () => {
let client: ClientTCP;
let socket: {
connect: sinon.SinonStub;
publish: sinon.SinonSpy;
_socket: {
addListener: sinon.SinonStub;
removeListener: sinon.SinonSpy;
once: sinon.SinonStub;
};
on: sinon.SinonStub;
end: sinon.SinonSpy;
sendMessage: sinon.SinonSpy;
};
let socket;
let createSocketStub: sinon.SinonStub;

beforeEach(() => {
Expand All @@ -27,9 +16,8 @@ describe('ClientTCP', () => {

socket = {
connect: sinon.stub(),
publish: sinon.spy(),
on: sinon.stub().callsFake(onFakeCallback),
_socket: {
netSocket: {
addListener: sinon.stub().callsFake(onFakeCallback),
removeListener: sinon.spy(),
once: sinon.stub().callsFake(onFakeCallback),
Expand Down Expand Up @@ -134,7 +122,9 @@ describe('ClientTCP', () => {
toPromise: () => source,
pipe: () => source,
};
connect$Stub = sinon.stub(client, 'connect$' as any).callsFake(() => source);
connect$Stub = sinon
.stub(client, 'connect$' as any)
.callsFake(() => source);
await client.connect();
});
afterEach(() => {
Expand Down
Loading

0 comments on commit 70fa9f0

Please sign in to comment.