Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/timonson/gentle_rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
timonson committed Sep 19, 2021
2 parents f8a5609 + 0180289 commit f67d442
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 36 deletions.
2 changes: 2 additions & 0 deletions client/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type WsProxyFunction = {
params?: RpcRequest["params"],
) => ReturnType<WsRemote["call"]>;
subscribe: () => ReturnType<WsRemote["subscribe"]>;
listen: () => ReturnType<WsRemote["listen"]>;
};
export type WsProxy =
& {
Expand All @@ -79,6 +80,7 @@ export const wsProxyHandler = {
const proxyFunction: WsProxyFunction = (args?) => client.call(name, args);
proxyFunction.notify = (args?) => client.call(name, args, true);
proxyFunction.subscribe = () => client.subscribe(name);
proxyFunction.listen = () => client.listen(name);
return proxyFunction;
}
},
Expand Down
9 changes: 9 additions & 0 deletions client/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,19 @@ import { BadServerDataError } from "./error.ts";
import type {
JsonValue,
RpcFailure,
RpcNotification,
RpcResponseBasis,
RpcSuccess,
} from "../json_rpc_types.ts";

export function validateRpcNotification(data: any): data is RpcNotification {
return (
data?.jsonrpc === "2.0" &&
typeof data.method === "string" &&
typeof data.id === "undefined"
);
}

function validateRpcBasis(data: any): data is RpcResponseBasis {
return (
data?.jsonrpc === "2.0" &&
Expand Down
110 changes: 77 additions & 33 deletions client/ws.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { createRequest } from "./creation.ts";
import { validateResponse } from "./validation.ts";
import { validateResponse, validateRpcNotification } from "./validation.ts";
import { BadServerDataError } from "./error.ts";

import type { JsonValue, RpcRequest } from "../json_rpc_types.ts";
import type { JsonObject, JsonValue, RpcRequest } from "../json_rpc_types.ts";

function isObject(obj: unknown): obj is Record<string, unknown> {
return (
Expand Down Expand Up @@ -47,9 +47,29 @@ export class Remote {
return this.textDecoder || (this.textDecoder = new TextDecoder());
}

private async *iterateOverPayloadData(
private async *iterateRequests(
rpcRequest: RpcRequest,
): AsyncGenerator<JsonValue> {
while (this.socket.readyState < 2) {
const payloadData = await this.payloadData;
if (payloadData === null) {
break;
}
const parsedData = JSON.parse(payloadData);

// Batch emits are handled by the subscription iterator
if (Array.isArray(parsedData)) continue;

const rpcResponse = validateResponse(parsedData);
if (rpcResponse.id === rpcRequest.id) {
yield rpcResponse.result;
break;
}
}
}

private async *iterateSubscriptions(
rpcRequest: RpcRequest,
{ isOnetime }: { isOnetime: boolean },
): AsyncGenerator<JsonValue> {
while (this.socket.readyState < 2) {
try {
Expand All @@ -59,50 +79,70 @@ export class Remote {
}
const parsedData = JSON.parse(payloadData);

//Processes for the method 'emitBatch':
if (Array.isArray(parsedData) && !isOnetime && parsedData.length > 0) {
const invalid = parsedData.map(validateResponse).find((res) =>
// Process for the method 'emitBatch':
if (Array.isArray(parsedData) && parsedData.length > 0) {
const rpcResponses = parsedData.map(validateResponse);
const invalid = rpcResponses.find((res) =>
!isObject(res.result) || res.result.event !== "emitted"
);
if (invalid) {
throw new BadServerDataError(
invalid.id ? invalid.id : null,
invalid.id || null,
"The server returned an invalid batch response.",
-32004,
);
} else {
continue;
for (const res of rpcResponses) {
if ((res.result as JsonObject).id === rpcRequest.id) {
yield res.result;
}
}
}
} else {
const rpcResponse = validateResponse(parsedData);
if (
!isOnetime &&
isObject(rpcResponse.result) &&
rpcResponse.result.id === rpcRequest.id
) {
if (
rpcResponse.result.event === "subscribed" ||
rpcResponse.result.event === "emitted"
) {
continue;
}
if (rpcResponse.result.event === "unsubscribed") {
break;
}
}
if (rpcResponse.id === rpcRequest.id) {
yield rpcResponse.result;
if (isOnetime) {
break;
switch (rpcResponse.result.event) {
case "subscribed":
continue;
case "unsubscribed":
break;
case "emitted":
yield rpcResponse.result;
break;
default:
throw new BadServerDataError(
rpcResponse.id ? rpcResponse.id : null,
"The server returned an invalid response.",
-32004,
);
}
}
}
} catch (err) {
if (err.id === rpcRequest.id) {
yield Promise.reject(err);
if (isOnetime) {
break;
}
}
}
}
}

private async *iterateNotifications(
eventName: RpcRequest["method"],
): AsyncGenerator<JsonValue> {
while (this.socket.readyState < 2) {
const payloadData = await this.payloadData;
if (payloadData === null) {
break;
}
const parsedData = JSON.parse(payloadData);

if (validateRpcNotification(parsedData)) {
const rpcNotification = parsedData;
if (rpcNotification.method === eventName) {
yield rpcNotification.params || null;
}
}
}
Expand All @@ -122,9 +162,7 @@ export class Remote {
rpcRequest,
));
if (isNotification) return Promise.resolve(undefined);
const generator = this.iterateOverPayloadData(rpcRequest, {
isOnetime: true,
});
const generator = this.iterateRequests(rpcRequest);
return generator.next().then((p) => p.value);
}

Expand All @@ -141,9 +179,7 @@ export class Remote {
},
));
return {
generator: this.iterateOverPayloadData(rpcRequest, {
isOnetime: false,
}),
generator: this.iterateSubscriptions(rpcRequest),
unsubscribe: (params?: RpcRequest["params"]): void => {
const rpcRequestUnsubscription = createRequest({
method: "unsubscribe",
Expand Down Expand Up @@ -173,4 +209,12 @@ export class Remote {
},
};
}

listen(
eventName: RpcRequest["method"],
) {
return {
generator: this.iterateNotifications(eventName),
};
}
}
7 changes: 5 additions & 2 deletions json_rpc_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ export type RpcId = number | string | null;
export type RpcParams = JsonArray | JsonObject;
export type RpcMethod = string;

export interface RpcRequest {
export interface RpcNotification {
jsonrpc: RpcVersion;
method: RpcMethod;
id?: RpcId;
params?: RpcParams;
}

export interface RpcRequest extends RpcNotification {
id?: RpcId;
}

export type RpcResponse = RpcSuccess | RpcFailure;

export type RpcBatchRequest = RpcRequest[];
Expand Down
7 changes: 6 additions & 1 deletion server/ws_internal_methods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ function emit(
dispatchEvent(
new CustomEvent("emit", { detail: { method, params } }),
);
return { event: "emitted", id, method };
return {
event: "emitted",
id,
method,
...(typeof params !== "undefined" ? { params } : {}),
};
} else {
throw new Error("Wrong arguments.");
}
Expand Down

0 comments on commit f67d442

Please sign in to comment.