Added support for concurrent workers - get and lock next available N messages for a given DIRECT domain
mariaro committed Nov 12, 2015
1 parent eb75966 commit 3f7e7c9
Showing 6 changed files with 187 additions and 23 deletions.
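
The change turns the messages table into a simple work queue: a worker asks for the next N unclaimed messages in its DIRECT domain, the database stamps each one with processing_started so other workers skip it, and stamps older than maxMessageProcessingTime are cleared so that messages from crashed workers become claimable again. A possible polling loop on the worker side, sketched with a hypothetical baseUrl, route shape, and handle() function (only the lock and _count parameters come from this commit):

var request = require('request');
var baseUrl = 'http://localhost:8080'; //hypothetical
function poll() {
	request.get({ url: baseUrl + '/Message?lock=true&_count=10&domain=direct.example.org', json: true }, function(err, res, body) {
		if (err || res.statusCode !== 200) { return setTimeout(poll, 5000); } //back off on failure
		body.entry.forEach(function(msg) {
			handle(msg.content); //must finish within maxMessageProcessingTime
			request.del(baseUrl + '/Message/' + msg.id); //delete so the message is not recycled after expiry
		});
		setTimeout(poll, 1000);
	});
}
poll();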
36 changes: 33 additions & 3 deletions deploy/redhat/files/postfix.sql
@@ -46,8 +46,10 @@ DROP TRIGGER IF EXISTS userAddressTrigger on users;
CREATE TRIGGER userAddressTrigger AFTER INSERT OR UPDATE OF address ON users
FOR EACH ROW EXECUTE PROCEDURE setUserDetails();

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE IF NOT EXISTS messages
(
id serial NOT NULL,
@@ -57,6 +59,8 @@ CREATE TABLE IF NOT EXISTS messages
recipient character varying,
sender character varying,
domain character varying,
guid uuid,
processing_started timestamp,
CONSTRAINT id PRIMARY KEY (id)
)
WITH (
@@ -66,7 +70,33 @@ WITH (
ALTER TABLE messages
OWNER TO direct;

CREATE OR REPLACE FUNCTION get_and_lock_next_messages(count integer, message_domain character varying, processingExpiryAge integer)
RETURNS table(id integer, recipient character varying, sender character varying, guid uuid) AS
$BODY$
BEGIN
--reset expired processing timestamps
UPDATE messages m SET processing_started = null
WHERE m.domain = message_domain
AND age(LOCALTIMESTAMP, processing_started) > processingExpiryAge * interval '1 second';

--get unallocated messages and set the processing timestamp;
--FOR UPDATE keeps two concurrent workers from claiming the same rows
return query
with ids as (
select m.id FROM messages m
WHERE m.domain = message_domain and m.processing_started is null
ORDER BY m.id LIMIT count
FOR UPDATE
),
updated as (
update messages m SET processing_started = LOCALTIMESTAMP
WHERE m.id in (select * from ids)
returning m.id, m.recipient, m.sender, m.guid
)
select * from updated
ORDER BY updated.id; --qualified: a plain "id" here is ambiguous with the OUT column
END
$BODY$
LANGUAGE plpgsql;

CREATE TABLE IF NOT EXISTS domains
(
id serial NOT NULL,
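
The same function is installed by the Ubuntu script below. From a client's point of view it behaves like a dequeue: each call hands back up to count messages and marks them as in-flight. A minimal node-postgres sketch, with a hypothetical connection string and values (the service layer in service.js wraps this same call in a transaction):

var pg = require('pg');
var connString = 'postgres://direct@localhost/direct'; //hypothetical
pg.connect(connString, function(err, client, done) {
	if (err) { return console.error('connect failed', err); }
	//claim up to 10 messages for the domain; messages whose processing began
	//more than 60 seconds ago are recycled first
	client.query('SELECT * FROM get_and_lock_next_messages($1, $2, $3);', [10, 'direct.example.org', 60], function(err, result) {
		done();
		if (err) { return console.error('query failed', err); }
		//SELECT * FROM yields real columns, so no composite-text parsing is needed
		result.rows.forEach(function(row) { console.log(row.id, row.recipient, row.guid); });
	});
});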
31 changes: 30 additions & 1 deletion deploy/ubuntu/files/postfix.sql
@@ -59,7 +59,9 @@ DROP TRIGGER IF EXISTS userAddressTrigger on users;
CREATE TRIGGER userAddressTrigger AFTER INSERT OR UPDATE OF address ON users
FOR EACH ROW EXECUTE PROCEDURE setUserDetails();

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE IF NOT EXISTS messages
(
@@ -70,6 +72,8 @@ CREATE TABLE IF NOT EXISTS messages
recipient character varying,
sender character varying,
domain character varying,
guid uuid,
processing_started timestamp,
CONSTRAINT id PRIMARY KEY (id)
)
WITH (
@@ -79,6 +83,31 @@ WITH (
ALTER TABLE messages
OWNER TO direct;


CREATE OR REPLACE FUNCTION get_and_lock_next_messages(count integer, message_domain character varying, processingExpiryAge integer)
RETURNS table(id integer, recipient character varying, sender character varying, guid uuid) AS
$BODY$
BEGIN
--reset expired processing timestamps
UPDATE messages m SET processing_started = null
WHERE m.domain = message_domain
AND age(LOCALTIMESTAMP, processing_started) > processingExpiryAge * interval '1 second';

--get unallocated messages and set the processing timestamp;
--FOR UPDATE keeps two concurrent workers from claiming the same rows
return query
with ids as (
select m.id FROM messages m
WHERE m.domain = message_domain and m.processing_started is null
ORDER BY m.id LIMIT count
FOR UPDATE
),
updated as (
update messages m SET processing_started = LOCALTIMESTAMP
WHERE m.id in (select * from ids)
returning m.id, m.recipient, m.sender, m.guid
)
select * from updated
ORDER BY updated.id; --qualified: a plain "id" here is ambiguous with the OUT column
END
$BODY$
LANGUAGE plpgsql;

CREATE TABLE IF NOT EXISTS domains
(
4 changes: 3 additions & 1 deletion src/api/config.js
@@ -25,5 +25,7 @@ module.exports = {
baseUrl: baseUrl,

//paging for resource search
-	pageSize: 10
+	pageSize: 10,
+	//maximum time allowed for message processing (in seconds); after this, processing is considered to have failed and the message is available again
+	maxMessageProcessingTime: 60
};
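
The expiry rule driven by this setting is simple: a claimed message becomes claimable again once its processing_started timestamp is older than maxMessageProcessingTime. The same check as the age() comparison in postfix.sql, restated as a hypothetical helper:

//true once a claimed message should be treated as abandoned
function isExpired(processingStarted, maxSeconds) {
	return (Date.now() - processingStarted.getTime()) / 1000 > maxSeconds;
}
//a message claimed 90 seconds ago, against the default of 60:
//isExpired(new Date(Date.now() - 90000), 60) -> true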
11 changes: 7 additions & 4 deletions src/api/resources.js
@@ -58,7 +58,8 @@ var resources = {
},
message: {
queries: {
-			list: 'SELECT id, recipient, sender FROM messages %s ORDER BY id LIMIT $1 OFFSET $2;',
+			get_and_lock_next_messages: 'select get_and_lock_next_messages($1, $2, ' + config.maxMessageProcessingTime + ');',
+			list: 'SELECT id, recipient, sender, guid FROM messages %s ORDER BY id LIMIT $1 OFFSET $2;',
count: 'SELECT count(*) from messages %s;',
delete: 'DELETE FROM messages WHERE id = $1;'
},
@@ -192,10 +193,12 @@ function getEntities(req, res, next, type) {
var limit = 'ALL';
var offset = 0;

-	if (config.pageSize) {
-		limit = config.pageSize;
+	var size = (req.query._count !== undefined ? req.query._count : config.pageSize) || 0;
+
+	if (size) {
+		limit = size;
 		if (page) {
-			offset = (page - 1) * config.pageSize;
+			offset = (page - 1) * size;
}
}
if (offset && offset >= totalResults) {
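
getEntities now lets a caller override the configured page size with a _count query parameter. The window arithmetic above reduces to the following standalone helper (hypothetical, for illustration only):

function pageWindow(count, pageSize, page) {
	var size = (count !== undefined ? count : pageSize) || 0;
	if (!size) { return { limit: 'ALL', offset: 0 }; } //_count=0 disables paging
	return { limit: size, offset: page ? (page - 1) * size : 0 };
}
//pageWindow(5, 10, 3)         -> { limit: 5, offset: 10 }  (_count wins)
//pageWindow(undefined, 10, 2) -> { limit: 10, offset: 10 } (config default)
//pageWindow(0, 10, 2)         -> { limit: 'ALL', offset: 0 }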
126 changes: 113 additions & 13 deletions src/api/service.js
@@ -51,13 +51,13 @@ bundle.registerRoutes(server);
server.listen(port, function() {
console.log('%s listening at %s', server.name, server.url);
});

function getMessage(req, res, next) {
if(req.query.type && ['msg', 'original'].indexOf(req.query.type) === -1) {
res.send(400, 'Type parameter expects a value from list: msg, original.');
return next(false);
}
pg.connect(connString, function(err, client, done) {
if(err) {
console.error('error fetching client from pool', err);
res.send(500);
@@ -89,11 +89,111 @@ function getMessage(req, res, next) {
}

function getMessages(req, res, next) {
-	resources.getEntities(req, res, next, 'Message');
+	req.query.lock = req.query.lock === 'true' ? true : req.query.lock;
+	if(!req.query.lock) {
+		resources.getEntities(req, res, next, 'Message');
+	} else {
+		getNextMessages(req, res, next);
+	}
}
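
//With the lock switch above, a request such as GET /Message?lock=true&_count=2
//(hypothetical values; a domain is expected as well, since getNextMessages
//reads req.params.domain) returns entries shaped like:
//  { totalResults: 2, entry: [ { id: '7', content: { recipient: '...',
//    sender: '...', guid: '6b29fc40-...' } }, ... ] }
//Any other lock value, or none, falls through to the plain paged listing.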

function getNextMessages(req, res, next) {
var data = {
log : req.log
};
var connect = function(data, callback) {
pg.connect(connString, function(err, client, done) {
data.client = client;
data.done = done;
if(err) {
callback(err, data); //pass data through so the final callback can release the client
} else {
callback(null, data);
}
});
};

var beginTransaction = function(data, callback) {
console.log('Begin transaction');
data.client.query('BEGIN', function(err, result) {
if(err) {
console.log('Error beginning transaction');
return callback(err, data);
}
return callback(null, data);
});
};

var queryNextMessages = function(data, callback) {
var size = (req.params._count !== undefined ? req.params._count : config.pageSize) || 0;

var meta = resources.message;
data.client.query(meta.queries.get_and_lock_next_messages, [ size, req.params.domain ], function (err, result) {
//note: the pooled client is released in the waterfall's final callback, after COMMIT
if (err) {
return callback(err, data);
}

var entities = {
entry: []
};

for (var i = 0; i < result.rows.length; i++) {
var row = result.rows[i].get_and_lock_next_messages.replace(/(^\()|(\)$)/g, '').split(','); //remove both parentheses (note the g flag) and split by comma
var entity = {
id : row[0],
content : {
recipient : row[1],
sender : row[2],
guid : row[3]
}
};
entities.entry.push({
id: entity.id,
content: entity.content
});
}
entities.totalResults = entities.entry.length;
data.entities = entities;
callback(null, data);
});
};
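
//Note on the parsing above: because the query is written as
//'select get_and_lock_next_messages(...)', node-postgres returns each row as a
//single composite-text column, e.g. '(42,a@x.org,b@y.org,6b29fc40-...)'
//(hypothetical values), hence the strip-parentheses-and-split step. That split
//would break if a field ever contained a comma; querying
//'select * from get_and_lock_next_messages(...)' would return real columns instead.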

function commitTransaction(data, callback) {
console.log('Commit transaction');
data.client.query('COMMIT', function(err, result) {
if(err) {
console.log('Error committing transaction');
return callback(err, data);
}
callback(null, data);
});
}


async.waterfall([
function(cb) {
cb(null, data);
},
connect,
beginTransaction,
queryNextMessages,
commitTransaction
], function(err, data) {
if (data && data.done) { data.done(); } //release the client back to the pool only after COMMIT
if(err) {
console.error('error running query', err);
res.send(500);
return next(false);
}

res.send(200, data.entities);
next();
});
}

//function getMessages(req, res, next) {
//	pg.connect(connString, function(err, client, done) {
// if(err) {
// console.error('error fetching client from pool', err);
// res.send(500);
@@ -140,13 +240,13 @@ function sendMessage(req, res, next) {
},
saveMessage
], function(err, result) {
if(err === null) {
console.log('smimesend.py successfully sent a message');
res.send(200);
return;
}
console.error('smimesend.py error:' + err);

if(err.code === 2) {
res.send(400, 'Mail headers missing');
return;
@@ -155,8 +255,8 @@ function sendMessage(req, res, next) {
res.send(422, err.message);
return;
}

res.send(500, err);
});
}

@@ -177,7 +277,7 @@ function saveMessage(req, callback) {
}
err.message = relevantErrMsg;
}

callback(err);
});
child.stdin.write(req.body);
@@ -186,7 +286,7 @@


function deleteMessage(req, res, next) {
pg.connect(connString, function(err, client, done) {
if(err) {
console.error('error fetching client from pool', err);
res.send(500);
2 changes: 1 addition & 1 deletion src/smime/smimercv.py
@@ -105,7 +105,7 @@ def save_message(queue_id, recipient, sender, msg_orig, msg_plain):
cur = conn.cursor();
tokens = recipient.split("@");
domain = tokens[1] if len(tokens) > 1 else None
-    cur.execute("INSERT INTO messages(queue_id,recipient,sender,original,msg,domain) VALUES(%s,%s,%s,%s,%s,%s);",(queue_id,recipient,sender,msg_orig,msg_plain,domain))
+    cur.execute("INSERT INTO messages(queue_id,recipient,sender,original,msg,domain,guid) VALUES(%s,%s,%s,%s,%s,%s,uuid_generate_v4());",(queue_id,recipient,sender,msg_orig,msg_plain,domain))  # guid generated in SQL via uuid-ossp, not in Python
conn.commit()

def send_mdn(sender, recipient, orig_message_id, subject, msg_plain):
