Skip to content

Commit

Permalink
Added feedback handling logic, context derivation from logs, methods …
Browse files Browse the repository at this point in the history
…for providing feedback and validating tasks and preparation of new log entries.
  • Loading branch information
darielnoel committed Aug 28, 2024
1 parent 719c9ec commit e50ed00
Showing 1 changed file with 165 additions and 5 deletions.
170 changes: 165 additions & 5 deletions src/stores/teamStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {create} from 'zustand';
import { devtools, subscribeWithSelector } from 'zustand/middleware';
import { useAgentStore } from './agentStore';
import { useTaskStore } from './taskStore';
import { TASK_STATUS_enum, AGENT_STATUS_enum, WORKFLOW_STATUS_enum} from '../utils/enums';
import { TASK_STATUS_enum, AGENT_STATUS_enum, WORKFLOW_STATUS_enum, FEEDBACK_STATUS_enum} from '../utils/enums';
import { getTaskTitleForLogs, interpolateTaskDescription} from '../utils/tasks';
import {logger, setLogLevel} from '../utils/logger';
import {calculateTotalWorkflowCost} from '../utils/llmCostCalculator';
Expand Down Expand Up @@ -58,8 +58,7 @@ const createTeamStore = (initialState = {}) => {
addAgents: (agents) => {
const { env } = get();
agents.forEach(agent => {
agent.setStore(useTeamStore);
agent.setEnv(env);
agent.initialize(useTeamStore, env);
});
set(state => ({ agents: [...state.agents, ...agents] }));
},
Expand Down Expand Up @@ -224,7 +223,7 @@ const createTeamStore = (initialState = {}) => {
}));
},

