Skip to content

Commit

Permalink
added an option to specify max number of requeues when connection clo…
Browse files Browse the repository at this point in the history
…ses unexpectedly
  • Loading branch information
igorsechyn authored and andris9 committed Feb 21, 2020
1 parent 54390ef commit 8a927f5
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 13 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"eslint.enable": false
}
65 changes: 52 additions & 13 deletions lib/smtp-pool/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class SMTPPool extends EventEmitter {

this._queue.push({
mail,
requeueAttempts: 0,
callback
});

Expand Down Expand Up @@ -391,23 +392,16 @@ class SMTPPool extends EventEmitter {

if (connection.queueEntry) {
// If the connection closed when sending, add the message to the queue again
// if max number of requeues is not reached yet
// Note that we must wait a bit.. because the callback of the 'error' handler might be called
// in the next event loop
setTimeout(() => {
if (connection.queueEntry) {
this.logger.debug(
{
tnx: 'pool',
cid: connection.id,
messageId: connection.queueEntry.messageId,
action: 'requeue'
},
'Re-queued message <%s> for #%s',
connection.queueEntry.messageId,
connection.id
);
this._queue.unshift(connection.queueEntry);
connection.queueEntry = false;
if (this._shouldRequeuOnConnectionClose(connection.queueEntry)) {
this._requeueEntryOnConnectionClose(connection)
} else {
this._failDeliveryOnConnectionClose(connection);
}
}
this._continueProcessing();
}, 50);
Expand All @@ -421,6 +415,51 @@ class SMTPPool extends EventEmitter {
return connection;
}

_shouldRequeuOnConnectionClose(queueEntry) {
if (this.options.maxRequeues === undefined || this.options.maxRequeues < 0) {
return true;
}

return queueEntry.requeueAttempts && queueEntry.requeueAttempts < this.options.maxRequeues;
}

_failDeliveryOnConnectionClose(connection) {
if (connection.queueEntry && connection.queueEntry.callback) {
try {
connection.queueEntry.callback(new Error('Reached maximum number of retries after connection was closed'));
} catch (E) {
this.logger.error(
{
err: E,
tnx: 'callback',
messageId: connection.queueEntry.messageId,
cid: connection.id
},
'Callback error for #%s: %s',
connection.id,
E.message
);
}
connection.queueEntry = false;
}
};

_requeueEntryOnConnectionClose(connection) {
this.logger.debug(
{
tnx: 'pool',
cid: connection.id,
messageId: connection.queueEntry.messageId,
action: 'requeue'
},
'Re-queued message <%s> for #%s',
connection.queueEntry.messageId,
connection.id
);
this._queue.unshift(connection.queueEntry);
connection.queueEntry = false;
}

/**
* Continue to process message if the pool hasn't closed
*/
Expand Down

0 comments on commit 8a927f5

Please sign in to comment.