Skip to content

Commit

Permalink
💥 app: two-stage pipeline with proper stage management
Browse files Browse the repository at this point in the history
- if you change either url or insight_request or click the button,
everything should work correctly without bizarre reloads and stuff

- I spent much more time on this than expected, because LLMs know Streamlit rather poorly and can't read the docs
even when I paste them into the chat, so... I had to read the docs manually 🤣 and implement stages

- raise proper exceptions if the LLM API fails
  • Loading branch information
lainisourgod committed Jun 27, 2024
1 parent 0058204 commit f4e09d6
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 63 deletions.
5 changes: 3 additions & 2 deletions src/eightify/api/youtube.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ def get_video_transcript(video_id: str) -> Optional[VideoTranscript]:

try:
transcript = YouTubeTranscriptApi.get_transcript(video_id)
transcript_text = " ".join([entry["text"] for entry in transcript])
return VideoTranscript(text=transcript_text)
points = [entry["text"] for entry in transcript]
transcript_text = " ".join(points)
return VideoTranscript(text=transcript_text, points=points)

except Exception as e:
logger.error(f"Error fetching transcript: {e}")
Expand Down
139 changes: 81 additions & 58 deletions src/eightify/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import html
import re

import requests
Expand All @@ -11,89 +10,113 @@
APP_HOST = "http://localhost:8000"


def main():
st.set_page_config(page_title="Eightify", page_icon="🍓")

url = st.text_input("Enter YouTube video URL:")

if url is not None:
video_id = extract_video_id(url)
if video_id:
try:
process_video(video_id)
except Exception as e:
st.error(f"An error occurred: {str(e)}")
else:
st.error("Invalid YouTube URL. Please enter a valid URL.")

display_sidebar_info()


def process_video(video_id):
# Fetch video details
def display_video_details(video_id):
    """Show the title and an embedded player for the given video.

    Looks the video up with ``get_video_details``. When nothing comes back,
    an error message is shown and ``st.stop()`` is called instead of
    rendering the video.
    """
    details = get_video_details(video_id)
    if not details:
        st.error("No video details found.")
        st.stop()

    # Display video title and embed
    embed_url = f"https://www.youtube.com/embed/{video_id}"
    st.subheader(details.title)
    st.video(embed_url)

# Fetch transcript
transcript = get_video_transcript(video_id)

if not transcript:
st.error("No transcript found.")
st.stop()
def summarize_transcript(video_id: str) -> str:
    """Request a summary of the video from the backend.

    POSTs ``video_id`` as JSON to ``{APP_HOST}/summarize`` and returns the
    ``summary`` field of the decoded JSON response. The field is read with
    ``dict.get``, so a response without it yields ``None``.
    """
    endpoint = f"{APP_HOST}/summarize"
    payload = {"video_id": video_id}
    body = requests.post(endpoint, json=payload).json()
    return body.get("summary")

# Generate summary
with st.spinner("Generating summary..."):
summary_response = requests.post(
f"{APP_HOST}/summarize",
json={"video_id": video_id},
).json()

# Display summary
st.subheader("Summary")
st.write(summary_response["summary"])
def analyze_comments(video_id: str, insight_request: str) -> str:
    """Request an analysis of the video's comments from the backend.

    POSTs ``video_id`` and ``insight_request`` as JSON to
    ``{APP_HOST}/analyze_comments`` and returns the ``comment_analysis``
    field of the decoded JSON response. The field is read with ``dict.get``,
    so a response without it yields ``None``.
    """
    endpoint = f"{APP_HOST}/analyze_comments"
    payload = {"video_id": video_id, "insight_request": insight_request}
    body = requests.post(endpoint, json=payload).json()
    return body.get("comment_analysis")

# Fetch and analyze comments
# TODO: insight request
with st.spinner("Analyzing comments..."):
comments = get_video_comments(video_id)
comment_analysis_response = requests.post(
f"{APP_HOST}/analyze_comments",
json={"video_id": video_id},
).json()

# Display comment analysis
display_comment_analysis(comments, comment_analysis_response["comment_analysis"])


def display_comment_analysis(comments: list[VideoComment], comment_analysis: str):
st.subheader("Comments")
def display_comments(comments: list[VideoComment]):
    """Render the comments inside a collapsible "Show Comments" expander.

    The HTML-like tags this function looks for in each comment's ``text``
    (``<i>``, ``<b>``, ``<strike>``, ``<br>``) are rewritten to Markdown before
    the comment is written. Each comment is followed by one ``---`` separator.

    Args:
        comments: Comments to display; only their ``text`` attribute is read.
    """
    with st.expander("Show Comments"):
        for comment in comments:
            # Convert the paired tags first and <br> last: the patterns use
            # "." without DOTALL, so turning <br> into "\n" beforehand would
            # stop a tag pair that spans a line break from matching.
            parsed_comment = re.sub(r"<i>(.*?)</i>", r"*\1*", comment.text)
            parsed_comment = re.sub(r"<b>(.*?)</b>", r"**\1**", parsed_comment)
            parsed_comment = re.sub(r"<strike>(.*?)</strike>", r"~~\1~~", parsed_comment)
            parsed_comment = parsed_comment.replace("<br>", "\n")

            st.write(parsed_comment)
            st.write("---")


st.subheader("Comment Analysis")
st.write(comment_analysis)
def set_state(i):
    """Store ``i`` as the current stage in Streamlit's session state.

    Passed as the ``on_click`` / ``on_change`` callback of the widgets that
    move the app from one stage to another.
    """
    st.session_state["stage"] = i


def display_sidebar_info():
    """Fill the sidebar with an "About" title and a short description of the app."""
    sidebar = st.sidebar
    sidebar.title("About")
    sidebar.info("🍓 Hello! Eightify is a tool to quickly gain insights from YouTube videos. Relax and enjoy!")


def main():
    """Run the staged Streamlit UI.

    Progress is kept in ``st.session_state.stage`` and only changes through
    ``set_state`` callbacks attached to the widgets:

    * 0 -- only the "Start" button is shown;
    * 1 -- the YouTube URL input is shown;
    * 2 -- the video, its summary and transcript, and the comment-insight
      controls are shown;
    * 3 -- the comments and their analysis are shown, plus "Start Over".

    The checks are cumulative (``>=``), so a later stage also renders
    everything from the earlier ones. Editing the URL sets the stage back
    to 2, which hides the comment analysis until it is requested again.
    """
    st.set_page_config(page_title="Eightify", page_icon="🍓")
    display_sidebar_info()

    if "stage" not in st.session_state:
        st.session_state.stage = 0

    if st.session_state.stage == 0:
        st.button("Start", on_click=set_state, args=[1])

    if st.session_state.stage >= 1:
        # Input for YouTube URL
        youtube_url = st.text_input("Enter YouTube Video URL:", on_change=set_state, args=[2])

    if st.session_state.stage >= 2:
        video_id = extract_video_id(youtube_url)
        if not video_id:
            st.error("Invalid YouTube URL.")
            st.stop()

        video_details = get_video_details(video_id)
        if not video_details:
            st.error(f"Can't fetch video details for {video_id}.")
            st.stop()

        st.subheader(video_details.title)
        st.video(f"https://www.youtube.com/embed/{video_id}")

        # Get and summarize transcript. The cached result is tied to the video
        # it was computed for, so entering a different URL does not show the
        # previous video's summary and transcript.
        cached_for_this_video = st.session_state.get("summary_video_id") == video_id
        if not (cached_for_this_video and st.session_state.get("summary")):
            with st.spinner("Summarizing video..."):
                transcript = get_video_transcript(video_id)
                # No transcript means nothing to summarize; skip the request.
                summary = summarize_transcript(video_id) if transcript else None

            # get_video_transcript may return None; report it instead of
            # failing on the attribute access below.
            if not transcript:
                st.error("No transcript found.")
                st.stop()

            st.session_state.summary = summary
            st.session_state.transcript = transcript.points
            st.session_state.summary_video_id = video_id

        st.header("Summary")
        st.write(st.session_state.summary)
        with st.expander("Show Full Transcript"):
            st.write(st.session_state.transcript)

        insight_request = st.text_input("Enter insight to find in comments (optional):", on_change=set_state, args=[3])
        st.button("Analyze Comments", on_click=set_state, args=[3])

    if st.session_state.stage >= 3:
        with st.spinner("Analyzing comments..."):
            comments = get_video_comments(video_id)
            comment_analysis = analyze_comments(video_id, insight_request)

        st.header("Comment Analysis")
        display_comments(comments)

        st.write(comment_analysis)

        st.button("Start Over", on_click=set_state, args=[0])


if __name__ == "__main__":
main()
10 changes: 7 additions & 3 deletions src/eightify/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from eightify.api import openai, youtube
from eightify.api import llm, youtube

load_dotenv()

Expand Down Expand Up @@ -40,15 +40,19 @@ async def summarize_video(request: VideoRequest):
raise HTTPException(status_code=404, detail="Transcript not available")

# TODO: use async APIs
summary = openai.summarize_text(transcript.text, video_details.title, video_details.description)
summary = llm.summarize_text(transcript.text, video_details.title, video_details.description)
if summary is None:
raise HTTPException(status_code=500, detail="LLM api failed to generate a summary")

return SummarizeResponse(summary=summary)


@app.post("/analyze_comments", response_model=CommentAnalysisResponse)
async def analyze_video_comments(request: CommentAnalysisRequest):
    """Analyze the comments of a video, optionally guided by an insight request.

    Fetches the comments for ``request.video_id`` and passes them, together
    with ``request.insight_request``, to ``llm.analyze_comments``.

    Raises:
        HTTPException: status 500 when the LLM call returns ``None``.
    """
    comments = youtube.get_video_comments(request.video_id)
    # Single LLM call: an earlier duplicate call through the old ``openai``
    # module had its result overwritten and only added a redundant request.
    comment_analysis = llm.analyze_comments(comments, request.insight_request)
    if comment_analysis is None:
        raise HTTPException(status_code=500, detail="LLM api failed to generate a comment analysis")

    return CommentAnalysisResponse(comment_analysis=comment_analysis)

Expand Down
1 change: 1 addition & 0 deletions src/eightify/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class VideoDetails(BaseModel):

class VideoTranscript(BaseModel):
    # Transcript of a single video.

    # The whole transcript as one string.
    text: str
    # The individual transcript entries as separate strings; presumably `text`
    # is these joined with single spaces -- confirm against the code that
    # builds this model.
    points: list[str]


class VideoComment(BaseModel):
Expand Down

0 comments on commit f4e09d6

Please sign in to comment.