Skip to content

Commit

Permalink
Stream manager support
Browse files Browse the repository at this point in the history
  • Loading branch information
0xspeedybird committed Dec 13, 2024
1 parent efc0d1f commit 81f65c2
Show file tree
Hide file tree
Showing 26 changed files with 1,466 additions and 115 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ yarn-error.log*
*.pem

# Sentry Config File
.env.sentry-build-plugin
.env.sentry-build-plugin

# IDE ignores
.idea
.vscode
9 changes: 8 additions & 1 deletion apps/app/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ BETTERSTACK_API_KEY=""
BETTERSTACK_URL=""
FLAGS_SECRET=""
ARCJET_KEY=""
SUPABASE_URL=""
SUPABASE_ANON_KEY=""

# Client
NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY=""
Expand All @@ -24,4 +26,9 @@ NEXT_PUBLIC_POSTHOG_HOST=""
NEXT_PUBLIC_APP_URL="http://localhost:3000"
NEXT_PUBLIC_WEB_URL="http://localhost:3001"
NEXT_PUBLIC_DOCS_URL="http://localhost:3004"
NEXT_PUBLIC_VERCEL_PROJECT_PRODUCTION_URL="http://localhost:3000"
NEXT_PUBLIC_VERCEL_PROJECT_PRODUCTION_URL="http://localhost:3000"
NEXT_PUBLIC_AI_WEBRTC_BASE_URL="https://mediamtx.server/aiWebrtc/"
NEXT_PUBLIC_AI_WEBRTC_WHIP_PATH="%STREAM_KEY%/whip"
NEXT_PUBLIC_RTMP_URL="rtmp://rtmp.ingest.server/live/%STREAM_KEY%"
NEXT_PUBLIC_LIVEPEER_STUDIO_API_KEY=""
NEXT_PUBLIC_STREAM_STATUS_ENDPOINT_URL="https://mygateway.nope/stream-status"
36 changes: 36 additions & 0 deletions apps/app/app/api/streams/[id]/status/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import type { NextRequest } from 'next/server';
import { NextResponse } from "next/server";

const ERROR_MESSAGES = {
UNAUTHORIZED: "Authentication required",
INVALID_RESPONSE: "Invalid stream status response from Gateway",
INTERNAL_ERROR: "An unexpected error occurred",
} as const;

export async function GET(request: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
const streamId = (await params).id;
const userId = request.headers.get("x-user-id");
if (!userId) {
return createErrorResponse(401, ERROR_MESSAGES.UNAUTHORIZED);
}

try {
const response = await fetch(`${process.env.NEXT_PUBLIC_STREAM_STATUS_ENDPOINT_URL}/${streamId}`);
if (!response.ok) {
return createErrorResponse(200, ERROR_MESSAGES.INVALID_RESPONSE + ` - [${response.status}] ${response.statusText}`);
}
const data = await response.json();
return NextResponse.json({ success: true, error: null, data: data }, { status: 200 });
} catch (error) {
return createErrorResponse(500, ERROR_MESSAGES.INTERNAL_ERROR + " - " + error);
}
}

function createErrorResponse(status: number, message: unknown) {
return NextResponse.json({ success: false, error: message }, { status });
}



95 changes: 69 additions & 26 deletions apps/app/app/api/streams/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,24 @@
import { createServerClient } from "@repo/supabase";
import { z } from "zod";
import { Livepeer } from "livepeer";
import { Stream } from "stream";
import crypto from "crypto";
import { newId } from "@/lib/generate-id";
import { getRtmpUrl } from "@/lib/url-helpers";

const streamSchema = z.object({
pipeline_id: z.string(),
id: z.string().optional(),
name: z.string(),
author: z.string(),
pipeline_id: z.string().optional(),
pipeline: z.object({
id: z.string()
}).optional(),
pipeline_params: z.record(z.any()).optional().default({}),
output_playback_id: z.string().optional(),
output_stream_url: z.string().optional(),
stream_key: z.string().optional()
}).refine(data => data.pipeline_id || data.pipeline, {
message: "Either pipeline_id or a nested pipeline object with an id must be provided",
path: ["pipeline_id", "pipeline.id"]
});

