Skip to content

Commit

Permalink
adding a well-typed server spec in prep for a typesafe client
Browse files Browse the repository at this point in the history
  • Loading branch information
coffeemug committed Jun 29, 2021
1 parent f050702 commit 1aab629
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 74 deletions.
31 changes: 25 additions & 6 deletions integration-tests/__tests__/basic.spec.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import * as t from 'io-ts';
import WS from 'ws';
import { Server, AliasError } from 'ts-alias-server';
import { fromRpc, OnRpcFn, fromChannel, OnChannelFn } from 'ts-alias-server';
import { _rpc, RpcFn, _channel, ConnectFn } from 'ts-alias-server';
import { RpcClient } from 'ts-alias-client';

/*
Type safe rpc calls
*/
type Context = void;
const rpc = <ArgsT extends t.Mixed>(argsT: ArgsT, onRpc: OnRpcFn<t.TypeOf<ArgsT>, Context>) => fromRpc(argsT, onRpc);
const channel = <ArgsT extends t.Mixed>(args: ArgsT, onChannel: OnChannelFn<t.TypeOf<ArgsT>, Context>) => fromChannel(args, onChannel);
const rpc = <ArgsT extends t.Mixed>(argsT: ArgsT, onRpc: RpcFn<t.TypeOf<ArgsT>, Context>) => _rpc<Context, ArgsT>(argsT, onRpc);
const channel = <ArgsT extends t.Mixed, Event>(args: ArgsT, onChannel: ConnectFn<Context, t.TypeOf<ArgsT>, Event>) => _channel(args, onChannel);

/*
Test a call
Expand Down Expand Up @@ -72,15 +72,34 @@ test('', async () => {
expect(x).toBe(undefined);
});

/*
Test a channel
*/
const triple = channel<typeof t.number, number>(t.number, async ({ emit }, args, _) => {
emit("ok", args * 3);
});

test('', async () => {
const x = await client.call("triple", 7);
expect(x).toBe(21);
});

test('', async () => {
const x = await new Promise((resolve, reject) => {
client.watch("triple", 7, reject, resolve);
});
expect(x).toBe(21);
});

