Skip to content

Commit

Permalink
feature(@nestjs/microservices) Added additional options and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
AlariCode committed Jul 20, 2018
1 parent bb73598 commit e0d1364
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 58 deletions.
11 changes: 8 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/common/enums/transport.enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export enum Transport {
NATS,
MQTT,
GRPC,
RMQ
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import {
NatsOptions,
MqttOptions,
GrpcOptions,
RmqOptions
} from './microservice-configuration.interface';

export interface ClientOptions {
transport?: Transport;
options?:
| TcpClientOptions
| RedisOptions
| NatsOptions
| MqttOptions
| GrpcOptions;
| TcpClientOptions
| RedisOptions
| NatsOptions
| MqttOptions
| GrpcOptions
| RmqOptions;
}

export interface TcpClientOptions {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { Transport } from '../../enums/transport.enum';
import { MqttClientOptions } from '../external/mqtt-options.interface';
import { CustomTransportStrategy } from './custom-transport-strategy.interface';
import { Options } from 'amqplib';

export type MicroserviceOptions =
| GrpcOptions
| TcpOptions
| RedisOptions
| NatsOptions
| MqttOptions
| RmqOptions
| CustomStrategy;

export interface CustomStrategy {
Expand Down Expand Up @@ -64,3 +66,14 @@ export interface NatsOptions {
tls?: any;
};
}

export interface RmqOptions {
transport?: Transport.RMQ;
options?: {
url?: string;
queue?: string;
prefetchCount?: number;
isGlobalPrefetchCount?: boolean;
queueOptions?: Options.AssertQueue;
};
}
34 changes: 22 additions & 12 deletions packages/microservices/client/client-rmq.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Channel, Connection } from 'amqplib';
import { CONNECT_EVENT, ERROR_EVENT, MESSAGE_EVENT, RQM_DEFAULT_URL, SUBSCRIBE, RQM_DEFAULT_QUEUE } from './../constants';
import { Channel, Connection, Options } from 'amqplib';
import { ERROR_EVENT, RQM_DEFAULT_URL, RQM_DEFAULT_QUEUE, RQM_DEFAULT_PREFETCH_COUNT, RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, RQM_DEFAULT_QUEUE_OPTIONS } from './../constants';
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { ClientProxy } from './client-proxy';
Expand All @@ -14,30 +14,36 @@ export class ClientRMQ extends ClientProxy {
private channel: Channel = null;
private url: string;
private queue: string;
private prefetchCount: number;
private isGlobalPrefetchCount: boolean;
private queueOptions: Options.AssertQueue
private replyQueue: string;
private responseEmitter: EventEmitter;

constructor(
private readonly options: ClientOptions) {
super();
this.url =
this.getOptionsProp<RmqOptions>(this.options, 'url') || RQM_DEFAULT_URL;
this.queue =
this.getOptionsProp<RmqOptions>(this.options, 'queue') || RQM_DEFAULT_QUEUE;
this.prefetchCount =
this.getOptionsProp<RmqOptions>(this.options, 'prefetchCount') || RQM_DEFAULT_PREFETCH_COUNT;
this.isGlobalPrefetchCount =
this.getOptionsProp<RmqOptions>(this.options, 'isGlobalPrefetchCount') || RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
this.queueOptions =
this.getOptionsProp<RmqOptions>(this.options, 'queueOptions') || RQM_DEFAULT_QUEUE_OPTIONS;
rqmPackage = loadPackage('amqplib', ClientRMQ.name);
this.connect();
}

protected async publish(messageObj, callback: (err, result, disposed?: boolean) => void) {
protected publish(messageObj, callback: (err, result, disposed?: boolean) => void) {
try {
if (!this.client) {
await this.connect();
}
let correlationId = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
this.responseEmitter.once(correlationId, msg => {
this.handleMessage(msg, callback);
});
this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(messageObj)), {
this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(messageObj)), {
replyTo: this.replyQueue,
correlationId: correlationId
});
Expand All @@ -48,7 +54,7 @@ export class ClientRMQ extends ClientProxy {
}

private async handleMessage(message, callback): Promise<void> {
if(message) {
if (message) {
const { content } = message;
const { err, response, isDisposed } = JSON.parse(content.toString());
if (isDisposed || err) {
Expand All @@ -61,7 +67,7 @@ export class ClientRMQ extends ClientProxy {
callback({
err,
response,
});
});
}
}

Expand All @@ -80,11 +86,15 @@ export class ClientRMQ extends ClientProxy {
}, { noAck: true });
}

public async connect():Promise<any> {
public async connect(): Promise<any> {
if (this.client && this.channel) {
return Promise.resolve();
}
return new Promise(async (resolve, reject) => {
this.client = await rqmPackage.connect(this.url);
this.channel = await this.client.createChannel();
await this.channel.assertQueue(this.queue, { durable: false });
await this.channel.assertQueue(this.queue, this.queueOptions);
await this.channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
this.replyQueue = (await this.channel.assertQueue('', { exclusive: true })).queue;
this.responseEmitter = new EventEmitter();
this.responseEmitter.setMaxListeners(0);
Expand Down
6 changes: 5 additions & 1 deletion packages/microservices/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ export const NATS_DEFAULT_URL = 'nats://localhost:4222';
export const MQTT_DEFAULT_URL = 'mqtt://localhost:1883';
export const GRPC_DEFAULT_URL = 'localhost:5000';
export const RQM_DEFAULT_URL = 'amqp://localhost';
export const RQM_DEFAULT_QUEUE = 'default';

export const CONNECT_EVENT = 'connect';
export const MESSAGE_EVENT = 'message';
Expand All @@ -19,3 +18,8 @@ export const CLIENT_CONFIGURATION_METADATA = 'client';
export const CLIENT_METADATA = '__isClient';
export const PATTERN_HANDLER_METADATA = '__isPattern';
export const NO_PATTERN_MESSAGE = `There is no equivalent message pattern defined in the remote service.`;

export const RQM_DEFAULT_QUEUE = 'default';
export const RQM_DEFAULT_PREFETCH_COUNT = 0;
export const RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT = false;
export const RQM_DEFAULT_QUEUE_OPTIONS = {};
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { MqttClientOptions } from '@nestjs/common/interfaces/external/mqtt-options.interface';
import { Transport } from '../enums/transport.enum';
import { Server } from './../server/server';
import { Options } from 'amqplib';
import { CustomTransportStrategy } from './custom-transport-strategy.interface';

export type MicroserviceOptions =
Expand Down Expand Up @@ -72,5 +73,8 @@ export interface RmqOptions {
options?: {
url?: string;
queue?: string;
prefetchCount?: number;
isGlobalPrefetchCount?: boolean;
queueOptions?: Options.AssertQueue;
};
}
84 changes: 47 additions & 37 deletions packages/microservices/server/server-rqm.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Server } from './server';
import { Channel, Connection } from 'amqplib';
import { RQM_DEFAULT_URL , RQM_DEFAULT_QUEUE } from './../constants';
import { Channel, Connection, Options } from 'amqplib';
import { RQM_DEFAULT_URL, RQM_DEFAULT_QUEUE, RQM_DEFAULT_PREFETCH_COUNT, RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, RQM_DEFAULT_QUEUE_OPTIONS } from './../constants';
import { CustomTransportStrategy, RmqOptions } from './../interfaces';
import { MicroserviceOptions } from '../interfaces/microservice-configuration.interface';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
Expand All @@ -13,53 +13,63 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
private channel: Channel = null;
private url: string;
private queue: string;
private prefetchCount: number;
private queueOptions: Options.AssertQueue
private isGlobalPrefetchCount: boolean;

constructor(private readonly options: MicroserviceOptions) {
super();
this.url =
this.getOptionsProp<RmqOptions>(this.options, 'url') || RQM_DEFAULT_URL;
this.queue =
this.getOptionsProp<RmqOptions>(this.options, 'queue') || RQM_DEFAULT_QUEUE;
this.prefetchCount =
this.getOptionsProp<RmqOptions>(this.options, 'prefetchCount') || RQM_DEFAULT_PREFETCH_COUNT;
this.isGlobalPrefetchCount =
this.getOptionsProp<RmqOptions>(this.options, 'isGlobalPrefetchCount') || RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
this.queueOptions =
this.getOptionsProp<RmqOptions>(this.options, 'queueOptions') || RQM_DEFAULT_QUEUE_OPTIONS;
rqmPackage = loadPackage('amqplib', ServerRMQ.name);
}
}

public async listen(callback: () => void): Promise<void> {
await this.start(callback);
this.channel.consume(this.queue, (msg) => this.handleMessage(msg) , {
noAck: true,
});
}
public async listen(callback: () => void): Promise<void> {
await this.start(callback);
this.channel.consume(this.queue, (msg) => this.handleMessage(msg), {
noAck: true,
});
}

private async start(callback?: () => void) {
try {
this.server = await rqmPackage.connect(this.url);
this.channel = await this.server.createChannel();
this.channel.assertQueue(this.queue, { durable: false });
} catch (err) {
this.logger.error(err);
}
}
private async start(callback?: () => void) {
try {
this.server = await rqmPackage.connect(this.url);
this.channel = await this.server.createChannel();
this.channel.assertQueue(this.queue, this.queueOptions);
await this.channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
} catch (err) {
this.logger.error(err);
}
}

public close(): void {
this.channel && this.channel.close();
this.server && this.server.close();
}
public close(): void {
this.channel && this.channel.close();
this.server && this.server.close();
}

private async handleMessage(message): Promise<void> {
const { content, properties } = message;
const messageObj = JSON.parse(content.toString());
const handlers = this.getHandlers();
const pattern = JSON.stringify(messageObj.pattern);
if (!this.messageHandlers[pattern]) {
return;
private async handleMessage(message): Promise<void> {
const { content, properties } = message;
const messageObj = JSON.parse(content.toString());
const handlers = this.getHandlers();
const pattern = JSON.stringify(messageObj.pattern);
if (!this.messageHandlers[pattern]) {
return;
}
const handler = this.messageHandlers[pattern];
const response$ = this.transformToObservable(await handler(messageObj.data)) as Observable<any>;
response$ && this.send(response$, (data) => this.sendMessage(data, properties.replyTo, properties.correlationId));
}
const handler = this.messageHandlers[pattern];
const response$ = this.transformToObservable(await handler(messageObj.data)) as Observable<any>;
response$ && this.send(response$, (data) => this.sendMessage(data, properties.replyTo, properties.correlationId));
}

private sendMessage(message, replyTo, correlationId): void {
const buffer = Buffer.from(JSON.stringify(message));
this.channel.sendToQueue(replyTo, buffer, {correlationId: correlationId});
}
private sendMessage(message, replyTo, correlationId): void {
const buffer = Buffer.from(JSON.stringify(message));
this.channel.sendToQueue(replyTo, buffer, { correlationId: correlationId });
}
}

0 comments on commit e0d1364

Please sign in to comment.