Skip to content

Commit

Permalink
Merge pull request nestjs#1576 from nestjs/bugfix/client-rmq
Browse files Browse the repository at this point in the history
bugfix(microservices): remove reply_queue in rabbitmq client
  • Loading branch information
kamilmysliwiec authored Feb 20, 2019
2 parents 4fcc373 + 4a44d16 commit 6cea592
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
15 changes: 7 additions & 8 deletions packages/microservices/client/client-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from './../constants';
import { WritePacket } from './../interfaces';
import { ClientProxy } from './client-proxy';
import { RABBITMQ_REPLY_QUEUE } from './constants';

let rqmPackage: any = {};

Expand All @@ -29,7 +30,6 @@ export class ClientRMQ extends ClientProxy {
protected prefetchCount: number;
protected isGlobalPrefetchCount: boolean;
protected queueOptions: any;
protected replyQueue: string;
protected responseEmitter: EventEmitter;

constructor(protected readonly options: ClientOptions['options']) {
Expand Down Expand Up @@ -62,7 +62,7 @@ export class ClientRMQ extends ClientProxy {
public consumeChannel() {
this.channel.addSetup(channel =>
channel.consume(
this.replyQueue,
RABBITMQ_REPLY_QUEUE,
msg => this.responseEmitter.emit(msg.properties.correlationId, msg),
{ noAck: true },
),
Expand All @@ -78,7 +78,10 @@ export class ClientRMQ extends ClientProxy {

const connect$ = this.connect$(this.client);
this.connection = this.mergeDisconnectEvent(this.client, connect$)
.pipe(switchMap(() => this.createChannel()), share())
.pipe(
switchMap(() => this.createChannel()),
share(),
)
.toPromise();
return this.connection;
}
Expand Down Expand Up @@ -112,10 +115,6 @@ export class ClientRMQ extends ClientProxy {
await channel.assertQueue(this.queue, this.queueOptions);
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);

this.replyQueue = (await channel.assertQueue('', {
exclusive: true,
})).queue;

this.responseEmitter = new EventEmitter();
this.responseEmitter.setMaxListeners(0);
this.consumeChannel();
Expand All @@ -137,7 +136,7 @@ export class ClientRMQ extends ClientProxy {
this.queue,
Buffer.from(JSON.stringify(message)),
{
replyTo: this.replyQueue,
replyTo: RABBITMQ_REPLY_QUEUE,
correlationId,
},
);
Expand Down
1 change: 1 addition & 0 deletions packages/microservices/client/constants.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export const ECONNREFUSED = 'ECONNREFUSED';
export const CONN_ERR = 'CONN_ERR';
export const GRPC_CANCELLED = 'Cancelled';
export const RABBITMQ_REPLY_QUEUE = 'amq.rabbitmq.reply-to';

0 comments on commit 6cea592

Please sign in to comment.