
Commit

made BatchScheduler an expression. removed engine from its params and using bind decorator instead to bind to engine's __call__ method. updated readme and package.json
Aynur Adanbekova authored and Aynur Adanbekova committed Nov 3, 2024
1 parent 5ee9aa9 commit 20517e2
Showing 3 changed files with 13 additions and 17 deletions.
9 changes: 1 addition & 8 deletions README.md
@@ -27,18 +27,11 @@ class TestExpression(Expression):
    def forward(self, input, **kwargs):
        return self.Symbol(input).query("Summarize this input", **kwargs)


-#Set up your engine (this example uses a mock engine)
-from your_engine_module import MockGPTXChatEngine
-engine = MockGPTXChatEngine()
-EngineRepository.register("neurosymbolic", engine_instance=engine)


#Prepare your inputs
inputs = ["test1", "test2", "test3"]

#Create and run the BatchScheduler
-scheduler = BatchScheduler(TestExpression, num_workers=2, engine=engine, dataset=inputs)
+scheduler = BatchScheduler(TestExpression, num_workers=2, dataset=inputs, batch_size=2)
results = scheduler.run()

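With the engine argument gone, the scheduler relies on whatever engine is registered under the "neurosymbolic" key. For setups that do not use symbolicai's default engine configuration, the registration step dropped from the README above would still happen separately, before the scheduler runs. A minimal sketch, continuing the README example; the mock-engine module name is a placeholder carried over from the removed lines, and the EngineRepository import path is assumed:

# Sketch only: register a custom engine globally, then run the scheduler.
# `your_engine_module` / `MockGPTXChatEngine` are placeholders from the old README;
# TestExpression and inputs are defined in the README example above.
from symai.functional import EngineRepository  # import path assumed
from your_engine_module import MockGPTXChatEngine  # placeholder mock engine

EngineRepository.register("neurosymbolic", engine_instance=MockGPTXChatEngine())

scheduler = BatchScheduler(TestExpression, num_workers=2, dataset=inputs, batch_size=2)
results = scheduler.run()  # each worker's call is routed to the registered engine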

8 changes: 4 additions & 4 deletions package.json
@@ -1,16 +1,16 @@
{
  "version": "0.0.1",
  "name": "AynurAda/symbatcher",
-  "description": "<Project Description>",
+  "description": "the symbolicai batch scheduler for parallel execution of expressions and batched llm calls",
  "expressions": [
    {
      "module": "src/func",
-      "type": "MyExpression"
+      "type": "BatchScheduler"
    }
  ],
  "run": {
    "module": "src/func",
-    "type": "MyExpression"
+    "type": "BatchScheduler"
  },
  "dependencies": []
-}
+}s
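The manifest follows symbolicai's package format: both the expressions list and the run entry now point at the same class, BatchScheduler in src/func.py. Resolved by hand, the mapping would look roughly like this (illustrative only; assumes the repository root is on the Python path, not symai's actual package loader):

# "module": "src/func" + "type": "BatchScheduler" resolve to:
from src.func import BatchScheduler  # illustrative path assumption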
13 changes: 8 additions & 5 deletions src/func.py
@@ -2,12 +2,13 @@
import logging
import threading
from typing import Any, Callable, Dict, List, Optional, Union

+from symai.core_ext import bind
from symai import Expression

lgr = logging.getLogger()
lgr.setLevel(logging.CRITICAL)


class BatchScheduler:
"""
A class for scheduling and executing batch operations with Expressions from symbolicai.
@@ -16,19 +17,17 @@ class BatchScheduler:
    utilizing multiple workers and an external engine for processing.
"""

-    def __init__(self, expr: Expression, num_workers: int, engine: Callable, dataset: List[Any], batch_size: int = 5):
+    def __init__(self, expr: Expression, num_workers: int, dataset: List[Any], batch_size: int = 5):
        """
        Initialize the BatchScheduler for symbolicai Expressions.
        Args:
            expr (Expression): The symbolicai Expression to be executed.
            num_workers (int): The number of worker threads to use.
-            engine (Callable): The engine function for processing batches of Expression results.
            dataset (List[Any]): The list of data points to process through the Expression.
            batch_size (int, optional): The size of each batch. Defaults to 5.
        """
        self.num_workers: int = num_workers
-        self.engine: Callable = engine
        self.dataset: List[Any] = dataset
        self.results: Dict[Any, Any] = {}
        self.arguments: List[Any] = []
@@ -40,7 +39,11 @@ def __init__(self, expr: Expression, num_workers: int, engine: Callable, dataset
        self.llm_response_ready: Dict[int, threading.Event] = {}
        self.pending_tasks: int = len(self.dataset)
        self.expr: Expression = expr()


+    @bind(engine="neurosymbolic", property="__call__")
+    def engine(self):
+        pass

    def single_expression(self, data_point: Any) -> Any:
        """
        Execute the symbolicai Expression for a single data point.
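The @bind stub above is what replaces the old self.engine attribute. As I read symai's core_ext.bind, calling the decorated method looks up the engine registered under "neurosymbolic" and returns the named property, here the engine's __call__. A minimal sketch of that call-site pattern, with a hypothetical _run_batch helper that is not part of this diff:

from typing import Any, List

from symai.core_ext import bind


class EngineBound:
    # Same pattern as BatchScheduler.engine above: the body is never used;
    # @bind is assumed to return the registered engine's __call__ instead.
    @bind(engine="neurosymbolic", property="__call__")
    def engine(self):
        pass

    def _run_batch(self, batched_arguments: List[Any]) -> Any:
        # Hypothetical call site illustrating how the scheduler can reach
        # the engine without it being injected through __init__.
        engine_call = self.engine()            # resolve the bound __call__
        return engine_call(batched_arguments)  # one batched LLM call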
