Skip to content

Commit

Permalink
First shot at ws server fix
Browse files Browse the repository at this point in the history
  • Loading branch information
timonson committed Sep 29, 2021
1 parent 4ae7f6f commit 9462a62
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 156 deletions.
80 changes: 57 additions & 23 deletions server/creation.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { CustomError } from "./custom_error.ts";
import { verifyJwt } from "./auth.ts";

import type {
JsonObject,
Expand All @@ -16,8 +17,12 @@ export type CreationInput = {
methods: Methods;
options: Required<Options>;
};
export type RpcResponseOrBatchOrNull =
| RpcResponse
| RpcBatchResponse
| null;
type RpcResponseOrNull = RpcResponse | null;
type BatchResponseOrNull = RpcBatchResponse | null;
type RpcBatchResponseOrNull = RpcBatchResponse | null;

async function executeMethods(
obj: ValidationObject,
Expand Down Expand Up @@ -85,14 +90,36 @@ function addArgument(

export async function cleanBatch(
batch: Promise<RpcResponseOrNull>[],
): Promise<BatchResponseOrNull> {
): Promise<RpcBatchResponseOrNull> {
const batchResponse = (await Promise.all(batch)).filter((
obj: RpcResponseOrNull,
): obj is RpcResponse => obj !== null);
return batchResponse.length > 0 ? batchResponse : null;
}

export async function createResponseObject(
export function createRpcResponseObject(
obj:
| Omit<RpcSuccess, "jsonrpc">
| (RpcError & { id: RpcSuccess["id"] }),
): RpcResponse {
return "result" in obj && !("code" in obj)
? {
jsonrpc: "2.0",
result: obj.result,
id: obj.id,
}
: {
jsonrpc: "2.0",
error: {
code: obj.code,
message: obj.message,
data: obj.data,
},
id: obj.id,
};
}

export async function createRpcResponse(
{ validationObject, methods, options, jwtPayload }: CreationInput & {
jwtPayload?: Payload;
},
Expand Down Expand Up @@ -120,24 +147,31 @@ export async function createResponseObject(
}
}

export function createRpcResponseObject(
obj:
| Omit<RpcSuccess, "jsonrpc">
| (RpcError & { id: RpcSuccess["id"] }),
): RpcResponse {
return "result" in obj && !("code" in obj)
? {
jsonrpc: "2.0",
result: obj.result,
id: obj.id,
}
: {
jsonrpc: "2.0",
error: {
code: obj.code,
message: obj.message,
data: obj.data,
},
id: obj.id,
};
export async function createRpcResponseOrBatch(
validationObjectOrBatch: ValidationObject | ValidationObject[],
methods: Methods,
options: Required<Options>,
authHeader: string | null,
): Promise<RpcResponseOrBatchOrNull> {
return Array.isArray(validationObjectOrBatch)
? await cleanBatch(
validationObjectOrBatch.map(async (validationObject) =>
createRpcResponse(
await verifyJwt({
validationObject,
methods,
options,
authHeader,
}),
)
),
)
: await createRpcResponse(
await verifyJwt({
validationObject: validationObjectOrBatch,
methods,
options,
authHeader,
}),
);
}
40 changes: 12 additions & 28 deletions server/http.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { cleanBatch, createResponseObject } from "./creation.ts";
import { cleanBatch, createRpcResponseOrBatch } from "./creation.ts";
import { validateRequest } from "./validation.ts";
import { verifyJwt } from "./auth.ts";

import type { Methods, Options } from "./response.ts";

Expand All @@ -11,27 +10,12 @@ export async function handleHttpRequest(
authHeader: string | null,
): Promise<Response> {
const validationObjectOrBatch = validateRequest(await req.text(), methods);
const responseObjectOrBatchOrNull = Array.isArray(validationObjectOrBatch)
? await cleanBatch(
validationObjectOrBatch.map(async (rpc) =>
createResponseObject(
await verifyJwt({
validationObject: rpc,
methods,
options,
authHeader,
}),
)
),
)
: await createResponseObject(
await verifyJwt({
validationObject: validationObjectOrBatch,
methods,
options,
authHeader,
}),
);
const rpcResponseOrBatchOrNull = await createRpcResponseOrBatch(
validationObjectOrBatch,
methods,
options,
authHeader,
);
const headers = new Headers(options.headers);
if (options.cors) {
headers.append("access-control-allow-origin", "*");
Expand All @@ -40,21 +24,21 @@ export async function handleHttpRequest(
"Content-Type, Authorization",
);
}
if (responseObjectOrBatchOrNull === null) {
if (rpcResponseOrBatchOrNull === null) {
return new Response(null, { status: 204, headers: headers });
} else {
headers.append("content-type", "application/json");
return ("error" in responseObjectOrBatchOrNull &&
responseObjectOrBatchOrNull.error.code === -32700)
return ("error" in rpcResponseOrBatchOrNull &&
rpcResponseOrBatchOrNull.error.code === -32700)
? new Response(
JSON.stringify(responseObjectOrBatchOrNull),
JSON.stringify(rpcResponseOrBatchOrNull),
{
status: 400,
headers,
},
)
: new Response(
JSON.stringify(responseObjectOrBatchOrNull),
JSON.stringify(rpcResponseOrBatchOrNull),
{
status: 200,
headers,
Expand Down
87 changes: 30 additions & 57 deletions server/response.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { handleHttpRequest } from "./http.ts";
import { handleWs } from "./ws.ts";
import { acceptWebSocket } from "./deps.ts";
import { internalMethods } from "./ws_internal_methods.ts";
import { createRpcResponseObject } from "./creation.ts";

import type { WebSocket } from "./deps.ts";
import type { JsonValue } from "../json_rpc_types.ts";

export type Methods = {
Expand Down Expand Up @@ -38,7 +36,7 @@ export type Options = {

export async function respond(
methods: Methods,
req: Request,
req: any,
{
headers = new Headers(),
publicErrorStack = false,
Expand All @@ -51,7 +49,7 @@ export async function respond(
) {
switch (proto) {
case "http":
const response = await handleHttpRequest(
return await handleHttpRequest(
req,
methods,
{
Expand All @@ -65,61 +63,36 @@ export async function respond(
},
req.headers.get("Authorization"),
);
break;
case "ws":
const methodsAndIdsStore = new Map();
const { socket, response } = Deno.upgradeWebSocket(req.request);
handleWs(
{
socket,
methods: enableInternalMethods
? { ...methods, ...internalMethods }
: methods,
options: {
headers,
publicErrorStack,
enableInternalMethods,
additionalArguments: enableInternalMethods
? [...additionalArguments, {
args: { methodsAndIdsStore },
methods: ["subscribe", "unsubscribe"],
}]
: additionalArguments,
proto,
cors,
auth,
},
},
req.request.headers.get("sec-websocket-protocol"),
);
req.respondWith(response);
return response;
break;
// case "ws":
// const { conn, r: bufReader, w: bufWriter, headers: reqHeaders } = req;
// return acceptWebSocket({
// conn,
// bufReader,
// bufWriter,
// headers: reqHeaders,
// })
// .then((socket: WebSocket) => {
// const methodsAndIdsStore = new Map();
// if (enableInternalMethods) {
// return handleWs(
// {
// socket,
// methods: { ...methods, ...internalMethods },
// options: {
// headers,
// publicErrorStack,
// enableInternalMethods,
// additionalArguments: [...additionalArguments, {
// args: { methodsAndIdsStore },
// methods: ["subscribe", "unsubscribe"],
// }],
// proto,
// cors,
// auth,
// },
// },
// );
// } else {
// return handleWs(
// {
// socket,
// methods,
// options: {
// headers,
// publicErrorStack,
// enableInternalMethods,
// additionalArguments,
// proto,
// cors,
// auth,
// },
// },
// );
// }
// })
// .catch(async (err) => {
// console.error(`Failed to accept websocket: ${err}`);
// await req.respond({ status: 400 });
// return err;
// });
// break;
default:
throw new TypeError(`The protocol '${proto}' is not supported.`);
}
Expand Down
Loading

0 comments on commit 9462a62

Please sign in to comment.