NATS App is wrapper application on NATS Connection
NATS_URL = "nats://localhost:4222"
nc = NATSApp(NATS_URL)
await nc.connect()
@nc.push_subscribe("app.test.echo", queue="worker")
async def rpc_func_echo(msg: Any) -> str:
return f"Response with Msg.data: {msg.data}"
res = await nc.request("app.test.echo", {"args": ["test"]})
print(res)
@nc.js_push_subscribe("app.test.js.subs", queue="worker")
async def handler(msg: Msg):
print(msg.data)
await msg.ack()
@nc.js_pull_subscribe("app.subject.js.subs", batch=1)
async def handler(msgs: list[Msg]):
for m in msgs:
print(m)
await m.ack()
await nc.js.publish("app.subject.js.subs", b"TEST123")