centum/nats_app

NATS App

pypi versions test Apache 2 licensed

NATS App is a wrapper application around a NATS connection.

Create NATS Application

NATS_URL = "nats://localhost:4222"

nc = NATSApp(NATS_URL)
await nc.connect()

Add RPC Handler

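from typing import Any

# Handle requests on "app.test.echo"; the return value is sent back as the reply.
@nc.push_subscribe("app.test.echo", queue="worker")
async def rpc_func_echo(msg: Any) -> str:
    return f"Response with Msg.data: {msg.data}"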

Call RPC

res = await nc.request("app.test.echo", {"args": ["test"]})
print(res)
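
The handler and the call above form a request/reply round trip. The sketch below imitates that flow in memory so it can run without a NATS server. `InMemoryApp` is a hypothetical stand-in written for this illustration only; it is not part of NATS App and does not reflect how the library is implemented, and it passes the raw payload to the handler rather than a message object.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]


class InMemoryApp:
    """Hypothetical stand-in: routes requests to handlers without a NATS server."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def push_subscribe(self, subject: str, queue: str = "") -> Callable[[Handler], Handler]:
        # Register the decorated coroutine under the subject and return it unchanged.
        # The queue name is accepted but ignored in this single-process sketch.
        def decorator(func: Handler) -> Handler:
            self._handlers[subject] = func
            return func

        return decorator

    async def request(self, subject: str, data: Any) -> Any:
        # Look up the handler registered for the subject and return its reply.
        return await self._handlers[subject](data)


app = InMemoryApp()


@app.push_subscribe("app.test.echo", queue="worker")
async def rpc_func_echo(data: Any) -> str:
    return f"Response with data: {data}"


async def main() -> str:
    return await app.request("app.test.echo", {"args": ["test"]})


reply = asyncio.run(main())
print(reply)
```

With a real NATS server the request travels over the network and only one member of the "worker" queue group receives it; the sketch only shows how a decorator can bind a subject to a coroutine.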

JetStream push subscription

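# Assumption: Msg is the message class from nats-py.
from nats.aio.msg import Msg

@nc.js_push_subscribe("app.test.js.subs", queue="worker")
async def handler(msg: Msg):
    print(msg.data)
    # Acknowledge the message so JetStream does not redeliver it.
    await msg.ack()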

JetStream pull subscription

# Messages arrive in lists of up to `batch` items; acknowledge each one.
@nc.js_pull_subscribe("app.subject.js.subs", batch=1)
async def pull_handler(msgs: list[Msg]):
    for m in msgs:
        print(m.data)
        await m.ack()
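
The pull handler above receives a list of messages rather than a single message. As a rough illustration of that batching shape, the sketch below drains an in-memory queue in groups of at most `batch` items. `pull_batches` and `collect` are hypothetical names introduced here; the actual fetch loop inside NATS App may work differently.

```python
import asyncio
from collections import deque


async def pull_batches(pending: deque, batch: int, handler) -> int:
    """Hypothetical pull loop: hand the handler up to `batch` items at a time."""
    calls = 0
    while pending:
        # Take at most `batch` items from the front of the queue.
        msgs = [pending.popleft() for _ in range(min(batch, len(pending)))]
        await handler(msgs)
        calls += 1
    return calls


received: list[list[bytes]] = []


async def collect(msgs: list[bytes]) -> None:
    # Record each batch so the grouping can be inspected afterwards.
    received.append(msgs)


calls = asyncio.run(pull_batches(deque([b"a", b"b", b"c"]), batch=2, handler=collect))
print(calls, received)
```

With three queued items and `batch=2`, the handler is called twice: once with two items and once with the remaining one.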

JetStream publish

await nc.js.publish("app.subject.js.subs", b"TEST123")
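
The publish call above sends raw bytes. When the payload is structured data, it has to be serialized first. The sketch below uses JSON for that, which is an assumption: the source does not state which serialization NATS App expects. `encode_payload` and `decode_payload` are hypothetical helpers.

```python
import json


def encode_payload(payload: dict) -> bytes:
    # Compact JSON, encoded as UTF-8 bytes suitable for a publish call.
    return json.dumps(payload, separators=(",", ":")).encode("utf-8")


def decode_payload(data: bytes) -> dict:
    # Inverse of encode_payload, for use inside a subscriber.
    return json.loads(data.decode("utf-8"))


raw = encode_payload({"args": ["test"]})
print(raw)
print(decode_payload(raw))
```

The resulting bytes could then be passed as the second argument to `nc.js.publish`.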