export async function createStream(body: any, userId: string) {
Expand All @@ -25,39 +36,71 @@ export async function createStream(body: any, userId: string) {
throw new z.ZodError(validationResult.error.errors);
}

const streamId = newId("stream");

const livepeerStream = await createLivepeerStream(streamId);
if (!validationResult) {
return { error: "No stream provided", data: null };
}

const streamData = {
...validationResult.data,
...validationResult.data
};

const isUpdate = !!streamData?.id;
const streamId = streamData?.id || newId("stream");

//make sure we can connect to livepeer before we try to create a stream
let livepeerStream;
if (!isUpdate) {
const result = await createLivepeerStream(streamData?.name);
if (result.error) {
console.error("Error creating livepeer stream. Perhaps the Livepeer Studio API Key is not configured?", result.error);
}else{
livepeerStream = result.stream;
}
}

// upsert the `streams` table
const streamKey = streamData?.stream_key || newId("stream_key");
const streamPayload = {
id: streamId,
stream_key: newId("stream_key"),
pipeline_params: validationResult.data.pipeline_params,
output_playback_id: livepeerStream?.playbackId,
output_stream_url: `rtmp://rtmp.livepeer.monster/live/${livepeerStream?.streamKey}`,
name: streamData.name,
output_playback_id: streamData.output_playback_id || livepeerStream?.playbackId || null,
output_stream_url: streamData.output_stream_url??(livepeerStream?.streamKey ? getRtmpUrl(livepeerStream.streamKey) : null),
stream_key: streamKey,
pipeline_params: streamData.pipeline_params,
pipeline_id: streamData.pipeline_id || streamData.pipeline?.id,
author: streamData.author,
created_at: new Date(),
};

console.log("Stream data:", streamData); // Debug log
const { data, error } = await supabase
.from("streams")
.insert(streamData)
const { data: upsertedStream, error: streamError } = await supabase
.from('streams')
.upsert([streamPayload])
.select()
.single();

if (error) throw new Error(error.message);
return data;
if (streamError) {
console.error("Error upserting stream:", streamError);
return { data: null, error: streamError?.message };
}

return { data: upsertedStream, error: null };
}

