Skip to content

Commit

Permalink
Merge branch 'multiple-workers'
Browse files Browse the repository at this point in the history
  • Loading branch information
mariaro committed Nov 27, 2015
2 parents eb75966 + 0ad388c commit 4ed9d6d
Show file tree
Hide file tree
Showing 7 changed files with 383 additions and 187 deletions.
46 changes: 43 additions & 3 deletions deploy/redhat/files/postfix.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ DROP TRIGGER IF EXISTS userAddressTrigger on users;
CREATE TRIGGER userAddressTrigger AFTER INSERT OR UPDATE OF address ON users
FOR EACH ROW EXECUTE PROCEDURE setUserDetails();






CREATE TABLE IF NOT EXISTS messages
(
id serial NOT NULL,
Expand All @@ -57,6 +58,8 @@ CREATE TABLE IF NOT EXISTS messages
recipient character varying,
sender character varying,
domain character varying,
guid uuid,
processing_started timestamp,
CONSTRAINT id PRIMARY KEY (id)
)
WITH (
Expand All @@ -66,7 +69,44 @@ WITH (
ALTER TABLE messages
OWNER TO direct;



CREATE OR REPLACE FUNCTION get_and_lock_next_messages(count integer, message_domains character varying[], lock_message boolean, processingExpiryAge integer)
RETURNS table(id integer, recipient character varying, sender character varying, guid uuid) AS
$BODY$
BEGIN
if not lock_message then
return query
select m.id, m.recipient, m.sender, m.guid
FROM messages m
WHERE m.domain = any(message_domains)
and (processing_started is null or
age(LOCALTIMESTAMP, processing_started) > processingExpiryAge * interval '1 second')
ORDER BY m.id
LIMIT count;
return;
end if;

--reset expired processing timestamps
UPDATE messages m SET processing_started = null WHERE m.domain = any(message_domains) and age(LOCALTIMESTAMP, processing_started) > processingExpiryAge * interval '1 second';

--get unallocated messages and set processing timestamp
return query
with ids as (
select m.id FROM messages m WHERE m.domain = any(message_domains) and processing_started is null ORDER BY m.id LIMIT count for update
),
updated as (
update messages m SET processing_started = LOCALTIMESTAMP WHERE m.id in (select * from ids) returning m.id, m.recipient, m.sender, m.guid
)
select *
from updated
ORDER BY id;
END
$BODY$
LANGUAGE plpgsql;




CREATE TABLE IF NOT EXISTS domains
(
id serial NOT NULL,
Expand Down
43 changes: 41 additions & 2 deletions deploy/ubuntu/files/postfix.sql
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ DROP TRIGGER IF EXISTS userAddressTrigger on users;
CREATE TRIGGER userAddressTrigger AFTER INSERT OR UPDATE OF address ON users
FOR EACH ROW EXECUTE PROCEDURE setUserDetails();





CREATE TABLE IF NOT EXISTS messages
(
Expand All @@ -70,6 +71,8 @@ CREATE TABLE IF NOT EXISTS messages
recipient character varying,
sender character varying,
domain character varying,
guid uuid,
processing_started timestamp,
CONSTRAINT id PRIMARY KEY (id)
)
WITH (
Expand All @@ -79,7 +82,43 @@ WITH (
ALTER TABLE messages
OWNER TO direct;



CREATE OR REPLACE FUNCTION get_and_lock_next_messages(count integer, message_domains character varying[], lock_message boolean, processingExpiryAge integer)
RETURNS table(id integer, recipient character varying, sender character varying, guid uuid) AS
$BODY$
BEGIN
if not lock_message then
return query
select m.id, m.recipient, m.sender, m.guid
FROM messages m
WHERE m.domain = any(message_domains)
and (processing_started is null or
age(LOCALTIMESTAMP, processing_started) > processingExpiryAge * interval '1 second')
ORDER BY m.id
LIMIT count;
return;
end if;

--reset expired processing timestamps
UPDATE messages m SET processing_started = null WHERE m.domain = any(message_domains) and age(LOCALTIMESTAMP, processing_started) > processingExpiryAge * interval '1 second';

--get unallocated messages and set processing timestamp
return query
with ids as (
select m.id FROM messages m WHERE m.domain = any(message_domains) and processing_started is null ORDER BY m.id LIMIT count for update
),
updated as (
update messages m SET processing_started = LOCALTIMESTAMP WHERE m.id in (select * from ids) returning m.id, m.recipient, m.sender, m.guid
)
select *
from updated
ORDER BY id;
END
$BODY$
LANGUAGE plpgsql;



CREATE TABLE IF NOT EXISTS domains
(
id serial NOT NULL,
Expand Down
4 changes: 3 additions & 1 deletion src/api/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ module.exports = {
baseUrl: baseUrl,

//paging for resource search
pageSize: 10
pageSize: 10,
//maximum time allowed for message processing (in seconds); after this, processing is considered to have failed and the message is available again
maxMessageProcessingTime: 60
};
Loading

0 comments on commit 4ed9d6d

Please sign in to comment.