Skip to content

Commit

Permalink
feature(@nestjs/microservices) added rmq transport support
Browse files Browse the repository at this point in the history
  • Loading branch information
AlariCode committed Jul 8, 2018
1 parent e1abf80 commit 685dc33
Show file tree
Hide file tree
Showing 10 changed files with 5,845 additions and 7,960 deletions.
13,634 changes: 5,675 additions & 7,959 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
"@nestjs/microservices": "5.0.0",
"@nestjs/testing": "5.0.0",
"@nestjs/websockets": "5.0.0",
"@types/amqplib": "^0.5.7",
"amqplib": "^0.5.2",
"axios": "^0.17.1",
"class-transformer": "^0.1.8",
"class-validator": "^0.8.1",
Expand Down
3 changes: 3 additions & 0 deletions packages/microservices/client/client-proxy-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Closeable } from '../interfaces/closeable.interface';
import { ClientNats } from './client-nats';
import { ClientMqtt } from './client-mqtt';
import { ClientGrpcProxy } from './client-grpc';
import { ClientRMQ } from './client-rmq';

export class ClientProxyFactory {
public static create(options: ClientOptions): ClientProxy & Closeable {
Expand All @@ -20,6 +21,8 @@ export class ClientProxyFactory {
return new ClientMqtt(options);
case Transport.GRPC:
return new ClientGrpcProxy(options);
case Transport.RMQ:
return new ClientRMQ(options);
default:
return new ClientTCP(options);
}
Expand Down
82 changes: 82 additions & 0 deletions packages/microservices/client/client-rmq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { Channel, Connection } from 'amqplib';
import { CONNECT_EVENT, ERROR_EVENT, MESSAGE_EVENT, RQM_DEFAULT_URL, SUBSCRIBE, RQM_DEFAULT_QUEUE } from './../constants';
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { ClientProxy } from './client-proxy';
import { ClientOptions, RmqOptions } from '../interfaces';

let rqmPackage: any = {};

export class ClientRMQ extends ClientProxy {
private readonly logger = new Logger(ClientProxy.name);
private client: Connection = null;
private channel: Channel = null;
private url: string;
private queue: string;

constructor(
private readonly options: ClientOptions) {
super();
this.url =
this.getOptionsProp<RmqOptions>(this.options, 'url') || RQM_DEFAULT_URL;
this.queue =
this.getOptionsProp<RmqOptions>(this.options, 'queue') || RQM_DEFAULT_QUEUE;
rqmPackage = loadPackage('amqplib', ClientRMQ.name);
this.connect();
}

protected async publish(messageObj, callback: (err, result, disposed?: boolean) => void) {
try {
if (!this.client) {
await this.connect();
}
this.channel.assertQueue('', { exclusive: true }).then(responseQ => {
messageObj.replyTo = responseQ.queue;
this.channel.consume(responseQ.queue, (message) => {
this.handleMessage(message, this.client, callback)
}, { noAck: true });
this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(messageObj)));
});
} catch (err) {
console.log(err);
callback(err, null);
}
}

private async handleMessage(message, server, callback): Promise<void> {
if(message) {
const { content, fields } = message;
const { err, response, isDisposed } = JSON.parse(content.toString());
if (isDisposed || err) {
callback({
err,
response: null,
isDisposed: true,
});
this.channel.deleteQueue(fields.routingKey);
}
callback({
err,
response,
});
}
}

public close(): void {
this.channel && this.channel.close();
this.client && this.client.close();
}

public handleError(client: Connection): void {
client.addListener(ERROR_EVENT, err => this.logger.error(err));
}

public async connect(): Promise<any> {
return new Promise(async (resolve, reject) => {
this.client = await rqmPackage.connect(this.url);
this.channel = await this.client.createChannel();
this.channel.assertQueue(this.queue, { durable: false });
resolve();
});
}
}
2 changes: 2 additions & 0 deletions packages/microservices/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ export const REDIS_DEFAULT_URL = 'redis://localhost:6379';
export const NATS_DEFAULT_URL = 'nats://localhost:4222';
export const MQTT_DEFAULT_URL = 'mqtt://localhost:1883';
export const GRPC_DEFAULT_URL = 'localhost:5000';
export const RQM_DEFAULT_URL = 'localhost';
export const RQM_DEFAULT_QUEUE = 'default';

