A simplified wrapper around BullMQ for easy queue management in Node.js applications. It provides a straightforward way to define, manage, and process queues with minimal boilerplate.
- Simple queue definition interface
- Type-safe queue processing
- Easy worker management
- Simplified job addition API
- Built on top of robust BullMQ infrastructure
npm install @srttk/queue bullmq
Create a queue processor by defining a type for your payload and implementing the `IQueueProcess` interface:
// greet.ts
import { IQueueProcess } from "@srttk/queue";

type GreetPayload = {
  name: string;
};

type Result = {
  message: string;
};

export const greet: IQueueProcess<GreetPayload, Result> = {
  name: "greet",
  process: async ({ data }) => {
    const message = `Hello ${data.name}`;
    console.log(message);
    // Return result - optional
    return { message };
  },
  onCompleted: (job, result) => {
    // Job completed
    console.info(`Job ${job.id} completed`, result);
  },
  onFailed: async (job) => {
    // Job failed
    console.error(`Job ${job.id} failed.`);
  },
};
Set up your queue manager with your defined queues:
// queue.ts
import { QueueManager } from "@srttk/queue"
import { greet } from './greet'
export default new QueueManager({ greet })
In your main application entry point:
// app.ts
import queue from "./queue";

async function start() {
  // Initialize queue manager
  await queue.startQueues();
  await queue.startWorkers();

  // Add job to queue
  await queue.addJob("greet", "my-job", { name: "Luke Skywalker" });
}

start();
The queue system provides comprehensive event handling through various lifecycle hooks. These events allow you to monitor and respond to different states of your jobs and queues.
Triggered when a job is successfully completed.
onCompleted: (job, result) => {
console.log(`Job ${job.id} completed with result:`, result);
}
Triggered when a job fails due to an error.
onFailed: (job, error) => {
console.error(`Job ${job.id} failed:`, error.message);
}
Triggered when job progress is updated.
onProgress: (job, progress) => {
console.log(`Job ${job.id} is ${progress}% complete`);
}
Triggered when a job starts processing.
onActive: (job) => {
console.log(`Job ${job.id} has started processing`);
}
Triggered when a job is stalled (worker is not responding).
onStalled: (job) => {
console.warn(`Job ${job.id} has stalled`);
}
Triggered when the queue is ready to process jobs.
onReady: () => {
console.log('Queue is ready to process jobs');
}
Triggered when the queue is paused.
onPaused: () => {
console.log('Queue has been paused');
}
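The hooks can be combined on a single processor, as in the example below. `EmailPayload` is not defined elsewhere in this README; a minimal sketch of such a payload type (the fields are illustrative) might be:

```ts
// Hypothetical payload type for the email example below
type EmailPayload = {
  to: string;
  subject: string;
  body: string;
};
```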
export const emailProcessor: IQueueProcess<EmailPayload> = {
name: "email",
process: async ({ data }) => {
// Process email
},
onCompleted: (job, result) => {
console.log(`Email sent successfully: ${job.id}`);
},
onFailed: (job, error) => {
console.error(`Failed to send email: ${error.message}`);
},
onProgress: (job, progress) => {
console.log(`Sending email... ${progress}%`);
},
onActive: (job) => {
console.log(`Starting to send email: ${job.id}`);
}
}
QueueManager provides a graceful shutdown mechanism to ensure that in-progress jobs are completed and resources are properly released when your application terminates.
const signals = ["SIGINT", "SIGTERM"];

signals.forEach((signal) => {
  process.on(signal, async () => {
    // Close all queues and workers
    await queue.shutdown();
  });
});
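A minimal sketch of how this fits into the entry point from earlier. Exiting the process after shutdown is an assumption about your application, not something the package does for you:

```ts
// app.ts - sketch: start queues/workers and shut down cleanly on SIGINT/SIGTERM
import queue from "./queue";

async function start() {
  await queue.startQueues();
  await queue.startWorkers();
}

for (const signal of ["SIGINT", "SIGTERM"]) {
  process.on(signal, async () => {
    await queue.shutdown(); // close all queues and workers
    process.exit(0);        // assumption: exit once everything is closed
  });
}

start();
```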
Namespaces allow you to isolate and group your queues, preventing naming conflicts across different applications or environments. By using namespaces, you can:
- Separate queues for different applications
- Create isolated environments (development, staging, production)
- Avoid queue name collisions in shared infrastructure
export default new QueueManager({ greet }, { namespace: "app1_development" })
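A common pattern is to build the namespace from an application name and the current environment. This is a sketch; deriving it from `NODE_ENV` is an assumption about your setup, not a package requirement:

```ts
// queue.ts - sketch: namespace derived from the environment
import { QueueManager } from "@srttk/queue";
import { greet } from "./greet";

const namespace = `app1_${process.env.NODE_ENV ?? "development"}`;

export default new QueueManager({ greet }, { namespace });
```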
QueueManager accepts Redis connection settings as part of its initialization options. These settings follow the same format as BullMQ connection options.
export default new QueueManager(
{ greet },
{
connection: {
host: "localhost",
port: 6379,
}
}
);
- `connection`: Redis connection settings
  - `host`: Redis server hostname (default: "localhost")
  - `port`: Redis server port (default: 6379)
  - Additional BullMQ connection options are supported
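In practice the connection details are usually read from the environment. A minimal sketch (the environment variable names are illustrative):

```ts
// queue.ts - sketch: Redis connection from environment variables
import { QueueManager } from "@srttk/queue";
import { greet } from "./greet";

export default new QueueManager(
  { greet },
  {
    connection: {
      host: process.env.REDIS_HOST ?? "localhost",
      port: Number(process.env.REDIS_PORT ?? 6379),
    },
  }
);
```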
The Queue Manager supports organizing workers into logical groups, allowing you to selectively start specific sets of workers. This is particularly useful when your application needs to run only certain workers based on different contexts or deployment scenarios.
You can assign workers to groups by specifying a `groupName` in the queue process definition:
export const greet: IQueueProcess<GreetPayload, Result> = {
name: "greet",
process: async ({ data }) => {
const message = `Hello ${data.name}`;
console.log(message);
return {
message,
};
},
groupName: "app1" // Assign this worker to the "app1" group
};
To start workers belonging to a specific group:
const qm = new QueueManager({ greet });

await qm.startQueues();
await qm.startWorkers('app1'); // Only starts workers in the "app1" group
- Microservices: Run different worker groups on different services (see the sketch after this list)
- Development Environment: Start only relevant workers during local development
- Resource Optimization: Run specific workers on dedicated servers based on their resource requirements
- Feature Segregation: Organize workers by feature or domain area
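For the microservices use case above, each service can run its own entry point that starts only its group. This is a sketch; the file name and the "emails" group name are illustrative:

```ts
// worker-emails.ts - sketch: a dedicated process for workers in the "emails" group
import queue from "./queue";

async function main() {
  await queue.startQueues();
  await queue.startWorkers("emails"); // only starts workers defined with groupName: "emails"
}

main();
```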
- Workers without a `groupName` will not be started when using `startWorkers(groupName)`
- The `groupName` is optional - if you are not using worker groups, you can omit this property
new QueueManager(queues: Record<string, IQueueProcess>)

- `startQueues()`: Initializes all defined queues
- `startWorkers(groupName?: string)`: Starts workers for all registered queues, optional group name
- `addJob(queueName: string, jobId: string, data: any)`: Adds a new job to the specified queue
- `getQueue(name: string)`: Get the BullMQ queue instance
- `getWorker(name: string)`: Get the BullMQ worker instance
- `shutdown()`: Close all queues and workers
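`getQueue` and `getWorker` are not used in the examples above; a minimal sketch of how the returned BullMQ instances might be inspected (assuming the getters return nothing for unknown names):

```ts
// inspect.ts - sketch: peek at the underlying BullMQ objects
import queue from "./queue";

async function inspect() {
  const bullQueue = queue.getQueue("greet");   // BullMQ Queue instance
  const bullWorker = queue.getWorker("greet"); // BullMQ Worker instance

  if (bullQueue) {
    // getJobCounts() is a standard BullMQ Queue method
    console.log("greet job counts:", await bullQueue.getJobCounts());
  }

  if (bullWorker) {
    // isRunning() is a standard BullMQ Worker method
    console.log("greet worker running:", bullWorker.isRunning());
  }
}

inspect();
```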
interface IQueueProcess<T, R = void> {
  name: string;
  groupName?: string;
  process: (job: { data: T }) => Promise<R | void>;
  // Plus the optional lifecycle hooks documented above:
  // onCompleted, onFailed, onProgress, onActive, onStalled, onReady, onPaused
}
The package uses BullMQ under the hood. Redis connection settings and other options can be configured when initializing the QueueManager, as shown in the configuration examples above.
This package is written in TypeScript and includes type definitions. It provides full type safety for your queue payloads and processors.
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add some amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
MIT
For issues and feature requests, please use the GitHub issues page.