Efficient Convenient Asynchronous & Multithreaded Programming
pip install agently-stage
Asynchronous and multithreaded programming in Python has always been complex and confusing, especially when building applications for GenAI. In <Agently AI application development framework, we’ve introduced numerous solutions to enhance control over GenAI outputs. For example, with Agently Instant Mode, you can perform streaming parsing of formatted data like JSON while generating structured results. Additionally, Agently Workflow allows you to orchestrate scheduling relationships between multiple GenAI request results.
As we delved deeper into controlling GenAI outputs, we recognized that the inherent complexity of combining asynchronous and multithreaded programming in Python poses a significant challenge. This complexity often hinders developers from leveraging GenAI capabilities efficiently and creatively.
To solve this problem, we’ve spun off Agently Stage, a dedicated management solution specifically designed for mixed asynchronous and multithreaded programming, from the core Agently AI application development framework. With Agently Stage, we aim to help Python developers to do mixed asynchronous and multithreaded programming in Python much easier.
Read on to discover what revolutionary change Agently Stage can bring to asynchronous and multithreaded programming to make it efficiency and simplicity!
With Agently Stage, you can just use stage.go()
to start asynchronous function or normal function in main thread or any other thread. stage.go()
will return a StageResponse
/StageHybridGenerator
instance for you. You can use response.get()
to wait for the result some time after, or not if you don't care.
import time
import asyncio
from agently_stage import Stage
async def async_task(input_sentence:str):
print("Start Simulate Async Network Request...")
await asyncio.sleep(1)
print("Response Return...")
return f"Network Request Data: Your input is { input_sentence }"
def sync_task(a:int, b:int):
print("Start Simulate Sync Long Time Task...")
time.sleep(2)
print("Task Done...")
return f"Task Result: { a + b }"
stage = Stage()
async_response = stage.go(async_task, "Agently Stage is awesome!")
sync_response = stage.go(sync_task, 1, 2)
stage.close()
#Try remove this line below, it'll work perfectly too.
print(async_response.get(), "|", sync_response.get())
Start Simulate Sync Long Time Task...
Start Simulate Async Network Request...
Response Return...
Task Done...
Network Request Data: Your input is Agently Stage is awesome! | Task Result: 3
You can use with
to make code expression and context management easier:
with Stage() as stage:
async_response = stage.go(async_task, "Agently Stage is awesome!")
sync_response = stage.go(sync_task, 1, 2)
print(async_response.get(), "|", sync_response.get())
By default, each Stage dispatch environment has 1 independent daemon thread for its coroutine tasks. You can customize it as below if you want:
exception_handler
(function(Exception)->any
): Customize exception handler to handle exceptions those raised from tasks running in Agently Stage instance's dispatch environment. By default, we provide a handler that will raise exceptions in runtime and collect all exceptions and print all details in a list to the console after main thread exit.is_daemon
(bool
): When Agently Stage instance is daemon, the thread of it will automatically close when main thread is closing, if you want to close it manually usingwith
or.close()
otherwise keep the thread alive.
Decorator @<stage_instance>.func
can transform a normal function into StageFunction
instance with status management which can be started without blocking current thread in one place and wait for result in other places. StageFunction
will be run in the dispatch environment provided by Stage
instance.
import time
from agently_stage import Stage
stage = Stage()
@stage.func
def task(sentence:str):
time.sleep(1)
return f"Done: { sentence }"
# Defined but hasn't run
task
# Start running
task.go("First")
# or just `task()` like call a function normally
# Block current thread and wait until done to get result
result = task.wait()
print(result)
# Wait again won't restart it
result_2 = task.wait()
print(result_2)
# Reset make the function can be started again
task.reset()
task("Second")
result_3 = task.wait()
print(result_3)
Done: First
Done: First
Done: Second
How can this be useful? Think about a scenario like this:
import time
import asyncio
from agently_stage import Stage
# We create a handler in one dispatch
with Stage() as stage_1:
@stage_1.func
async def handler(sentence):
return f"Someone said: { sentence }"
# We wait this handler in another dispatch
with Stage() as stage_2:
def waiting():
result = handler.wait()
print(result)
stage_2.go(waiting)
# Some uncertain time later, the handler is called
time.sleep(1)
async def executor():
await asyncio.sleep(1)
handler("StageFunction is useful!")
stage_2.go(executor)
Someone said: StageFunction is useful!
If you try to run a generator function with Agently Stage, you will get a StageHybridGenerator
instance as response. You can iterate over StageHybridGenerator
using for
/async for
or call next()
/anext()
explicitly. You can also use this feature to transform an iter generator into an async iter generator or otherwise. In fact, using StageHybridGenerator
you don't event need to care about if this generator is an iter generator or an async iter generator anymore!
import asyncio
from agently_stage import Stage
with Stage() as stage:
async def start_gen(n:int):
for i in range(n):
await asyncio.sleep(1)
yield i+1
gen = stage.go(start_gen, 5)
for item in gen:
print(item)
1
2
3
4
5
You can also use <StageHybridGenerator instance>.get()
to get all results yielded from the generator function processing as items in a result list.
import asyncio
from agently_stage import Stage
with Stage() as stage:
async def start_gen(n:int):
for i in range(n):
await asyncio.sleep(1)
yield i+1
gen = stage.go(start_gen, 5)
result_list = gen.get()
print(result_list)
[1, 2, 3, 4, 5]
By default, generator function will be processed immediately when stage.go(<generator>)
no matter how later StageHybridGenerator
be used. But if you want to preserve the characteristic of an iter generator that starts execution only when called, you can use parameter lazy=True
in stage.go()
to do so.
Also, if you want to consume the original yielded value or exception and re-yield a new result as the final yielded value, you can use parameter on_success
and on_error
to append handlers to StageHybridGenerator
. Those handlers will executed in Agently Stage so both sync function and async function are available.
import time
import asyncio
from agently_stage import Stage
with Stage() as stage:
async def start_gen(n:int):
for i in range(n):
await asyncio.sleep(1)
yield i+1
raise Exception("Some Error")
gen = stage.go(
start_gen, 5,
lazy=True, #<- Set generator as lazy mode
# Define runtime handler to consume original yielded value and re-yield the final value
on_success=lambda item: item * 2,
# Define runtime handler to consume raised exception and re-yield the final value or exception
on_error=lambda e: str(e)
)
time.sleep(5)
# Generator function `start_gen` start here only when gen is iterated over by `for`
for item in gen:
print(item)
print(gen.get())
2
4
6
8
10
Some Error
[2, 4, 6, 8, 10, 'Some Error']
And when iterating over an async iter generator, we need to use await in the consumer to hand over coroutine control. If the interval between await calls is too short, it can increase CPU load, while a longer interval might affect execution efficiency. We have set this interval to 0.1 seconds
by default, but you can adjust it using parameter wait_interval
in stage.go()
.
gen = stage.go(start_gen, 5, wait_interval=0.5)
Transporting data between threads and coroutines in a streaming manner is so usual in GenAI developments because that's exactly how gen models work - predicting token by token. The work pattern from the source will directly influence the execution style of all subsequent downstream tasks. So we created Tunnel
to make tasks like this easier.
import time
import asyncio
from agently_stage import Stage, Tunnel
tunnel = Tunnel()
with Stage() as stage:
def consumer():
print("GO CONSUMER")
time.sleep(1)
# You can use `tunnel.get_gen()` to get a `StageHybridGenerator` from tunnel instance
gen = tunnel.get_generator()
for data in gen:
print("streaming:", data)
async def async_consumer():
print("GO A CONSUMER")
# Or you can just iterate over tunnel data by `for`/`async for`
async for data in tunnel:
print("async streaming:", data)
async def provider(n:int):
print("GO PROVIDER")
for i in range(n):
tunnel.put(i + 1)
await asyncio.sleep(0.1)
# If you forget to .put_stop(), tunnel will close after 10s by default
tunnel.put_stop()
# State consumer first
stage.go(consumer)
stage.go(async_consumer)
# Provider start providing data sometime later
time.sleep(1)
stage.get(provider, 5)
# You can also use `tunnel.get()` to get a final yielded item list
print(tunnel.get()[0])
GO CONSUMER
GO A CONSUMER
GO PROVIDER
async streaming: 1
async streaming: 2
async streaming: 3
async streaming: 4
async streaming: 5
streaming: 1
streaming: 2
streaming: 3
streaming: 4
streaming: 5
1
Sometimes, we won't know if the data transportation from upstream is done or not and want to set a timeout to stop waiting, parameter timeout
when creating Tunnel instance or in .get_gen()
, .get()
will help us to do so. By default, we set this timeout to 10 seconds
. You can set timeout value as None
manually if you want to keep waiting no matter what.
from agently_stage import Tunnel
tunnel = Tunnel(timeout=None)
# Timeout setting in .get_gen() or .get() will have higher priority
gen = tunnel.get_gen(timeout=1)
# But timeout setting only works at the first time that start the generator consumer
# In this case, that's `tunnel.get_gen()`, not `tunnel.get()`
result_list = tunnel.get(timeout=5)
In Python, especially when you're a senior node.js coder, if you want to build a copy module of EventEmitter from node.js, you will feel something is not right. How so? It's because if you want to allow event listener to be an async function, you have to make the event executor function that will call event listener also an async function. In simple way to build a copy module of EventEmitter, that means .emit()
must be an async function. And here's the funny part: we have to put an await
before we call emitter.emit()
. That's ridiculous because we can not ensure that we emit event in an async environment!
But event-driven development is so important to asynchronous and multithreaded programming, a REAL EventEmitter that WORKS EXACTLY THE SAME AS IT SHOULD BE USED IN NODEJS is required with no doubts.
So Agently Stage provide you an EventEmitter
can be used like this:
from agently_stage import Stage, EventEmitter
emitter = EventEmitter()
async def listener(data):
print(f"I got: { data }")
# You can return value to emitter
return True
emitter.on("data", listener)
with Stage() as stage:
# Submit task that wait to run later
stage.go(lambda: emitter.emit("data", "EventEmitter is Cool!"))
responses = emitter.emit("data", "I'll say it again, EventEmitter is Cool!")
# Get responses from all event listeners
for response in responses:
print(response.get())
I got: I'll say it again, EventEmitter is Cool!
True
I got: EventEmitter is Cool!
No asyncio.run()
! No await emitter.emit()
! No worries about choosing event loops! JUST .emit()
WHEREVER YOU WANT!💪💪💪
Agently Stage
is part of our main work Agently AI application development framework which aim to make GenAI application development faster, easier and smarter. Please ⭐️ this project and maybe try Agently framework by pip install Agently
later, thank you!
If you want to contact us, email to [email protected], leave comments in Issues or Discussions or leave messages to our X: