Skip to content

Commit

Permalink
Merge branch '5.6.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilmysliwiec committed Jan 16, 2019
2 parents c90b8e3 + c448e56 commit 5eee77e
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 40 deletions.
52 changes: 32 additions & 20 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
"license": "MIT",
"dependencies": {
"@grpc/proto-loader": "^0.3.0",
"@nestjs/common": "5.1.0",
"@nestjs/core": "^5.3.10",
"@nestjs/microservices": "5.1.0",
"@nestjs/common": "5.5.0",
"@nestjs/core": "^5.5.0",
"@nestjs/microservices": "5.5.0",
"@nestjs/testing": "5.1.0",
"@nestjs/websockets": "5.1.0",
"@nuxtjs/opencollective": "^0.1.0",
Expand Down
5 changes: 5 additions & 0 deletions packages/common/http/http.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ export class HttpModule {
imports: options.imports,
providers: [
...this.createAsyncProviders(options),
{
provide: AXIOS_INSTANCE_TOKEN,
useFactory: (config: HttpModuleOptions) => Axios.create(config),
inject: [HTTP_MODULE_OPTIONS],
},
{
provide: HTTP_MODULE_ID,
useValue: randomStringGenerator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export interface GrpcOptions {
transport?: Transport.GRPC;
options: {
url?: string;
maxSendMessageLength?: number;
maxReceiveMessageLength?: number;
credentials?: any;
protoPath: string;
package: string;
Expand Down
2 changes: 2 additions & 0 deletions packages/microservices/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ export const RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT = false;
export const RQM_DEFAULT_QUEUE_OPTIONS = {};

export const GRPC_DEFAULT_PROTO_LOADER = '@grpc/proto-loader';
export const GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024;
export const GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH = 4 * 1024 * 1024;
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export interface GrpcOptions {
transport?: Transport.GRPC;
options: {
url?: string;
maxSendMessageLength?: number;
maxReceiveMessageLength?: number;
credentials?: any;
protoPath: string;
package: string;
Expand Down
100 changes: 88 additions & 12 deletions packages/microservices/server/server-grpc.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import { isObject, isUndefined } from '@nestjs/common/utils/shared.utils';
import { fromEvent } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { CANCEL_EVENT, GRPC_DEFAULT_PROTO_LOADER, GRPC_DEFAULT_URL } from '../constants';
import {
CANCEL_EVENT,
GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH,
GRPC_DEFAULT_PROTO_LOADER,
GRPC_DEFAULT_URL
} from '../constants';
import { InvalidGrpcPackageException } from '../exceptions/errors/invalid-grpc-package.exception';
import { InvalidProtoDefinitionException } from '../exceptions/errors/invalid-proto-definition.exception';
import { CustomTransportStrategy } from '../interfaces';
Expand All @@ -23,13 +30,11 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
this.getOptionsProp<GrpcOptions>(options, 'url') || GRPC_DEFAULT_URL;

const protoLoader =
this.getOptionsProp<GrpcOptions>(options, 'protoLoader') || GRPC_DEFAULT_PROTO_LOADER;
this.getOptionsProp<GrpcOptions>(options, 'protoLoader') ||
GRPC_DEFAULT_PROTO_LOADER;

grpcPackage = this.loadPackage('grpc', ServerGrpc.name);
grpcProtoLoaderPackage = this.loadPackage(
protoLoader,
ServerGrpc.name,
);
grpcProtoLoaderPackage = this.loadPackage(protoLoader, ServerGrpc.name);
}

public async listen(callback: () => void) {
Expand All @@ -55,16 +60,30 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
this.logger.error(invalidPackageError.message, invalidPackageError.stack);
throw invalidPackageError;
}
for (const name of this.getServiceNames(grpcPkg)) {

// Take all of the services defined in grpcPkg and assign them to
// method handlers defined in Controllers
for (const definition of this.getServiceNames(grpcPkg)) {
this.grpcClient.addService(
grpcPkg[name].service,
await this.createService(grpcPkg[name], name),
// First parameter requires exact service definition from proto
definition.service.service,
// Here full proto definition required along with namespaced pattern name
await this.createService(definition.service, definition.name),
);
}
}

public getServiceNames(grpcPkg: any) {
return Object.keys(grpcPkg).filter(name => grpcPkg[name].service);
/**
* Will return all of the services along with their fully namespaced
* names as an array of objects.
* This method initiates recursive scan of grpcPkg object
*/
public getServiceNames(grpcPkg: any): { name: string; service: any }[] {
// Define accumulator to collect all of the services available to load
const services: { name: string; service: any }[] = [];
// Initiate recursive services collector starting with empty name
this.collectDeepServices('', grpcPkg, services);
return services;
}

public async createService(grpcService: any, name: string) {
Expand Down Expand Up @@ -137,7 +156,10 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
}

public createClient(): any {
const server = new grpcPackage.Server();
const server = new grpcPackage.Server({
'grpc.max_send_message_length': this.getOptionsProp<GrpcOptions>(this.options, 'maxSendMessageLength', GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH),
'grpc.max_receive_message_length': this.getOptionsProp<GrpcOptions>(this.options, 'maxReceiveMessageLength', GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH)
});
const credentials = this.getOptionsProp<GrpcOptions>(
this.options,
'credentials',
Expand Down Expand Up @@ -177,4 +199,58 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
throw invalidProtoError;
}
}

/**
* Recursively fetch all of the service methods available on loaded
* protobuf descriptor object, and collect those as an objects with
* dot-syntax full-path names.
*
* Example:
* for proto package Bundle.FirstService with service Events { rpc...
* will be resolved to object of (while loaded for Bundle package):
* {
* name: "FirstService.Events",
* service: {Object}
* }
*/
private collectDeepServices(
name: string,
grpcDefinition: any,
accumulator: { name: string; service: any }[],
) {
if (!isObject(grpcDefinition)) {
return;
}
const keysToTraverse = Object.keys(grpcDefinition);
// Traverse definitions or namespace extensions
for (const key of keysToTraverse) {
const nameExtended = this.parseDeepServiceName(name, key);
const deepDefinition = grpcDefinition[key];

const isServiceDefined = !isUndefined(deepDefinition.service);
const isServiceBoolean = isServiceDefined
? deepDefinition.service !== false
: false;

if (isServiceDefined && isServiceBoolean) {
accumulator.push({
name: nameExtended,
service: deepDefinition,
});
}
// Continue recursion until objects end or service definition found
else {
this.collectDeepServices(nameExtended, deepDefinition, accumulator);
}
}
}

private parseDeepServiceName(name: string, key: string): string {
// If depth is zero then just return key
if (name.length === 0) {
return key;
}
// Otherwise add next through dot syntax
return name + '.' + key;
}
}
2 changes: 1 addition & 1 deletion packages/microservices/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export abstract class Server {
prop: keyof T['options'],
defaultValue = undefined,
) {
return obj ? obj[prop as string] : defaultValue;
return obj && obj[prop as string] || defaultValue;
}

protected handleError(error: string) {
Expand Down
76 changes: 72 additions & 4 deletions packages/microservices/test/server/server-grpc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,16 @@ describe('ServerGrpc', () => {
});
describe('when package exist', () => {
it('should call "addService"', async () => {
const serviceNames = ['test', 'test2'];
const serviceNames = [{
name: 'test',
service: true
}, {
name: 'test2',
service: true
}];
sinon.stub(server, 'lookupPackage').callsFake(() => ({
test: true,
test2: true,
test: {service: true},
test2: {service: true}
}));
sinon.stub(server, 'getServiceNames').callsFake(() => serviceNames);

Expand All @@ -73,7 +79,16 @@ describe('ServerGrpc', () => {
key2: { service: true },
key3: { service: false },
};
const expected = ['key', 'key2'];
const expected = [
{
name: 'key',
service: {service: true}
},
{
name: 'key2',
service: {service: true}
}
];
expect(server.getServiceNames(obj)).to.be.eql(expected);
});
});
Expand Down Expand Up @@ -225,4 +240,57 @@ describe('ServerGrpc', () => {
expect(server.deserialize(content)).to.equal(content);
});
});

describe('proto interfaces parser should account for package namespaces', () => {
it('should parse multi-level proto package tree"', () => {
const grpcPkg = {
A: {
C: {
E: {
service: {
serviceName: {}
}
}
}
},
B: {
D: {
service: {
serviceName: {}
}
}
}
};
const svcs = server.getServiceNames(grpcPkg);
expect(svcs.length).to
.be.equal(
2,
'Amount of services collected from namespace should be equal 2'
);
expect(svcs[0].name).to.be.equal('A.C.E');
expect(svcs[1].name).to.be.equal('B.D');
});
it('should parse single level proto package tree"', () => {
const grpcPkg = {
A: {
service: {
serviceName: {}
}
},
B: {
service: {
serviceName: {}
}
}
};
const services = server.getServiceNames(grpcPkg);
expect(services.length).to
.be.equal(
2,
'Amount of services collected from namespace should be equal 2'
);
expect(services[0].name).to.be.equal('A');
expect(services[1].name).to.be.equal('B');
});
});
});

0 comments on commit 5eee77e

Please sign in to comment.