Skip to content

Commit

Permalink
Prepare to move RPC into an npm module (cruise-automation#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
brianc authored Jan 25, 2019
1 parent 5a457a2 commit 7f8f29d
Show file tree
Hide file tree
Showing 5 changed files with 404 additions and 0 deletions.
74 changes: 74 additions & 0 deletions packages/@cruise-automation/rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# @cruise-automation/rpc

An RPC layer to make it easier to communicate between a WebWorker and the main thread. Has support for sending and responding with [transferable](https://developer.mozilla.org/en-US/docs/Web/API/Transferable) objects to avoid structured cloning of large array buffers. It also propagates errors thrown in receivers back to the calling thread.

## Example

```js
// worker.js
import Rpc from '@cruise-automation/rpc'

const rpc = new Rpc(global);

rpc.receive('message', async ({ url }) => {
const res = await fetch(url);
if (!res.ok) {
throw new Error('Bad response ' + res.status);
}
const json = await res.json()
return { body: json }
});

```

```js
// ui-thread.js
const worker = new WebWorker('./worker.js')
const rpc = new Rpc(worker);
rpc.send('message', { url }).then(({ body }) => {
console.log('I got a response', body);
});

```

## API

The `Rpc` constructor takes a [`MessagePort`](https://developer.mozilla.org/en-US/docs/Web/API/MessagePort) as its constructor argument. In a `WebWorker` you generally would use `global` and on the UI thread you would use the instance of the `WebWorker` as the `MessagePort`.

### `rpc.send<TResult>(topic: string, data: any, transferables: Transferable[]): Promise<TResult>`

The `send` method takes a topic name and any data. This data is sent over the `MessagePort` and can be received on the other end with a registered `rpc.receive()` receiver on the same topic. You may also specify an optional array of [transferable](https://developer.mozilla.org/en-US/docs/Web/API/Transferable) objects. This returns a promise which resolves with whatever the handler registered on `rpc.receive` returns.

```js
const rpc = new Rpc(new WebWorker('/worker-script.js'))

rpc.send('fetch-and-parse', { url: '/lots-of-binary-data' }).then(({ result }) => {
console.log(result);
});
```

### `rpc.receive<T, TOut>(topic: string, handler: (T) => TOut): void`

The `receive` method registers a function to be called whenever a message is received on the specified topic. This function's return value can be waited on by a promise from the caller. To return an object with a list of [transferable](https://developer.mozilla.org/en-US/docs/Web/API/Transferable) objects in the graph you can add the list with a special key to the response from your receiver.

```js
// worker-script.js
rpc.receive('fetch-and-parse', async ({ url }) => {
const res = await fetch(url);
if (!res.ok) {
throw new Error('Bad response ' + res.status);
}
const arrayBuffer = await res.arrayBuffer();
const result = doLongRunningParseOperation(arrayBuffer);
return {
result,
[Rpc.transferables]: [result]
}
});
```

If the `handler` throws or rejects the error message will be sent through the `MessagePort` and calling thread's promise will reject.

### `Rpc.transferable`

This is a static property on the `Rpc` class that contains the magic string you must use as a key when responding to a message in a receiver and attaching transferables to the response.
5 changes: 5 additions & 0 deletions packages/@cruise-automation/rpc/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions packages/@cruise-automation/rpc/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "@cruise-automation/rpc",
"version": "0.0.0",
"description": "Add RPC to WebWorkers with transferrable object support",
"main": "lib/index.js",
"license": "Apache-2.0",
"scripts": {
"build-dev": "BABEL_ENV=$NODE_ENV babel src/index.js --out-dir lib --copy-files --config-file ../../../babel.config.js",
"build": "NODE_ENV=production npm run build-dev && flow-copy-source -v src lib --ignore '*.test.*'",
"watch": "NODE_ENV=development npm run build-dev --watch"
},
"keywords": [
"WebWorker",
"rpc"
]
}
138 changes: 138 additions & 0 deletions packages/@cruise-automation/rpc/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// @flow
//
// Copyright (c) 2018-present, GM Cruise LLC
//
// This source code is licensed under the Apache License, Version 2.0,
// found in the LICENSE file in the root directory of this source tree.
// You may not use this file except in compliance with the License.

// this type mirrors the MessageChannel api which is available on
// instances of web-workers as well as avaiable on 'global' within a worker

export type Channel = {
postMessage: (data: any, transfer?: any[]) => void,
onmessage?: (ev: MessageEvent) => void,
};

const RESPONSE = "$$RESPONSE";
const ERROR = "$$ERROR";

// helper function to create linked channels for testing
export function createLinkedChannels(): { local: Channel, remote: Channel } {
const local: Channel = {
postMessage(data: any, transfer?: Array<ArrayBuffer>) {
const ev = new MessageEvent("message", { data });
// eslint-disable-next-line no-use-before-define
if (remote.onmessage) {
remote.onmessage(ev); // eslint-disable-line no-use-before-define
}
},
};

const remote: Channel = {
postMessage(data: any, transfer?: Array<ArrayBuffer>) {
const ev = new MessageEvent("message", { data });
if (local.onmessage) {
local.onmessage(ev);
}
},
};
return { local, remote };
}

// This class allows you to hook up bi-directional async calls across web-worker
// boundaries where a single call to or from a worker can 'wait' on the response.
// Errors in receivers are propigated back to the caller as a rejection.
// It also supports returning transferables over the web-worker postMessage api,
// which was the main shortcomming with the worker-rpc npm module.
// To attach rpc to an instance of a worker in the main thread:
// const rpc = new Rpc(workerInstace);
// To attach rpc within an a web worker:
// const rpc = new Rpc(global);
// Check out the tests for more examples.
export default class Rpc {
static transferables = "$$TRANSFERABLES";
_channel: Channel;
_messageId: number = 0;
_pendingCallbacks: { [number]: (any) => void } = {};
_receivers: Map<string, (any) => any> = new Map();

constructor(channel: Channel) {
this._channel = channel;
if (this._channel.onmessage) {
throw new Error("channel.onmessage is already set. Can only use one Rpc instance per channel.");
}
this._channel.onmessage = this._onChannelMessage;
}

_onChannelMessage = (ev: MessageEvent) => {
const { id, topic, data } = (ev.data: any);
if (topic === RESPONSE) {
this._pendingCallbacks[id](ev.data);
delete this._pendingCallbacks[id];
return;
}
// invoke the receive handler in a promise so if it throws synchronously we can reject
new Promise((resolve, reject) => {
const handler = this._receivers.get(topic);
if (!handler) {
throw new Error(`no receiver registered for ${topic}`);
}
// This works both when `handler` returns a value or a Promise.
resolve(handler(data));
})
.then((result) => {
if (!result) {
return this._channel.postMessage({ topic: RESPONSE, id });
}
const transferables = result[Rpc.transferables];
delete result[Rpc.transferables];
const message = {
topic: RESPONSE,
id,
data: result,
};
this._channel.postMessage(message, transferables);
})
.catch((err) => {
const message = {
topic: RESPONSE,
id,
data: {
[ERROR]: true,
message: err.message,
},
};
this._channel.postMessage(message);
});
};

// Send a message across the rpc boundary to a receiver on the other side.
// This returns a promise for the receiver's response. If there is no registered
// receiver for the given topic, this method throws.
send<TResult>(topic: string, data: any, transfer?: ArrayBuffer[]): Promise<TResult> {
const id = this._messageId++;
const message = { topic, id, data };
const result = new Promise((resolve, reject) => {
this._pendingCallbacks[id] = (info) => {
if (info.data && info.data[ERROR]) {
reject(new Error(info.data.message));
} else {
resolve(info.data);
}
};
});
this._channel.postMessage(message, transfer);
return result;
}

// Register a receiver for a given message on a topic.
// Only one receiver can be registered per topic, and currently
// 'deregistering' a receiver is not supported.
receive<T, TOut>(topic: string, handler: (T) => TOut) {
if (this._receivers.has(topic)) {
throw new Error(`Receiver already registered for topic: ${topic}`);
}
this._receivers.set(topic, handler);
}
}
Loading

0 comments on commit 7f8f29d

Please sign in to comment.