-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
301 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
ARCADE_API_KEY=... | ||
ARCADE_USER_ID=... | ||
OPENAI_API_KEY=... |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
# main.py | ||
import logging | ||
from textwrap import dedent | ||
|
||
from arcadepy import Arcade | ||
from crewai import Agent, Crew, Task | ||
from dotenv import load_dotenv | ||
from langchain_openai import ChatOpenAI | ||
from tools.spotify_tools import SpotifyPlaylistTool | ||
|
||
load_dotenv() | ||
|
||
# Configure logging | ||
logging.basicConfig(level=logging.INFO) | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def authenticate_spotify(): | ||
""" | ||
Handles Spotify authentication using Arcade. | ||
Returns the access token if successful. | ||
""" | ||
client = Arcade() | ||
user_id = input("Enter your email: ") | ||
|
||
auth_response = client.auth.start( | ||
user_id=user_id, | ||
provider="spotify", | ||
scopes=["playlist-read-private", "playlist-read-collaborative"], | ||
) | ||
|
||
if auth_response.status != "completed": | ||
logger.info("Complete authorization in your browser:") | ||
logger.info(auth_response.authorization_url) | ||
auth_response = client.auth.wait_for_completion(auth_response) | ||
|
||
return auth_response.context.token | ||
|
||
|
||
def main_agent(llm, tools) -> Agent: | ||
"""Creates the main Agent for CrewAI.""" | ||
return Agent( | ||
role="Playlist Selector", | ||
backstory=dedent(""" | ||
You are a music expert specializing in playlist curation. Your role is to understand | ||
user preferences and select the most appropriate playlist from their Spotify library. | ||
"""), | ||
goal=dedent(""" | ||
Your goal is to analyze user requests and find the perfect playlist match using | ||
the available tools. | ||
"""), | ||
tools=tools, | ||
allow_delegation=False, | ||
verbose=True, | ||
llm=llm, | ||
) | ||
|
||
|
||
def create_task(agent): | ||
return Task( | ||
description=dedent(""" | ||
# Task | ||
You are a playlist selection expert tasked with finding the perfect playlist. | ||
# Guidelines | ||
Your responses should be: | ||
- Clear and specific about why you chose the playlist | ||
- Include the playlist name | ||
- Explain how it matches the user's request | ||
- Only select playlists from the list of available playlists | ||
# User Request | ||
Find the most suitable playlist based on this description: {user_request} | ||
"""), | ||
expected_output="A selected playlist name with explanation of why it was chosen", | ||
agent=agent, | ||
tools=agent.tools, | ||
) | ||
|
||
|
||
def main(): | ||
logger.info("Welcome to Spotify Playlist Selector!") | ||
|
||
# Authenticate and get the Spotify token | ||
access_token = authenticate_spotify() | ||
if not access_token: | ||
logger.error("Authentication failed") | ||
return | ||
|
||
# Get user input for playlist selection | ||
user_input = input("Describe the type of playlist you want: ") | ||
|
||
# Initialize the Spotify tool | ||
playlist_tool = SpotifyPlaylistTool(access_token=access_token) | ||
|
||
agent = main_agent(ChatOpenAI(model_name="gpt-4o"), tools=[playlist_tool]) | ||
task = create_task(agent) | ||
|
||
# Initialize crew | ||
crew = Crew(agents=[agent], tasks=[task], verbose=True) | ||
|
||
# Execute crew with user input | ||
crew.kickoff(inputs={"user_request": user_input}) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
58 changes: 58 additions & 0 deletions
58
examples/crewai/2. custom-auth-example/tools/spotify_tools.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
from typing import Any | ||
|
||
import requests | ||
from crewai.tools import BaseTool | ||
from pydantic import Field | ||
|
||
|
||
class SpotifyAPIError(Exception): | ||
"""Custom exception for Spotify API related errors.""" | ||
|
||
pass | ||
|
||
|
||
class SpotifyDataProcessingError(Exception): | ||
"""Custom exception for data processing related errors.""" | ||
|
||
pass | ||
|
||
|
||
class SpotifyPlaylistTool(BaseTool): | ||
"""Tool to fetch all playlists from Spotify.""" | ||
|
||
name: str = "spotify_playlist_tool" | ||
description: str = "Gets all available Spotify playlists" | ||
access_token: str = Field(..., description="Spotify API access token") | ||
timeout: int = Field(default=30, description="Request timeout in seconds") | ||
|
||
def _run(self) -> str: | ||
""" | ||
Fetches and returns all playlist names. | ||
Returns: | ||
str: List of all playlist names | ||
Raises: | ||
SpotifyAPIError: If there's an error communicating with Spotify API | ||
SpotifyDataProcessingError: If there's an error processing the response data | ||
""" | ||
try: | ||
# Fetch playlists from Spotify | ||
url = "https://api.spotify.com/v1/me/playlists" | ||
headers = {"Authorization": f"Bearer {self.access_token}"} | ||
response = requests.get(url, headers=headers, timeout=self.timeout) | ||
response.raise_for_status() | ||
|
||
# Extract playlist names | ||
playlists = response.json() | ||
playlist_names = [playlist["name"] for playlist in playlists["items"]] | ||
|
||
return "\n".join(playlist_names) | ||
|
||
except requests.exceptions.RequestException as e: | ||
raise SpotifyAPIError(f"Failed to fetch playlists: {e!s}") from e | ||
except (KeyError, IndexError) as e: | ||
raise SpotifyDataProcessingError(f"Error processing playlist data: {e!s}") from e | ||
|
||
def _arun(self, *args: Any, **kwargs: Any) -> Any: | ||
raise NotImplementedError("Async is not implemented for this tool.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import logging | ||
import os | ||
from textwrap import dedent | ||
|
||
from arcadepy import Arcade | ||
from crewai import Agent, Crew, Process, Task | ||
from crewai_arcade.manager import CrewAIToolManager | ||
from dotenv import load_dotenv | ||
from langchain_openai import ChatOpenAI | ||
|
||
logging.basicConfig(level=logging.WARNING) | ||
logger = logging.getLogger(__name__) | ||
|
||
load_dotenv() | ||
|
||
arcade_client = Arcade() | ||
user_id = os.getenv("ARCADE_USER_ID") | ||
|
||
manager = CrewAIToolManager(arcade_client, user_id) | ||
# Retrieve the tools from the specified toolkit | ||
tools = manager.get_tools(toolkits=["github", "slack", "google"]) | ||
|
||
|
||
def main_agent(llm, tools) -> Agent: | ||
"""Creates an Agent for handling both PR reviews and calendar management.""" | ||
return Agent( | ||
role="Workflow Coordinator", | ||
backstory=dedent(""" | ||
You are the Code Review Coordinator. Your purpose is to handle multiple tasks including | ||
monitoring GitHub PRs, coordinating with team members through Slack, and managing | ||
calendar events efficiently. | ||
"""), | ||
goal=dedent(""" | ||
Your objectives are to: | ||
1. Identify open PRs and ensure proper communication through Slack | ||
2. Create and manage calendar events while handling time zones appropriately | ||
"""), | ||
tools=tools, | ||
allow_delegation=True, | ||
verbose=True, | ||
llm=llm, | ||
) | ||
|
||
|
||
def workflow_tasks(agent): | ||
pr_check_task = Task( | ||
description=dedent(""" | ||
# Task: PR Review Check | ||
Check for open PRs in the pampa-labs repository called 'gabriela'. | ||
# Process | ||
1. Use GitHub tools to search for open PRs | ||
2. Return the PR details if found | ||
# Expected Format | ||
- PR check results including PR number, title, and URL | ||
# User Request | ||
{user_request} | ||
"""), | ||
expected_output="A detailed report of found PRs with their details.", | ||
agent=agent, | ||
tools=agent.tools, | ||
) | ||
|
||
calendar_task = Task( | ||
description=dedent(""" | ||
# Task: Schedule Review Call | ||
Create a calendar event for the PR review. | ||
# Process | ||
1. Use Google Calendar tools to create a 30-minute review event with lautaro | ||
2. Include PR details in the event description | ||
# Expected Format | ||
- Confirmation of calendar event creation with details | ||
- Event time, duration, and description | ||
# User Request | ||
{user_request} | ||
"""), | ||
expected_output="Confirmation of calendar event creation with all relevant details.", | ||
agent=agent, | ||
tools=agent.tools, | ||
) | ||
|
||
notification_task = Task( | ||
description=dedent(""" | ||
# Task: Slack Notification | ||
Notify relevant team members about the PR and scheduled review. | ||
# Process | ||
1. Use "Slack.SendDmToUser" tool to notify user 'lautaro' | ||
2. Include PR details and scheduled review time in the message | ||
# Expected Format | ||
- Confirmation of Slack notification sent | ||
- Message content summary | ||
# User Request | ||
{user_request} | ||
"""), | ||
expected_output="Confirmation of Slack notification sent with message details.", | ||
agent=agent, | ||
tools=agent.tools, | ||
) | ||
|
||
return [pr_check_task, calendar_task, notification_task] | ||
|
||
|
||
def main(): | ||
logger.info("Starting the workflow.") | ||
agent = main_agent(ChatOpenAI(model_name="gpt-4o"), tools=tools) | ||
|
||
tasks = workflow_tasks(agent) | ||
|
||
crew = Crew( | ||
agents=[agent], | ||
tasks=tasks, | ||
process=Process.sequential, | ||
verbose=True, | ||
) | ||
|
||
# Execute the workflow | ||
result = crew.kickoff( | ||
inputs={"user_request": """Check for any open PRs. Today is 12.03.2024"""} | ||
) | ||
logger.info(f"Workflow executed: {result}") | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |