Skip to content

Commit

Permalink
Update stream status parsing and props
Browse files Browse the repository at this point in the history
  • Loading branch information
0xspeedybird committed Dec 14, 2024
1 parent fb7acf8 commit 735bb85
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 34 deletions.
8 changes: 5 additions & 3 deletions apps/app/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ ARCJET_KEY=""
SUPABASE_URL=""
SUPABASE_ANON_KEY=""
MIXPANEL_PROJECT_TOKEN=""
# Gateway stream status URL - this will be appended with ${streamId}/status
STREAM_STATUS_ENDPOINT_URL="https://ai.livepeer.monster/live/video-to-video"
STREAM_STATUS_ENDPOINT_USER="ai-gateway-livepeer"
STREAM_STATUS_ENDPOINT_PASSWORD=""

# Client
NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY=""
Expand All @@ -31,6 +35,4 @@ NEXT_PUBLIC_VERCEL_PROJECT_PRODUCTION_URL="http://localhost:3000"
NEXT_PUBLIC_AI_WEBRTC_BASE_URL="https://mediamtx.server/aiWebrtc/"
NEXT_PUBLIC_AI_WEBRTC_WHIP_PATH="%STREAM_KEY%/whip"
NEXT_PUBLIC_RTMP_URL="rtmp://rtmp.ingest.server/live/%STREAM_KEY%"
NEXT_PUBLIC_LIVEPEER_STUDIO_API_KEY=""
# Gateway stream status URL - this will be appended with ${streamId}/status
NEXT_PUBLIC_STREAM_STATUS_ENDPOINT_URL="https://ai.livepeer.monster/live/video-to-video"
NEXT_PUBLIC_LIVEPEER_STUDIO_API_KEY=""
36 changes: 24 additions & 12 deletions apps/app/app/api/streams/[id]/status/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,37 @@ const ERROR_MESSAGES = {
UNAUTHORIZED: "Authentication required",
INVALID_RESPONSE: "Invalid stream status response from Gateway",
INTERNAL_ERROR: "An unexpected error occurred",
} as const;
} as const;

