Skip to content

Commit

Permalink
refactor(@nestjs/microservices) extract magic strings into constants
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilmysliwiec committed Jun 21, 2018
1 parent adbfae2 commit f80cd9e
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 14 deletions.
19 changes: 11 additions & 8 deletions packages/microservices/client/client-grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { ClientOptions } from '../interfaces/client-metadata.interface';
import { GRPC_DEFAULT_URL } from './../constants';
import { ClientGrpc, GrpcOptions } from './../interfaces';
import { ClientProxy } from './client-proxy';
import { GRPC_CANCELLED } from './constants';

let grpcPackage: any = {};

Expand All @@ -25,7 +26,7 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
this.grpcClient = this.createClient();
}

public getService<T extends {}>(name: keyof T): T {
public getService<T extends {}>(name: string): T {
const { options } = this.options as GrpcOptions;
if (!this.grpcClient[name]) {
throw new InvalidGrpcServiceException();
Expand Down Expand Up @@ -59,14 +60,15 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
): (...args) => Observable<any> {
return (...args) => {
return new Observable(observer => {
const call = client[methodName](...args);
let isClientCanceled = false;
const call = client[methodName](...args);

call.on('data', (data: any) => observer.next(data));
call.on('error', (error: any) => {
if (error.details === 'Cancelled') {
if (error.details === GRPC_CANCELLED) {
call.destroy();
if ( isClientCanceled ) {
return; // do not error if cancel was inititiated by Client
if (isClientCanceled) {
return;
}
}
observer.error(error);
Expand All @@ -76,10 +78,11 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
observer.complete();
});
return () => {
if (!call.finished) {
isClientCanceled = true;
call.cancel();
if (call.finished) {
return undefined;
}
isClientCanceled = true;
call.cancel();
};
});
};
Expand Down
3 changes: 2 additions & 1 deletion packages/microservices/client/constants.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export const ECONNREFUSED = 'ECONNREFUSED';
export const CONN_ERR = 'CONN_ERR';
export const CONN_ERR = 'CONN_ERR';
export const GRPC_CANCELLED = 'Cancelled';
1 change: 1 addition & 0 deletions packages/microservices/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export const MESSAGE_EVENT = 'message';
export const ERROR_EVENT = 'error';
export const CLOSE_EVENT = 'close';
export const SUBSCRIBE = 'subscribe';
export const CANCEL_EVENT = 'cancelled';

export const PATTERN_METADATA = 'pattern';
export const CLIENT_CONFIGURATION_METADATA = 'client';
Expand Down
2 changes: 1 addition & 1 deletion packages/microservices/interfaces/client-grpc.interface.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export interface ClientGrpc {
getService<T extends {}>(name: keyof T): T;
getService<T extends {}>(name: string): T;
}
8 changes: 4 additions & 4 deletions packages/microservices/server/server-grpc.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { fromEvent } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { InvalidGrpcPackageException } from '../exceptions/invalid-grpc-package.exception';
import { InvalidProtoDefinitionException } from '../exceptions/invalid-proto-definition.exception';
import { GrpcOptions, MicroserviceOptions } from '../interfaces/microservice-configuration.interface';
import { GRPC_DEFAULT_URL } from './../constants';
import { CANCEL_EVENT, GRPC_DEFAULT_URL } from './../constants';
import { CustomTransportStrategy } from './../interfaces';
import { Server } from './server';
import { fromEvent } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

let grpcPackage: any = {};

Expand Down Expand Up @@ -106,7 +106,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
const handler = methodHandler(call.request, call.metadata);
const result$ = this.transformToObservable(await handler);
await result$.pipe(
takeUntil(fromEvent(call, 'cancelled')),
takeUntil(fromEvent(call, CANCEL_EVENT)),
).forEach(data => call.write(data));
call.end();
};
Expand Down

0 comments on commit f80cd9e

Please sign in to comment.