export const CONNECT_EVENT = 'connect';
export const MESSAGE_EVENT = 'message';
Expand Down
1 change: 1 addition & 0 deletions packages/microservices/enums/transport.enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export enum Transport {
NATS,
MQTT,
GRPC,
RMQ,
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import {
NatsOptions,
MqttOptions,
GrpcOptions,
RmqOptions,
} from './microservice-configuration.interface';

export type ClientOptions =
| RedisOptions
| NatsOptions
| MqttOptions
| GrpcOptions
| TcpClientOptions;
| TcpClientOptions
| RmqOptions;

export interface TcpClientOptions {
transport: Transport.TCP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export type MicroserviceOptions =
| RedisOptions
| NatsOptions
| MqttOptions
| RmqOptions
| CustomStrategy;

export interface CustomStrategy {
Expand Down Expand Up @@ -65,3 +66,11 @@ export interface NatsOptions {
tls?: any;
};
}

export interface RmqOptions {
transport?: Transport.RMQ;
options?: {
url?: string;
queue?: string;
};
}
3 changes: 3 additions & 0 deletions packages/microservices/server/server-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { ServerMqtt } from './server-mqtt';
import { ServerNats } from './server-nats';
import { ServerRedis } from './server-redis';
import { ServerTCP } from './server-tcp';
import { ServerRMQ } from './server-rqm';

export class ServerFactory {
public static create(
Expand All @@ -21,6 +22,8 @@ export class ServerFactory {
return new ServerMqtt(options);
case Transport.GRPC:
return new ServerGrpc(options);
case Transport.RMQ:
return new ServerRMQ(options);
default:
return new ServerTCP(options);
}
Expand Down
65 changes: 65 additions & 0 deletions packages/microservices/server/server-rqm.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { Server } from './server';
import { Channel, Connection } from 'amqplib';
import { RQM_DEFAULT_URL , RQM_DEFAULT_QUEUE } from './../constants';
import { CustomTransportStrategy, RmqOptions } from './../interfaces';
import { MicroserviceOptions } from '../interfaces/microservice-configuration.interface';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { Observable } from 'rxjs';

let rqmPackage: any = {};

export class ServerRMQ extends Server implements CustomTransportStrategy {
private server: Connection = null;
private channel: Channel = null;
private url: string;
private queue: string;

constructor(private readonly options: MicroserviceOptions) {
super();
this.url =
this.getOptionsProp<RmqOptions>(this.options, 'url') || RQM_DEFAULT_URL;
this.queue =
this.getOptionsProp<RmqOptions>(this.options, 'queue') || RQM_DEFAULT_QUEUE;
rqmPackage = loadPackage('amqplib', ServerRMQ.name);
}

public async listen(callback: () => void): Promise<void> {
await this.start(callback);
this.channel.consume(this.queue, (msg) => this.handleMessage(msg) , {
noAck: true,
});
}

private async start(callback?: () => void) {
try {
this.server = await rqmPackage.connect(this.url);
this.channel = await this.server.createChannel();
this.channel.assertQueue(this.queue, { durable: false });
} catch (err) {
this.logger.error(err);
}
}

public close(): void {
this.channel && this.channel.close();
this.server && this.server.close();
}

private async handleMessage(message): Promise<void> {
const { content } = message;
const messageObj = JSON.parse(content.toString());
const handlers = this.getHandlers();
const pattern = JSON.stringify(messageObj.pattern);
if (!this.messageHandlers[pattern]) {
return;
}
const handler = this.messageHandlers[pattern];
const response$ = this.transformToObservable(await handler(messageObj.data)) as Observable<any>;
response$ && this.send(response$, (data) => this.sendMessage(data, messageObj.replyTo));
}

private sendMessage(message, replyTo): void {
const buffer = Buffer.from(JSON.stringify(message));
this.channel.sendToQueue(replyTo, buffer);
}
}

0 comments on commit 685dc33

Please sign in to comment.