Skip to content

Commit

Permalink
Merge branch 'grpc-proto-tests-upgrade' of https://github.com/anton-a…
Browse files Browse the repository at this point in the history
…lation/nest into anton-alation-grpc-proto-tests-upgrade
  • Loading branch information
kamilmysliwiec committed May 30, 2019
2 parents a3ad69d + bf17608 commit fca5411
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 0 deletions.
166 changes: 166 additions & 0 deletions integration/microservices/e2e/orders-grpc.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import { INestApplication } from '@nestjs/common';
import { Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import * as express from 'express';
import { join } from 'path';
import * as request from 'supertest';
import * as ProtoLoader from '@grpc/proto-loader';
import * as GRPC from 'grpc';
import { expect } from 'chai';
import { fail } from 'assert';
import { AdvancedGrpcController } from '../src/grpc-advanced/advanced.grpc.controller';

describe('Advanced GRPC transport', () => {
let server;
let app: INestApplication;
let client: any;

before(async () => {
const module = await Test.createTestingModule({
controllers: [AdvancedGrpcController],
}).compile();
// Create gRPC + HTTP server
server = express();
app = module.createNestApplication(server);
/*
* Create microservice configuration
*/
app.connectMicroservice({
transport: Transport.GRPC,
options: {
package: 'proto_example',
protoPath: 'root.proto',
loader: {
includeDirs: [join(__dirname, '../src/grpc-advanced/proto')],
keepCase: true,
},
},
});
// Start gRPC microservice
await app.startAllMicroservicesAsync();
await app.init();
// Load proto-buffers for test gRPC dispatch
const proto = ProtoLoader.loadSync('root.proto', {
includeDirs: [join(__dirname, '../src/grpc-advanced/proto')],
}) as any;
// Create Raw gRPC client object
const protoGRPC = GRPC.loadPackageDefinition(proto) as any;
// Create client connected to started services at standard 5000 port
client = new protoGRPC.proto_example.orders.OrderService(
'localhost:5000',
GRPC.credentials.createInsecure(),
);

});

it(`GRPC Sending and Receiving HTTP POST`, () => {
return request(server)
.post('/')
.send('1')
.expect(200, {
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
});

it('GRPC Sending and receiving message', async () => {

// Execute find in Promise
return new Promise(resolve => {
client.find({
id: 1,
}, (err, result) => {
// Compare results
expect(err).to.be.null;
expect(result).to.eql({
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
// Resolve after checkups
resolve();
});

});

});

it('GRPC Sending and receiving Stream from RX handler', async () => {

const callHandler = client.sync();

callHandler.on('data', (msg: number) => {
// Do deep comparison (to.eql)
expect(msg).to.eql({
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
});

callHandler.on('error', (err: any) => {
// We want to fail only on real errors while Cancellation error
// is expected
if (String(err).toLowerCase().indexOf('cancelled') === -1) {
fail('gRPC Stream error happened, error: ' + err);
}
});

return new Promise((resolve, reject) => {
callHandler.write({
id: 1,
});
setTimeout(() => resolve(), 1000);
});

});

it('GRPC Sending and receiving Stream from Call handler', async () => {

const callHandler = client.syncCall();

callHandler.on('data', (msg: number) => {
// Do deep comparison (to.eql)
expect(msg).to.eql({
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
});

callHandler.on('error', (err: any) => {
// We want to fail only on real errors while Cancellation error
// is expected
if (String(err).toLowerCase().indexOf('cancelled') === -1) {
fail('gRPC Stream error happened, error: ' + err);
}
});

return new Promise((resolve, reject) => {
callHandler.write({
id: 1,
});
setTimeout(() => resolve(), 1000);
});

});


});
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { Body, Controller, HttpCode, Post } from '@nestjs/common';
import {
Client, ClientGrpc, GrpcMethod,
GrpcStreamMethod, GrpcStreamCall, Transport,
} from '@nestjs/microservices';
import { join } from 'path';
import { Observable, of, Subject } from 'rxjs';

@Controller()
export class AdvancedGrpcController {
/*
* HTTP Proxy Client defines loading pattern
*/
@Client({
transport: Transport.GRPC,
options: {
package: 'proto_example.orders',
protoPath: 'root.proto',
loader: {
includeDirs: [join(__dirname, './proto')],
keepCase: true,
},
},
})
client: ClientGrpc;

/**
* HTTP Proxy entry for support non-stream find method
* @param id
*/
@Post()
@HttpCode(200)
call(@Body() id: number): Observable<number> {
const svc = this.client.getService<any>('OrderService');
return svc.find({ id });
}

/**
* GRPC stub for Find method
* @param id
*/
@GrpcMethod('orders.OrderService')
async find({ id }: { id: number }): Promise<any> {
return of({
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
}

/**
* GRPC stub implementation for sync stream method
* @param messages
*/
@GrpcStreamMethod('orders.OrderService')
async sync(messages: Observable<any>): Promise<any> {
const s = new Subject();
const o = s.asObservable();
messages.subscribe(msg => {
s.next({
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
});
return o;
}

/**
* GRPC stub implementation for syncCall stream method (implemented through call)
* @param stream
*/
@GrpcStreamCall('orders.OrderService')
async syncCall(stream: any) {
stream.on('data', (msg: any) => {
stream.write({
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto3";
package proto_example.common.items;

enum ItemType {
DEFAULT = 0;
SUPERIOR = 1;
FLAWLESS = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto3";
package proto_example.common.shipments;

message ShipmentType {
string from = 1;
string to = 2;
string carrier = 3;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";
package proto_example.orders;

import public "common/item_types.proto";
import public "common/shipment_types.proto";

message Order {
int32 id = 1;
repeated common.items.ItemType itemTypes = 2;
common.shipments.ShipmentType shipmentType = 3;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
syntax = "proto3";
import "orders/message.proto";
package proto_example.orders;

service OrderService {
rpc Find(Order) returns (Order);
rpc Sync(stream Order) returns (stream Order);
rpc SyncCall(stream Order) returns (stream Order);
}
3 changes: 3 additions & 0 deletions integration/microservices/src/grpc-advanced/proto/root.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
syntax = "proto3";
package proto_example;
import public "orders/service.proto";

0 comments on commit fca5411

Please sign in to comment.