Skip to content

Commit

Permalink
update example
Browse files Browse the repository at this point in the history
  • Loading branch information
daluf committed Jun 25, 2019
1 parent bef3a67 commit 3081e79
Showing 1 changed file with 8 additions and 80 deletions.
88 changes: 8 additions & 80 deletions examples/simple-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,92 +3,18 @@ const rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} );

/*
======================================
Create simple wrappers for rsmq functions
======================================
*/

// create a new queue
function createQueue(queue, cb) {
rsmq.createQueue({ qname: queue }, function (err, resp) {
if (err) {
cb(err);
return;
}

if (resp) {
cb(null);
}
});
}

// push a message into a queue
function sendMessage(queue, msg, cb) { // `Hello World at ${new Date().toISOString()}`
rsmq.sendMessage({ qname: queue, message: msg }, function (err, resp) {
if (err) {
cb(err);
return;
}

if (resp) {
cb(null, resp);
}
});
}

// retreive latest message from a queue
function receiveMessage(queue, cb) {
rsmq.receiveMessage({ qname: queue }, function (err, resp) {
if (err) {
cb(err);
return;
}

if (resp.id) {
cb(null, resp);
} else {
cb("No messages in queue...");
}
});
}

// delete a message from a queue
function deleteMessage(queue, msgid, cb) {
rsmq.deleteMessage({ qname: queue, id: msgid }, function (err, resp) {
if (err) {
cb(err);
return;
}

cb(null, resp);
});
}

function getQueueAttributes(queue, cb) {
rsmq.getQueueAttributes({ qname: queue }, function (err, resp) {
if (err) {
cb(err);
return;
}

cb(null, resp);
});
}

/*
======================================
Test all available functions
Make a simple queue and send/receive messages
======================================
*/

function main() {
const queuename = "testqueue";

// create a queue
createQueue(queuename, (err) => {
rsmq.createQueue({ qname: queuename }, (err) => {
if (err) {
console.error(err);

if (err.name !== "queueExists") {
console.error(err);
return;
} else {
console.log("queue exists.. resuming..");
Expand All @@ -104,7 +30,7 @@ main();
function sendMessageLoop(queuename) {
// push a message every 2 seconds into the queue
setInterval(() => {
sendMessage(queuename, `Hello World at ${new Date().toISOString()}`, (err) => {
rsmq.sendMessage({ qname: queuename, message: `Hello World at ${new Date().toISOString()}` }, (err) => {
if (err) {
console.error(err);
return;
Expand All @@ -119,7 +45,7 @@ function receiveMessageLoop(queuename) {
// check for new messages every 2.5 seconds
setInterval(() => {
// alternative to receiveMessage would be popMessage => receives the next message from the queue and deletes it.
receiveMessage(queuename, (err, resp) => {
rsmq.receiveMessage({ qname: queuename }, (err, resp) => {
if (err) {
console.error(err);
return;
Expand All @@ -130,14 +56,16 @@ function receiveMessageLoop(queuename) {
console.log("received message:", resp.message);

// we are done with working on our message, we can now safely delete it
deleteMessage(queuename, resp.id, (err) => {
rsmq.deleteMessage({ qname: queuename, id: resp.id }, (err) => {
if (err) {
console.error(err);
return;
}

console.log("deleted message with id", resp.id);
});
} else {
console.log("no available message in queue..")
}
});
}, 500);
Expand Down

0 comments on commit 3081e79

Please sign in to comment.