/*
Setup and teardown boilerplate
*/
const channels = { div, raise };
let server: Server<Context>;
const channels = { div, raise, triple };
let server: Server<Context, typeof channels>;
let client: RpcClient<Context>;
beforeAll(() => {
// start the server
server = new Server<Context>({
server = new Server<Context, typeof channels>({
onContext: () => {},
channels,
port: 443,
Expand Down
38 changes: 25 additions & 13 deletions server/src/core/config.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,51 @@
import * as t from 'io-ts';
import { Status, Request, ErrorBody } from 'ts-alias-wire';

/*
Server configuration
*/
export interface Config<Context> {
export interface Config<Context, SpecT extends ServerBaseSpec> {
channels: ServerSpec<Context, SpecT>,
onContext: ContextFn<Context>,
channels: { [k: string]: ConnectFn<Context> | undefined },
port: number,
port?: number,
}

// Context
export type ContextFn<Context>
= (metadata: unknown) => Promise<Context> | Context;
export type ServerSpec<Context, SpecT extends ServerBaseSpec> = {
[Key in keyof SpecT]: SpecT[Key] extends ChannelSpec<Context, SpecT[Key]['argSpec'], infer _> ? SpecT[Key] : never;
};

export type ServerBaseSpec = Record<string, { argSpec: t.Mixed, onConnect: unknown }>;

export type ChannelSpec<Context, ArgSpecT extends t.Mixed, Event> = {
argSpec: ArgSpecT,
onConnect: ConnectFn<Context, t.TypeOf<ArgSpecT>, Event>
}

// New connection
export type ConnectFn<Context> = (
controls: Channel,
args: unknown,
export type ConnectFn<Context, ArgT, Event> = (
controls: Channel<Event>,
args: ArgT,
context: Context,
request: Request
) => Promise<Destructor | void>;

// Channel interface
export interface Channel {
export interface Channel<OkBody> {
emit: <statusT extends Status>(
status: statusT,
response: ResponseT[statusT]
response: ResponseT<OkBody>[statusT]
) => void;
}

export type Destructor = () => void;

// Context
export type ContextFn<Context>
= (metadata: unknown) => Promise<Context> | Context;

// type util
export type ResponseT = {
ok: unknown,
export type ResponseT<Event> = {
ok: Event,
error: ErrorBody,
};

37 changes: 30 additions & 7 deletions server/src/core/request.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { Config, ResponseT } from './config';
import { isLeft } from 'fp-ts/Either';
import { PathReporter } from 'io-ts/PathReporter'
import { Config, ServerBaseSpec } from './config';
import { Request, OkEvent, ErrorEvent, Status, ErrorBody } from 'ts-alias-wire';

/*
Actual request handling (would be nice to break up this function)
*/
export const handleRequest = async <Context>(socket: any, message: string, config: Config<Context>) => {
const emit = <statusT extends Status>(status: statusT, message: ResponseT[statusT]) => {
export const handleRequest = async <Context, SpecT extends ServerBaseSpec>(socket: any, message: string, config: Config<Context, SpecT>) => {
const emit = (status: Status, message: unknown) => {
try {
// I think there is a typescript limitation that requires explicitly adding this
// runtime branch. It should be possible to eventually remove it and have the
Expand Down Expand Up @@ -53,7 +55,7 @@ export const handleRequest = async <Context>(socket: any, message: string, confi
}

// grab the requested channel
const channel = config.channels[request.channel];
const channel = config.channels[request.channel as keyof SpecT];
if (!channel) {
const res: ErrorBody = {
identifier: 'unknown_call',
Expand All @@ -62,7 +64,28 @@ export const handleRequest = async <Context>(socket: any, message: string, confi
return;
}

// validate the incoming message
const decoded = channel.argSpec.decode(request.args);
if (isLeft(decoded)) {
const parseErrors = PathReporter.report(decoded);
const res: ErrorBody = {
identifier: 'bad_arguments',
message: JSON.stringify(parseErrors, null, 2),
};
emit('error', res);
return;
}

// fire in the hole!
const context = await config.onContext(request.metadata);
return await channel({ emit }, request.args, context, request);
}
try {
const context = config.onContext && await config.onContext(request.metadata);
return await channel.onConnect({ emit }, request.args, context, request);
} catch (err: any) {
console.log("ERRORING:", err.toString());
const res: ErrorBody = {
identifier: 'internal_server_error',
message: err.toString(),
}
emit('error', res);
}
}
6 changes: 3 additions & 3 deletions server/src/core/server.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import WebSocket from "ws";
import { Config, Destructor } from './config';
import { Config, ServerBaseSpec, Destructor } from './config';
import { handleRequest } from './request';

/*
Server code
*/
class Server<Context> {
class Server<Context, SpecT extends ServerBaseSpec> {
wss?: WebSocket.Server;
constructor(public config: Config<Context>) {
constructor(public config: Config<Context, SpecT>) {
}

async start() {
Expand Down
71 changes: 26 additions & 45 deletions server/src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,48 @@
import { isLeft } from 'fp-ts/Either';
import * as t from 'io-ts';
import { isLeft } from 'fp-ts/Either';
import { PathReporter } from 'io-ts/PathReporter'
import { ConnectFn, Channel, Destructor } from './core/config';
import { ConnectFn, Channel, ChannelSpec, Destructor } from './core/config';
import { ErrorBody } from 'ts-alias-wire';

/*
Type safe channels
*/
export type OnChannelFn<ArgsT extends t.Mixed, Context> =
(controls: Channel, args: ArgsT, context: Context)
=> Promise<Destructor | void>;
export const _channel =
<Context, Event, ArgSpecT extends t.Mixed>(
argsType: ArgSpecT,
start: ConnectFn<Context, t.TypeOf<ArgSpecT>, Event>
): ChannelSpec<Context, ArgSpecT, Event> => ({
argSpec: argsType,
onConnect: start,
});

export const fromChannel =
<ArgsT extends t.Mixed, Context>
(argsType: ArgsT, channel: OnChannelFn<ArgsT, Context>): ConnectFn<Context> =>
{
const onSubscribe: ConnectFn<Context> = async ({emit}, args, context, _) => {
const decoded = argsType.decode(args);
if (isLeft(decoded)) {
const parseErrors = PathReporter.report(decoded);
const res: ErrorBody = {
identifier: 'bad_arguments',
message: JSON.stringify(parseErrors, null, 2),
};
emit('error', res);
return;
}
/*
Type safe RPC calls
*/
export type RpcFn<ArgsT extends t.Mixed, Context> =
(args: ArgsT, context: Context) => unknown | Promise<unknown>;

export const _rpc =
<Context, ArgsT extends t.Mixed>(
argsType: ArgsT,
cb: RpcFn<t.TypeOf<ArgsT>, Context>
) : ChannelSpec<Context, ArgsT, Event> =>
{
const onSubscribe = _channel<Context, ReturnType<typeof cb>, ArgsT>(argsType, async ({emit}, args, context) => {
try {
return await channel({emit}, <t.TypeOf<ArgsT>>decoded.right, context);
} catch (err: any) {
const response = await cb(args, context);
emit('ok', response);
} catch (err: unknown) {
if (err instanceof AliasError) {
const res: ErrorBody = {
identifier: err.identifier,
message: err.message,
}
emit('error', res);
} else {
const res: ErrorBody = {
identifier: 'internal_server_error',
message: err.toString(),
}
emit('error', res);
throw err;
}
}
};
return onSubscribe;
}

/*
Type safe RPC calls
*/
export type OnRpcFn<ArgsT extends t.Mixed, Context> =
(args: ArgsT, context: Context) => unknown | Promise<unknown>;

export const fromRpc =
<ArgsT extends t.Mixed, Context>
(argsType: ArgsT, onRpc: OnRpcFn<t.TypeOf<ArgsT>, Context>)
: ConnectFn<Context> =>
{
const onSubscribe = fromChannel<ArgsT, Context>(argsType, async ({emit}, args, context) => {
const response = await onRpc(args, context);
emit('ok', response);
});
return onSubscribe;
}
Expand Down

0 comments on commit 1aab629

Please sign in to comment.