Skip to content

Commit

Permalink
add partykit connection type
Browse files Browse the repository at this point in the history
  • Loading branch information
Asvarox committed Jul 17, 2024
1 parent 22e6bad commit 7413cf9
Show file tree
Hide file tree
Showing 14 changed files with 272 additions and 28 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ VITE_APP_WEBSOCKET_URL="wss://${VITE_APP_BACKEND_HOST}"
VITE_APP_SENTRY_TUNNEL="https://${VITE_APP_BACKEND_HOST}/sentry"
VITE_APP_POSTHOG_PROXY="https://${VITE_APP_BACKEND_HOST}/posthog"
VITE_APP_SONG_CONVERT_PROXY="https://${VITE_APP_BACKEND_HOST}/proxy"
VITE_APP_PARTYKIT_URL="wss://allkaraoke-partykit.asvarox.partykit.dev"

# Changing the config name to mts causes this https://github.com/vitejs/vite/issues/13267
VITE_CJS_IGNORE_WARNING=true
2 changes: 1 addition & 1 deletion playwright.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const config: PlaywrightTestConfig = {
/* Retry on CI only */
retries: process.env.CI ? 0 : 0, // Retry with script that will rerun failed again on CI at the end
/* Opt out of parallel tests on CI. */
workers: process.env.CI ? 1 : undefined,
workers: process.env.CI ? 1 : 3,
/* Reporter to use. See https://playwright.dev/docs/test-reporters */
reporter: [
['json', { outputFile: 'test-results.json' }],
Expand Down
22 changes: 11 additions & 11 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/modules/RemoteMic/ConnectionStatus.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export default function ConnectionStatus() {
<Container>
<Row>
<strong>Remote mic server</strong>: {isOnline ? 'Online' : 'Offline'}
{isOnline && server.getGameCode().startsWith('w') && <> ({latency}ms)</>}
{isOnline && latency && <> ({latency}ms)</>}
</Row>
{isOnline && (
<>
Expand Down
7 changes: 6 additions & 1 deletion src/modules/RemoteMic/Network/Client/NetworkClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { throttle } from 'lodash-es';
import SimplifiedMic from 'modules/GameEngine/Input/SimplifiedMic';
import events from 'modules/GameEvents/GameEvents';
import { PartyKitClientTransport } from 'modules/RemoteMic/Network/Client/Transport/PartyKitClient';
import { PeerJSClientTransport } from 'modules/RemoteMic/Network/Client/Transport/PeerJSClient';
import { WebSocketClientTransport } from 'modules/RemoteMic/Network/Client/Transport/WebSocketClient';
import { ClientTransport } from 'modules/RemoteMic/Network/Client/Transport/interface';
Expand Down Expand Up @@ -60,7 +61,11 @@ export class NetworkClient {
public connect = (roomId: string, name: string, silent: boolean) => {
const lcRoomId = roomId.toLowerCase();
if (!this.transport) {
this.transport = lcRoomId.startsWith('w') ? new WebSocketClientTransport() : new PeerJSClientTransport();
this.transport = lcRoomId.startsWith('w')
? new WebSocketClientTransport()
: lcRoomId.startsWith('k')
? new PartyKitClientTransport()
: new PeerJSClientTransport();
}
if (this.clientId === null) this.setClientId(v4());
this.roomId = lcRoomId;
Expand Down
65 changes: 65 additions & 0 deletions src/modules/RemoteMic/Network/Client/Transport/PartyKitClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { transportCloseReason, transportErrorReason } from 'modules/RemoteMic/Network/Client/NetworkClient';
import { ClientTransport } from 'modules/RemoteMic/Network/Client/Transport/interface';
import { NetworkMessages } from 'modules/RemoteMic/Network/messages';
import { ForwardedMessage, PARTYKIT_SERVER } from 'modules/RemoteMic/Network/Server/Transport/PartyKitServer';
import { pack, unpack } from 'modules/RemoteMic/Network/utils';
import Listener from 'modules/utils/Listener';

export class PartyKitClientTransport extends Listener<[NetworkMessages]> implements ClientTransport {
private connection: WebSocket | null = null;
private roomId: string | null = null;

public connect(
clientId: string,
roomId: string,
onConnect: () => void,
onClose: (reason: transportCloseReason, originalEvent: any) => void,
onError: (error: transportErrorReason, originalEvent: any) => void,
): void {
this.roomId = roomId;
// this.connection = new PartySocket({ host: PARTYKIT_SERVER, room: roomId });

this.connection = new WebSocket(`${PARTYKIT_SERVER}/party/${roomId}`);
this.connection.binaryType = 'arraybuffer';

this.connection.onopen = () => {
this.connection?.send(pack({ t: 'register-player', id: clientId, roomId: roomId }));
};

this.connection.onmessage = (message) => {
const data = unpack<ForwardedMessage>(message.data);
if (data.t === 'forward') {
this.onUpdate(data.payload);
} else if (data.t === 'connected') {
onConnect();
}
};

this.connection.onclose = (event) => {
let reason = 'unknown';
try {
reason = JSON.parse(event.reason)?.error;
} catch (e) {
console.info('could not parse close reason', event);
}
this.clearAllListeners();
onClose(reason, event);
};

this.connection.onerror = (event) => {
this.clearAllListeners();
onError('error', event);
};
}

public sendEvent(event: NetworkMessages) {
this.connection?.send(pack({ t: 'forward', recipients: [this.roomId], payload: event }));
}

// readyState >=2 means that the connection is closing or closed
public isConnected = () => (this.connection?.readyState ?? Infinity) < 2;

public close = () => {
this.connection?.close();
};
}
18 changes: 14 additions & 4 deletions src/modules/RemoteMic/Network/Server/NetworkServer.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import events from 'modules/GameEvents/GameEvents';
import { PartyKitServerTransport } from 'modules/RemoteMic/Network/Server/Transport/PartyKitServer';
import { PeerJSServerTransport } from 'modules/RemoteMic/Network/Server/Transport/PeerJSServer';
import { WebSocketServerTransport } from 'modules/RemoteMic/Network/Server/Transport/WebSocketServer';
import { ServerTransport } from 'modules/RemoteMic/Network/Server/Transport/interface';
import { NetworkMessages } from 'modules/RemoteMic/Network/messages';
import RemoteMicManager from 'modules/RemoteMic/RemoteMicManager';
import SongDao from 'modules/Songs/SongsService';
import storage from 'modules/utils/storage';
import { InputLagSetting, UseWebsocketsSettings } from 'routes/Settings/SettingsState';
import { InputLagSetting, RemoteMicConnectionTypeSetting } from 'routes/Settings/SettingsState';

export const GAME_CODE_KEY = 'room_id_key';
export const GAME_CODE_LENGTH = 5;
Expand Down Expand Up @@ -36,7 +37,13 @@ export class NetworkServer {

public start = () => {
if (!this.transport) {
this.transport = UseWebsocketsSettings.get() ? new WebSocketServerTransport() : new PeerJSServerTransport();
const type = RemoteMicConnectionTypeSetting.get();
this.transport =
type === 'WebSockets'
? new WebSocketServerTransport()
: type === 'PartyKit'
? new PartyKitServerTransport()
: new PeerJSServerTransport();
}
if (this.started) return;
this.started = true;
Expand All @@ -52,7 +59,7 @@ export class NetworkServer {
if (type === 'register') {
RemoteMicManager.addRemoteMic(event.id, event.name, sender, event.silent, event.lag);
} else if (type === 'unregister') {
RemoteMicManager.removeRemoteMic(sender.peer, true);
RemoteMicManager.removeRemoteMic(sender.peer);
} else if (type === 'subscribe-event') {
RemoteMicManager.addSubscription(sender.peer, event.channel);
} else if (type === 'unsubscribe-event') {
Expand Down Expand Up @@ -120,5 +127,8 @@ export class NetworkServer {

public getLatency = () => this.transport?.getCurrentPing() ?? 0;

public getGameCode = (): string => (UseWebsocketsSettings.get() ? 'w' : 'p') + this.gameCode;
public getGameCode = (): string => {
const type = RemoteMicConnectionTypeSetting.get();
return (type === 'WebSockets' ? 'w' : type === 'PartyKit' ? 'k' : 'p') + this.gameCode;
};
}
139 changes: 139 additions & 0 deletions src/modules/RemoteMic/Network/Server/Transport/PartyKitServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import {
SenderInterface,
ServerTransport,
transportCloseReason,
} from 'modules/RemoteMic/Network/Server/Transport/interface';
import { NetworkMessages } from 'modules/RemoteMic/Network/messages';
import { getPingTime, pack, unpack } from 'modules/RemoteMic/Network/utils';
import Listener from 'modules/utils/Listener';

export interface ForwardedMessage {
t: 'forward';
sender: string;
payload: NetworkMessages;
}

interface WebsocketConnectedMessage {
t: 'connected';
}

interface WebsocketPongMessage {
t: 'pong';
}

export type WebsocketMessage = ForwardedMessage | WebsocketConnectedMessage | WebsocketPongMessage;

export const PARTYKIT_SERVER = import.meta.env.VITE_APP_PARTYKIT_URL;

export class PartyKitServerTransport extends Listener<[NetworkMessages, SenderInterface]> implements ServerTransport {
public readonly name = 'PartyKit';
private connection: WebSocket | null = null;

public connect(
roomId: string,
onConnect: () => void,
onClose: (reason: transportCloseReason, originalEvent: any) => void,
) {
this.connection = new WebSocket(`${PARTYKIT_SERVER}/party/${roomId}`);
this.connection.binaryType = 'arraybuffer';
this.connection.onopen = () => {
this.connection?.send(pack({ t: 'register-room', id: roomId }));
onConnect();
this.ping();

this.connection?.addEventListener('message', (message) => {
const payload: WebsocketMessage = unpack(message.data);

if (payload.t === 'forward') {
if (!['ping', 'pong', 'freq'].includes(payload?.payload?.t)) console.log('received', payload);
const { sender, payload: data } = payload;
const conn = new SenderWrapper(sender, this.connection!);

this.onUpdate(data, conn);
} else if (payload.t === 'pong') {
this.onPong();
} else {
console.warn('Unknown message type', payload);
}
});
};

this.connection.onclose = (event) => {
onClose(event.reason, event);
};
}

public disconnect = () => {
this.connection?.close();
};

// todo create a a util to share with Network Client
private latency = 0;
private pingStart = getPingTime();
public pinging = false;
private pingTimeout: ReturnType<typeof setTimeout> | null = null;

private ping = () => {
this.pinging = true;
this.pingStart = getPingTime();

this.connection?.send(pack({ t: 'ping' }));
};
private onPong = () => {
if (!this.pinging) return;
this.latency = getPingTime() - this.pingStart;
this.pinging = false;

if (this.pingTimeout) clearTimeout(this.pingTimeout);
this.pingTimeout = setTimeout(this.ping, 5_000);
};

public getCurrentPing = () => {
return this.pinging ? Math.max(this.latency, getPingTime() - this.pingStart) : this.latency;
};
}

type callback = (data: any) => void;

class SenderWrapper implements SenderInterface {
private currentPing = 0;
constructor(
public peer: string,
private socket: WebSocket,
) {}

public send = (payload: NetworkMessages) => {
const data = { t: 'forward', recipients: [this.peer], payload };
if (!['ping', 'pong', 'freq'].includes(payload?.t)) console.log('sending', this.peer, payload);
this.socket.send(pack(data));
};

private callbacksMap: Map<callback, callback> = new Map();

public on = (event: string, callback: (data: any) => void) => {
if (event === 'data') {
this.callbacksMap.set(callback, (message) => {
const data: WebsocketMessage = unpack(message.data);
if (data.t === 'forward') {
const { sender, payload } = data;
if (sender === this.peer) {
callback(payload);
}
}
});
this.socket.addEventListener('message', this.callbacksMap.get(callback)!);
}
};

public off = (event: string, callback: (data: any) => void) => {
if (event === 'data') {
const actualCallback = this.callbacksMap.get(callback);
this.socket.removeEventListener('message', actualCallback!);
this.callbacksMap.delete(callback);
}
};

public close = () => {
this.socket.close();
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export type transportCloseReason = string;
export type transportErrorReason = string;

export interface ServerTransport extends Listener<[NetworkMessages, SenderInterface]> {
name: 'WebSockets' | 'PeerJS';
name: 'WebSockets' | 'PeerJS' | 'PartyKit';

connect(
roomId: string,
Expand Down
Loading

0 comments on commit 7413cf9

Please sign in to comment.