export async function GET(request: NextRequest,
export async function GET(
request: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
) {
const streamId = (await params).id;
const userId = request.headers.get("x-user-id");
if (!userId) {
return createErrorResponse(401, ERROR_MESSAGES.UNAUTHORIZED);
const username = process.env.STREAM_STATUS_ENDPOINT_USER;
const password = process.env.STREAM_STATUS_ENDPOINT_PASSWORD;

if (!username || !password) {
return createErrorResponse(200, ERROR_MESSAGES.INTERNAL_ERROR + " - Missing auth credentials.");
}

const authHeader = `Basic ${Buffer.from(`${username}:${password}`).toString("base64")}`;

try {
const response = await fetch(`${process.env.NEXT_PUBLIC_STREAM_STATUS_ENDPOINT_URL}/${streamId}/status`);
const response = await fetch(
`${process.env.STREAM_STATUS_ENDPOINT_URL}/${streamId}/status`,
{
headers: {
Authorization: authHeader,
},
}
);

if (!response.ok) {
return createErrorResponse(200, ERROR_MESSAGES.INVALID_RESPONSE + ` - [${response.status}] ${response.statusText}`);
const responseMsg = await response.text();
return createErrorResponse(200, ERROR_MESSAGES.INVALID_RESPONSE + ` - [${response.status}] ${response.statusText} - ${responseMsg?.replace(/[\n\r]+/g, ' ')}`);
}

const data = await response.json();
return NextResponse.json({ success: true, error: null, data: data }, { status: 200 });
} catch (error) {
Expand All @@ -29,8 +44,5 @@ export async function GET(request: NextRequest,
}

function createErrorResponse(status: number, message: unknown) {
return NextResponse.json({ success: false, error: message }, { status });
return NextResponse.json({ success: false, error: message }, { status });
}



13 changes: 6 additions & 7 deletions apps/app/components/stream/stream-status-indicator.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ const StreamStatusIndicator = ({ streamId }: { streamId: string }) => {

const getStatusColor = () => {
const color = (() => {
switch (status?.PRIMARY_STATE) {
case 'ACTIVE':
switch (status?.state) {
case 'ONLINE':
return 'green';
case 'PROCESSING':
return 'yellow';
case 'DEGRADED':
return 'red';
case 'OFFLINE':
return 'gray';
case 'DEGRADED_INPUT':
case 'DEGRADED_OUTPUT':
return 'red';
default:
return 'gray';
}
Expand All @@ -28,7 +27,7 @@ const StreamStatusIndicator = ({ streamId }: { streamId: string }) => {
return (
<div>
<Badge className={`${error?'bg-gray-500/90':getStatusColor()} text-white font-medium text-xs`}>
{loading? <LoaderCircleIcon className="animate-spin"/> : error ? 'UNKNOWN' : status?.PRIMARY_STATE}
{loading? <LoaderCircleIcon className="animate-spin"/> : error || !status?.state ? 'UNKNOWN' : status?.state}
</Badge>
</div>
);
Expand Down
86 changes: 74 additions & 12 deletions apps/app/hooks/useStreamStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ import { usePrivy } from '@privy-io/react-auth';
import { useEffect, useState, useRef } from 'react';

type StreamStatus = {
PRIMARY_STATE: 'OFFLINE' | 'ACTIVE' | 'PROCESSING' | 'DEGRADED';
lastUpdated: number;
healthMetrics: {
inputStatus: { state: 'OFFLINE' | 'ACTIVE'; lastUpdated: number };
outputStatus: { state: 'OFFLINE' | 'ACTIVE'; platform: string; fps: number; latency: number };
state: 'OFFLINE' | 'ONLINE' | 'DEGRADED_INPUT' | 'DEGRADED_OUTPUT';
last_updated: number;
input_status: {
last_frame_time: number;
fps: number;
};
output_status: {
last_frame_time: number;
fps: number;
last_restart_time: number | null;
last_error_time: number | null;
};
};

// Define the global base polling interval in milliseconds
const BASE_POLLING_INTERVAL = 5000;
const MAX_BACKOFF_INTERVAL = 120000; // Maximum backoff interval (e.g., 2 minutes)

Expand All @@ -22,6 +27,63 @@ export const useStreamStatus = (streamId: string) => {
const failureCountRef = useRef(0);
const intervalIdRef = useRef<NodeJS.Timeout | null>(null);

const determineStreamStatus = (apiResponse: any): StreamStatus => {
const now = Date.now() / 1000;

const {
input_fps,
output_fps,
last_input_time,
last_output_time,
last_restart_time,
last_error_time,
} = apiResponse;

const inputHealthyThreshold = 2; // seconds
const inputOfflineThreshold = 60; // seconds
const outputHealthyThreshold = 5; // seconds
const outputOfflineThreshold = 60; // seconds
const fpsMinThreshold = Math.min(10, 0.8 * input_fps);

const inputIsHealthy = now - last_input_time < inputHealthyThreshold;
const inputIsOffline = now - last_input_time > inputOfflineThreshold;

const outputIsHealthy = now - last_output_time < outputHealthyThreshold;
const outputIsOffline = now - last_output_time > outputOfflineThreshold;

const inputDegraded = input_fps < 15;
const outputDegraded = output_fps < fpsMinThreshold;

let state: StreamStatus["state"] = "OFFLINE";

if (inputIsHealthy && outputIsHealthy) {
if (!inputDegraded && !outputDegraded) {
state = "ONLINE";
} else if (inputDegraded) {
state = "DEGRADED_INPUT";
} else if (outputDegraded) {
state = "DEGRADED_OUTPUT";
}
} else if (inputIsOffline || outputIsOffline) {
state = "OFFLINE";
}

return {
state,
last_updated: now,
input_status: {
last_frame_time: last_input_time,
fps: input_fps,
},
output_status: {
last_frame_time: last_output_time,
fps: output_fps,
last_restart_time: last_restart_time || null,
last_error_time: last_error_time || null,
},
};
};

useEffect(() => {
if (!ready || !user) return;

Expand All @@ -38,11 +100,13 @@ export const useStreamStatus = (streamId: string) => {
}

const { success, error, data } = await res.json();
if (!success) {
triggerError(error);
if (!success || !data) {
triggerError(error??"No stream data returned from api");
return;
}
setStatus(data);
// example output from api -> {"input_fps":23.992516753574126,"last_error":null,"last_error_time":null,"last_input_time":1734119451.551498,"last_output_time":1734119451.5212831,"last_params":{},"last_params_hash":"-2083727633593109426","last_params_update_time":null,"last_restart_logs":null,"last_restart_time":null,"output_fps":13.07218761733542,"pipeline":"streamdiffusion","pipeline_id":"pip_p4XsqEJk2ZqqWLuw","request_id":"dcfa489c","restart_count":0,"start_time":1734119361.5293992,"stream_id":"str_jzpgwxehWzYSUBAi","type":"status"}
const transformedStatus = determineStreamStatus(data);
setStatus(transformedStatus);
setError(null);
failureCountRef.current = 0;
resetPollingInterval();
Expand All @@ -57,7 +121,7 @@ export const useStreamStatus = (streamId: string) => {
setError(errorMsg);
failureCountRef.current += 1;
adjustPollingInterval();
}
};

const adjustPollingInterval = () => {
const nextInterval = Math.min(
Expand All @@ -77,11 +141,9 @@ export const useStreamStatus = (streamId: string) => {
intervalIdRef.current = setInterval(fetchStatus, BASE_POLLING_INTERVAL);
};

// Initial fetch and start polling
fetchStatus();
intervalIdRef.current = setInterval(fetchStatus, BASE_POLLING_INTERVAL);

// Cleanup on component unmount
return () => {
if (intervalIdRef.current) {
clearInterval(intervalIdRef.current);
Expand Down

0 comments on commit 735bb85

Please sign in to comment.