const createLivepeerStream = async (name: string) => {
const livepeer = new Livepeer({
serverURL: "https://livepeer.monster/api",
apiKey: process.env.NEXT_PUBLIC_LIVEPEER_STUDIO_API_KEY,
});
export const createLivepeerStream = async (name: string) => {
try{

const livepeer = new Livepeer({
serverURL: "https://livepeer.monster/api",
apiKey: process.env.NEXT_PUBLIC_LIVEPEER_STUDIO_API_KEY,
});

const { stream } = await livepeer.stream.create({
name: name,
});
const { stream, error } = await livepeer.stream.create({
name: name,
});

return stream;
return {stream, error};
}catch(e: any){
console.error("Error creating livepeer stream:", e);
return {stream: null, error: e.message};
}
};
33 changes: 33 additions & 0 deletions apps/app/app/api/streams/delete.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"use server";

import { createServerClient } from "@repo/supabase";
import { Livepeer } from "livepeer";

export async function deleteStream(streamId: string) {
const supabase = await createServerClient();
const { data, error } = await supabase
.from("streams")
.delete()
.eq('id', streamId);
if(!error){
deleteLivepeerStream(streamId);
}
return { data, error: error?.message };
}

export const deleteLivepeerStream = async (name: string) => {
try{

const livepeer = new Livepeer({
serverURL: "https://livepeer.monster/api",
apiKey: process.env.NEXT_PUBLIC_LIVEPEER_STUDIO_API_KEY,
});

const { error } = await livepeer.stream.delete(name);

return { error };
}catch(e: any){
console.error("Error deleting livepeer stream:", e);
return {stream: null, error: e.message};
}
};
13 changes: 13 additions & 0 deletions apps/app/app/api/streams/get-all.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"use server";

import { createServerClient } from "@repo/supabase";

export async function getAllStreams(userId: string) {
const supabase = await createServerClient();

const { data, error } = await supabase.from("streams").select("*").eq("user_id", userId);

if (error) throw new Error(error.message);

return data;
}
77 changes: 77 additions & 0 deletions apps/app/app/api/streams/get.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"use server";

import { createServerClient } from "@repo/supabase";

export async function getStream(streamId: string) {
const supabase = await createServerClient();
const { data, error } = await supabase
.from("streams")
.select(`
id,
name,
stream_key,
output_stream_url,
pipeline_params,
created_at,
pipeline_id,
output_playback_id,
author,
pipelines!inner (
id,
name,
config
)
`)
.eq('id', streamId)
.single();
return { data, error: error?.message };
}


export async function getStreams(userId: string, page: number = 1, limit: number = 10) {
const supabase = await createServerClient();
const offset = (page - 1) * limit;

const { data, error } = await supabase
.from("streams")
.select(`
id,
name,
stream_key,
output_stream_url,
pipeline_params,
created_at,
pipeline_id,
output_playback_id,
pipelines!inner (
id,
name
)
`)
.eq('author', userId)
.order('created_at', { ascending: false })
.range(offset, offset + limit - 1);

if (error) {
console.error("Error fetching Streams:", error);
throw new Error("Could not fetch Streams");
}

const totalCountQuery = await supabase
.from("streams")
.select('*', { count: 'exact', head: true })
.eq('author', userId);

if (totalCountQuery.error) {
console.error("Error fetching total count:", totalCountQuery.error);
throw new Error("Could not fetch total count");
}

const total = totalCountQuery.count || 0;
const totalPages = Math.ceil(total / limit);

return {
data,
totalPages,
};
}
39 changes: 38 additions & 1 deletion apps/app/app/api/streams/route.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { NextResponse } from "next/server";
import { createStream } from "./create";
import { getAllStreams } from "./get-all";
import { z } from "zod";
import { deleteStream } from "./delete";

const ERROR_MESSAGES = {
UNAUTHORIZED: "Authentication required",
INVALID_INPUT: "Invalid pipeline configuration",
INVALID_INPUT: "Invalid stream configuration",
INTERNAL_ERROR: "An unexpected error occurred",
} as const;

Expand Down Expand Up @@ -32,6 +34,41 @@ export async function POST(request: Request) {
}
}


export async function GET(request: Request) {
try {
const userId = request.headers.get("x-user-id");
if (!userId) {
return createErrorResponse(401, ERROR_MESSAGES.UNAUTHORIZED);
}

const streams = await getAllStreams(userId);

return NextResponse.json(streams, { status: 200 });
} catch (error) {
return createErrorResponse(500, ERROR_MESSAGES.INTERNAL_ERROR);
}
}

export async function DELETE(request: Request) {
try {
const userId = request.headers.get("x-user-id");
if (!userId) {
return createErrorResponse(401, ERROR_MESSAGES.UNAUTHORIZED);
}

const streamId = new URL(request.url).searchParams.get("id");
if (!streamId) {
return createErrorResponse(400, "Stream ID is required");
}

const stream = await deleteStream(streamId);
return NextResponse.json(stream, { status: 200 });
} catch (error) {
return createErrorResponse(500, ERROR_MESSAGES.INTERNAL_ERROR);
}
}

function createErrorResponse(status: number, message: unknown) {
return NextResponse.json({ success: false, error: message }, { status });
}
Loading

0 comments on commit 81f65c2

Please sign in to comment.