Skip to content

Commit

Permalink
sqs batching with some tweaks to process model so things can be "trul…
Browse files Browse the repository at this point in the history
…y" async rather than each job waiting for the others
  • Loading branch information
schuyler1d committed Aug 30, 2020
1 parent e1d377a commit 4a53873
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 72 deletions.
22 changes: 1 addition & 21 deletions lambda.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,7 @@ exports.handler = async (event, context) => {
const job = jobs[event.command];
// behavior and arguments documented here:
// https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#invoke-property
const result = await job(event, function dispatcher(
dataToSend,
callback
) {
const lambda = new AWS.Lambda();
return lambda.invoke(
{
FunctionName: functionName,
InvocationType: "Event", //asynchronous
Payload: JSON.stringify(dataToSend)
},
function(err, dataReceived) {
if (err) {
console.error("Failed to invoke Lambda job: ", err);
}
if (callback) {
callback(err, dataReceived);
}
}
);
});
const result = await job(event, context);
return result;
} else {
console.error("Unfound command sent as a Lambda event: " + event.command);
Expand Down
52 changes: 28 additions & 24 deletions src/workers/job-processes.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export const invokeJobFunction = async job => {
};

export async function processJobs() {
// DEPRECATED -- switch to job dispatchers. See src/extensions/job-runners/README.md
setupUserNotificationObservers();
console.log("Running processJobs");
// eslint-disable-next-line no-constant-condition
Expand All @@ -77,23 +78,34 @@ export async function processJobs() {

const twoMinutesAgo = new Date(new Date() - 1000 * 60 * 2);
// clear out stuck jobs
await clearOldJobs(twoMinutesAgo);
await clearOldJobs({ delay: twoMinutesAgo });
} catch (ex) {
log.error(ex);
}
}
}

export async function checkMessageQueue() {
export async function checkMessageQueue(event, contextVars) {
if (!process.env.TWILIO_SQS_QUEUE_URL) {
return;
}

console.log("checking if messages are in message queue");
while (true) {
try {
await sleep(10000);
processSqsMessages();
if (process.env.DEBUG) {
await sleep(10000);
}
await processSqsMessages();
if (
contextVars &&
typeof contextVars.remainingMilliseconds === "function"
) {
if (contextVars.remainingMilliseconds() < 5000) {
// rather than get caught half-way through a message batch, let's bail
return;
}
}
} catch (ex) {
log.error(ex);
}
Expand Down Expand Up @@ -232,7 +244,7 @@ export async function handleIncomingMessages() {
}
}

export async function runDatabaseMigrations(event, dispatcher, eventCallback) {
export async function runDatabaseMigrations(event, context, eventCallback) {
console.log("inside runDatabaseMigrations1");
console.log("inside runDatabaseMigrations2", event);
await r.k.migrate.latest();
Expand All @@ -243,11 +255,7 @@ export async function runDatabaseMigrations(event, dispatcher, eventCallback) {
return "completed migrations runDatabaseMigrations";
}

export async function databaseMigrationChange(
event,
dispatcher,
eventCallback
) {
export async function databaseMigrationChange(event, context, eventCallback) {
console.log("inside databaseMigrationChange", event);
if (event.up) {
await r.k.migrate.up();
Expand Down Expand Up @@ -282,26 +290,22 @@ const syncProcessMap = {
clearOldJobs
};

export async function dispatchProcesses(event, dispatcher, eventCallback) {
export async function dispatchProcesses(event, context, eventCallback) {
const toDispatch =
event.processes || (JOBS_SAME_PROCESS ? syncProcessMap : processMap);
for (let p in toDispatch) {
if (p in processMap) {
// / not using dispatcher, but another interesting model would be
// / to dispatch processes to other lambda invocations
// dispatcher({'command': p})
console.log("process", p);
toDispatch[p]()
.then()
.catch(err => {
await Promise.all(
Object.keys(toDispatch)
.filter(p => p in processMap)
.map(p =>
toDispatch[p](event, context).catch(err => {
console.error("Process Error", p, err);
});
}
}
})
)
);
return "completed";
}

export async function ping(event, dispatcher) {
export async function ping(event, context) {
return "pong";
}

Expand Down
57 changes: 30 additions & 27 deletions src/workers/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from "../server/models";
import telemetry from "../server/telemetry";
import { log, gunzip, zipToTimeZone, convertOffsetsToStrings } from "../lib";
import { updateJob } from "./lib";
import { sleep, updateJob } from "./lib";
import serviceMap from "../server/api/lib/services";
import twilio from "../server/api/lib/twilio";
import {
Expand Down Expand Up @@ -173,33 +173,36 @@ export async function processSqsMessages() {
const p = new Promise((resolve, reject) => {
sqs.receiveMessage(params, async (err, data) => {
if (err) {
console.log(err, err.stack);
console.log("processSqsMessages Error", err, err.stack);
reject(err);
} else if (data.Messages) {
console.log(data);
for (let i = 0; i < data.Messages.length; i++) {
const message = data.Messages[i];
const body = message.Body;
console.log("processing sqs queue:", body);
const twilioMessage = JSON.parse(body);

await serviceMap.twilio.handleIncomingMessage(twilioMessage);

sqs.deleteMessage(
{
QueueUrl: process.env.TWILIO_SQS_QUEUE_URL,
ReceiptHandle: message.ReceiptHandle
},
(delMessageErr, delMessageData) => {
if (delMessageErr) {
console.log(delMessageErr, delMessageErr.stack); // an error occurred
} else {
console.log(delMessageData); // successful response
}
} else {
if (!data.Messages || !data.Messages.length) {
// Since we are likely in a while(true) loop let's avoid racing
await sleep(10000);
resolve();
} else {
console.log("processSqsMessages", data.Messages.length);
for (let i = 0; i < data.Messages.length; i++) {
const message = data.Messages[i];
const body = message.Body;
if (process.env.DEBUG) {
console.log("processSqsMessages message body", body);
}
);
const twilioMessage = JSON.parse(body);
await serviceMap.twilio.handleIncomingMessage(twilioMessage);
const delMessageData = await sqs
.deleteMessage({
QueueUrl: process.env.TWILIO_SQS_QUEUE_URL,
ReceiptHandle: message.ReceiptHandle
})
.promise()
.catch(reject);
if (process.env.DEBUG) {
console.log("processSqsMessages deleteresult", delMessageData);
}
}
resolve();
}
resolve();
}
});
});
Expand Down Expand Up @@ -1152,10 +1155,10 @@ export async function fixOrgless() {
} // if
} // function

export async function clearOldJobs(delay) {
export async function clearOldJobs(event) {
// to clear out old stuck jobs
const twoHoursAgo = new Date(new Date() - 1000 * 60 * 60 * 2);
delay = delay || twoHoursAgo;
const delay = (event && event.delay) || twoHoursAgo;
return await r
.knex("job_request")
.where({ assigned: true })
Expand Down

0 comments on commit 4a53873

Please sign in to comment.