performTask: async (agent, task) => {
workOnTask: async (agent, task) => {
if (task && agent) {
// Log the start of the task
logger.debug(`Task: ${getTaskTitleForLogs(task)} starting...`);
Expand All @@ -246,14 +245,165 @@ const createTeamStore = (initialState = {}) => {
task.inputs = get().inputs; // Pass the inputs to the task
const interpolatedTaskDescription = interpolateTaskDescription(task.description, get().inputs);
task.interpolatedTaskDescription = interpolatedTaskDescription;
await agent.executeTask(task, get().inputs, get().workflowContext);

// Get pending feedbacks directly from the task
const pendingFeedbacks = task.feedbackHistory.filter(f => f.status === FEEDBACK_STATUS_enum.PENDING);

// Derive the current context from workflowLogs, passing the current task ID
const currentContext = get().deriveContextFromLogs(get().workflowLogs, task.id);

// Check if the task has pending feedbacks
if (pendingFeedbacks.length > 0) {
// If there are pending feedbacks, work on feedback
await agent.workOnFeedback(task, task.feedbackHistory, currentContext);
} else {
// If no pending feedbacks, work on task as usual
await agent.workOnTask(task, get().inputs, currentContext);
}
// Once the task is completed, the handleAgentTaskCompleted will be triggered automatically
} catch (error) {
// We are let it propagate to the task execution and captured later on
throw error; // Re-throw the error to be caught by the task execution
}
}
},

deriveContextFromLogs: (logs, currentTaskId) => {
const taskResults = new Map();
const tasks = get().tasks; // Get the tasks array from the store
const currentTaskIndex = tasks.findIndex(task => task.id === currentTaskId);

if (currentTaskIndex === -1) {
console.warn(`Current task with ID ${currentTaskId} not found in the task list.`);
return ''; // Return empty context if current task is not found
}

// Iterate through logs to get the most recent result for each task
for (const log of logs) {
if (log.logType === 'TaskStatusUpdate' && log.taskStatus === TASK_STATUS_enum.DONE) {
const taskIndex = tasks.findIndex(task => task.id === log.task.id);

// Only include tasks that come before the current task in the workflow
if (taskIndex !== -1 && taskIndex < currentTaskIndex) {
taskResults.set(log.task.id, {
taskDescription: log.task.description,
result: log.metadata.result,
index: taskIndex // Store the index for sorting later
});
}
}
}

// Sort the results based on their original task order and create the context string
return Array.from(taskResults.values())
.sort((a, b) => a.index - b.index)
.map(({ taskDescription, result }) =>
`Task: ${taskDescription}\nResult: ${result}\n`
).join('\n');
},

provideFeedback: async (taskId, feedbackContent) => {
const { tasks } = get(); // Access the required actions and state from the store

// Find the task
const taskIndex = tasks.findIndex(t => t.id === taskId);
if (taskIndex === -1) {
logger.error("Task not found");
return;
}
const task = tasks[taskIndex];

// Create a feedback object
const newFeedback = {
content: feedbackContent,
status: FEEDBACK_STATUS_enum.PENDING,
timestamp: Date.now()
};

const newWorkflowLog = {
task,
agent: task.agent,
timestamp: Date.now(),
logDescription: `Workflow running again due to feedback on task.`,
workflowStatus: WORKFLOW_STATUS_enum.RUNNING,
metadata: {
feedback: newFeedback
},
logType: 'WorkflowStatusUpdate'
};


// Add the feedback to the task
const updatedTask = {
...task,
feedbackHistory: [...(task.feedbackHistory || []), newFeedback],
status: TASK_STATUS_enum.REVISE
};

const newTaskLog = get().prepareNewLog({
agent:updatedTask.agent,
task: updatedTask,
logDescription: `Task with feedback: ${getTaskTitleForLogs(updatedTask)}.`,
metadata: {
feedback: newFeedback
},
logType: 'TaskStatusUpdate'
});

set(state => ({
...state,
teamWorkflowStatus: WORKFLOW_STATUS_enum.RUNNING,
workflowLogs: [...state.workflowLogs, newWorkflowLog, newTaskLog],
tasks: state.tasks.map(t => t.id === taskId ? updatedTask : t),
}));
},

validateTask: async (taskId) => {
const task = get().tasks.find(t => t.id === taskId);
if (!task) {
logger.error("Task not found");
return state;
}

if (task.status !== TASK_STATUS_enum.AWAITING_VALIDATION) {
logger.error("Task is not awaiting validation");
return state;
}

const updatedTask = {
...task,
status: TASK_STATUS_enum.VALIDATED
};

const newWorkflowLog = {
task: updatedTask,
agent: updatedTask.agent,
timestamp: Date.now(),
logDescription: `Workflow running cause a task was validated.`,
workflowStatus: WORKFLOW_STATUS_enum.RUNNING,
metadata:{},
logType: 'WorkflowStatusUpdate'
};

const newTaskLog = get().prepareNewLog({
agent: updatedTask.agent,
task: updatedTask,
metadata:{},
logDescription: `Task validated: ${getTaskTitleForLogs(updatedTask)}.`,
logType: 'TaskStatusUpdate',
});

set(state => ({
...state,
tasks: state.tasks.map(t => t.id === taskId ? updatedTask : t),
workflowLogs: [...state.workflowLogs, newWorkflowLog, newTaskLog],
teamWorkflowStatus: WORKFLOW_STATUS_enum.RUNNING,
}));

//TODO: See if we can do it in a reactive way, using the same pattern we already have
get().handleTaskCompleted({agent: updatedTask.agent, task: updatedTask, result: updatedTask.result});
},


/**
* Prepares a new log entry to the centralized activity logs.
Expand Down Expand Up @@ -376,6 +526,12 @@ const createTeamStore = (initialState = {}) => {
duration: '[REDACTED]',
endTime: '[REDACTED]',
startTime: '[REDACTED]',
feedbackHistory: task.feedbackHistory ?
task.feedbackHistory.map(feedback => ({
...feedback,
timestamp: '[REDACTED]' // Redact the timestamp
}))
: [] // Provide an empty array if feedbackHistory is undefined
});

// Function to clean log metadata
Expand All @@ -384,6 +540,10 @@ const createTeamStore = (initialState = {}) => {
duration: metadata.duration ? '[REDACTED]' : metadata.duration,
endTime: metadata.endTime ? '[REDACTED]' : metadata.endTime,
startTime: metadata.startTime ? '[REDACTED]' : metadata.startTime,
feedback: metadata.feedback ? {
...metadata.feedback,
timestamp: '[REDACTED]' // Redact the timestamp
} : {} // Provide an empty object if feedback is undefined
});

// Clone and clean agents
Expand Down

0 comments on commit e50ed00

Please sign in to comment.