Skip to content

Commit

Permalink
Get incoming and delivery reports working.
Browse files Browse the repository at this point in the history
  • Loading branch information
spakanati committed Oct 14, 2016
1 parent e378d11 commit 09a30e5
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 150 deletions.
48 changes: 22 additions & 26 deletions src/server/api/lib/message-sending.js
Original file line number Diff line number Diff line change
@@ -1,33 +1,29 @@
import { Message, PendingMessagePart, r } from '../../models'
import { r } from '../../models'

export async function getLastMessage({ userNumber, contactNumber, service }) {
const lastMessage = await r.table('message')
.getAll(contactNumber, { index: 'contact_number' })
.filter({
user_number: userNumber,
is_from_contact: false,
service
})
.orderBy(r.desc('created_at'))
.limit(1)
.pluck('assignment_id')(0)
.default(null)

const MAX_SEND_ATTEMPTS = 5

async function saveSentMessage(message, service, response, serviceMessageIds, hasError) {
const messageToSave = {
...message
}
return lastMessage
}

messageToSave.service = service
messageToSave.service_messages.push(response || null)


if (hasError) {
if (messageToSave.service_messages.length >= MAX_SEND_ATTEMPTS) {
messageToSave.send_status = 'ERROR'
}
Message.save(messageToSave, { conflict: 'update' })
.then((_, newMessage) => {
reject(err || (response ? new Error(JSON.stringify(response)) : new Error('Encountered unknown error')))
})
} else {
Message.save({
...messageToSave,
send_status: 'SENT'
}, { conflict: 'update' })
.then((saveError, newMessage) => {
resolve(newMessage)
})
}
export async function saveNewIncomingMessage (messageInstance) {
await messageInstance.save()

await r.table('campaign_contact')
.getAll(messageInstance.assignment_id, { index: 'assignment_id' })
.filter({ cell: messageInstance.contact_number })
.limit(1)
.update({ message_status: 'needsResponse' })
}
30 changes: 3 additions & 27 deletions src/server/api/lib/nexmo.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Nexmo from 'nexmo'
import { getFormattedPhoneNumber } from '../../../lib/phone-format'
import { Message, PendingMessagePart, r } from '../../models'
import { getLastMessage } from './message-sending'
import { log } from '../../../lib'
import faker from 'faker'

Expand All @@ -13,32 +14,7 @@ if (process.env.NEXMO_API_KEY && process.env.NEXMO_API_SECRET) {
})
}

export async function getLastMessage({ userNumber, contactNumber }) {
const lastMessage = await r.table('message')
.getAll(contactNumber, { index: 'contact_number' })
.filter({
user_number: userNumber,
is_from_contact: false
})
.orderBy(r.desc('created_at'))
.limit(1)
.pluck('assignment_id')(0)
.default(null)

return lastMessage
}

export async function saveNewIncomingMessage (messageInstance) {
await messageInstance.save()

await r.table('campaign_contact')
.getAll(messageInstance.assignment_id, { index: 'assignment_id' })
.filter({ cell: messageInstance.contact_number })
.limit(1)
.update({ message_status: 'needsResponse' })
}

export async function convertMessagePartsToMessage(messageParts) {
export async function convertNexmoMessagePartsToMessage(messageParts) {
const firstPart = messageParts[0]
const userNumber = firstPart.user_number
const contactNumber = firstPart.contact_number
Expand Down Expand Up @@ -104,7 +80,7 @@ export async function rentNewCell() {
throw new Error('Did not find any cell')
}

export async function sendMessage(message) {
export async function nexmoSendMessage(message) {
if (!nexmo) {
await Message.get(message.id)
.update({ send_status: 'SENT' })
Expand Down
132 changes: 51 additions & 81 deletions src/server/api/lib/twilio.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,63 +3,41 @@ import { getFormattedPhoneNumber } from '../../../lib/phone-format'
import { Message, PendingMessagePart, r } from '../../models'
import { log } from '../../../lib'
import faker from 'faker'
import { getLastMessage } from './message-sending'

let twilio = null
if (process.env.TWILIO_API_KEY && process.env.TWILIO_AUTH_TOKEN) {
twilio = Twilio(process.env.TWILIO_API_KEY, process.env.TWILIO_AUTH_TOKEN)
}

export async function getLastMessage({ userNumber, contactNumber }) {
const lastMessage = await r.table('message')
.getAll(contactNumber, { index: 'contact_number' })
.filter({
user_number: userNumber,
is_from_contact: false
})
.orderBy(r.desc('created_at'))
.limit(1)
.pluck('assignment_id')(0)
.default(null)

return lastMessage
}

export async function saveNewIncomingMessage (messageInstance) {
await messageInstance.save()

await r.table('campaign_contact')
.getAll(messageInstance.assignment_id, { index: 'assignment_id' })
.filter({ cell: messageInstance.contact_number })
.limit(1)
.update({ message_status: 'needsResponse' })
}

export async function convertMessagePartsToMessage(messageParts) {
// const firstPart = messageParts[0]
// const userNumber = firstPart.user_number
// const contactNumber = firstPart.contact_number
// const serviceMessages = messageParts.map((part) => part.service_message)
// const text = serviceMessages
// .map((serviceMessage) => serviceMessage.text)
// .join('')

// const lastMessage = await getLastMessage({ contactNumber, userNumber })

// return new Message({
// contact_number: contactNumber,
// user_number: userNumber,
// is_from_contact: true,
// text,
// service_messages: serviceMessages,
// service_message_ids: serviceMessages.map((doc) => doc.messageId),
// assignment_id: lastMessage.assignment_id,
// service: 'nexmo',
// send_status: 'DELIVERED'
// })
export async function convertTwilioMessagePartsToMessage(messageParts) {
const firstPart = messageParts[0]
const userNumber = firstPart.user_number
const contactNumber = firstPart.contact_number
const serviceMessages = messageParts.map((part) => part.service_message)
const text = serviceMessages
.map((serviceMessage) => serviceMessage.Body)
.join('')

const lastMessage = await getLastMessage({ contactNumber, userNumber })

return new Message({
contact_number: contactNumber,
user_number: userNumber,
is_from_contact: true,
text,
service_messages: serviceMessages,
service_message_ids: serviceMessages.map((doc) => doc.MessageSid),
assignment_id: lastMessage.assignment_id,
service: 'twilio',
send_status: 'DELIVERED'
})
}

export async function findNewCell() {
if (!twilio || process.env.NODE_ENV === 'development') {

if (!twilio) {
return { availablePhoneNumbers: [{ phone_number: '+15005550006' }] }
}
return new Promise((resolve, reject) => {
Expand All @@ -80,6 +58,7 @@ export async function twilioRentNewCell() {
const newCell = await findNewCell()

if (newCell && newCell.availablePhoneNumbers && newCell.availablePhoneNumbers[0] && newCell.availablePhoneNumbers[0].phone_number) {
console.log("creating ", newCell.availablePhoneNumbers[0].phone_number)
return new Promise((resolve, reject) => {
twilio.incomingPhoneNumbers.create({
phoneNumber: newCell.availablePhoneNumbers[0].phone_number,
Expand All @@ -102,7 +81,7 @@ export async function twilioRentNewCell() {


export async function twilioSendMessage(message) {
if (!twilio) {
if (!twilio) {
await Message.get(message.id)
.update({ send_status: 'SENT' })
return 'test_message_uuid'
Expand All @@ -116,6 +95,7 @@ export async function twilioSendMessage(message) {
to: message.contact_number,
from: message.user_number,
body: message.text,
statusCallback: process.env.TWILIO_STATUS_CALLBACK_URL
}, (err, response) => {
if (err) {
console.log("ERROR", err)
Expand Down Expand Up @@ -157,7 +137,7 @@ export async function handleTwilioDeliveryReport(report) {
const messageSid = report.MessageSid
if (messageSid) {
const messageStatus = report.MessageStatus
const message = r.table('message')
const message = await r.table('message')
.getAll(messageSid, { index: 'service_message_ids' })
.limit(1)(0)
.default(null)
Expand All @@ -176,36 +156,26 @@ export async function handleTwilioDeliveryReport(report) {
}

export async function handleTwilioIncomingMessage(message) {
console.log("Got message", message)
return
// if (!message.hasOwnProperty('to') ||
// !message.hasOwnProperty('msisdn') ||
// !message.hasOwnProperty('text') ||
// !message.hasOwnProperty('messageId')) {
// log.error(`This is not an incoming message: ${JSON.stringify(message)}`)
// }

// const { to, msisdn, concat } = message
// const isConcat = concat === 'true'
// const contactNumber = getFormattedPhoneNumber(msisdn)
// const userNumber = getFormattedPhoneNumber(to)

// let parentId = ''
// if (isConcat) {
// log.info(`Incoming message part (${message['concat-part']} of ${message['concat-total']} for ref ${message['concat-ref']}) from ${contactNumber} to ${userNumber}`)
// parentId = message['concat-ref']
// } else {
// log.info(`Incoming message part from ${contactNumber} to ${userNumber}`)
// }

// const pendingMessagePart = new PendingMessagePart({
// service: 'nexmo',
// parent_id: parentId,
// service_message: message,
// user_number: userNumber,
// contact_number: contactNumber
// })

// const part = await pendingMessagePart.save()
// return part.id
if (!message.hasOwnProperty('From') ||
!message.hasOwnProperty('To') ||
!message.hasOwnProperty('Body') ||
!message.hasOwnProperty('MessageSid')) {
log.error(`This is not an incoming message: ${JSON.stringify(message)}`)
}

const { From, To, Body, MessageSid } = message

const contactNumber = getFormattedPhoneNumber(From)
const userNumber = getFormattedPhoneNumber(To)

const pendingMessagePart = new PendingMessagePart({
service: 'twilio',
parent_id: '',
service_message: message,
user_number: userNumber,
contact_number: contactNumber
})

const part = await pendingMessagePart.save()
return part.id
}
1 change: 1 addition & 0 deletions src/server/models/pending-message-part.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ const PendingMessagePart = thinky.createModel('pending_message_part', type.objec
}).allowExtra(false))

PendingMessagePart.ensureIndex('parent_id')
PendingMessagePart.ensureIndex('service')

export default PendingMessagePart
41 changes: 32 additions & 9 deletions src/workers/incoming-message-handler.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
import { saveNewIncomingMessage, getLastMessage, convertMessagePartsToMessage } from '../server/api/lib/nexmo'
import { convertNexmoMessagePartsToMessage } from '../server/api/lib/nexmo'
import { convertTwilioMessagePartsToMessage } from '../server/api/lib/twilio'
import { saveNewIncomingMessage, getLastMessage } from '../server/api/lib/message-sending'
import { r } from '../server/models'
import { log } from '../lib'

async function sleep(ms = 0) {
return new Promise(fn => setTimeout(fn, ms))
}

async function handlePendingIncomingMessageParts() {
const getMessageId = (part) => {
let messageId
if (part.service === 'nexmo') {
messageId = part.service_message.messageId
} else if (part.service === 'twilio') {
messageId = part.service_message.MessageSid
}
return messageId
}

async function handleIncomingMessageParts(service) {
const convertMessageParts = service === 'nexmo' ? convertNexmoMessagePartsToMessage : convertTwilioMessagePartsToMessage
const allParts = await r.table('pending_message_part')
.getAll(service, { index: 'service' })
const messagesToSave = []
let messagePartsToDelete = []
const concatMessageParts = {}
Expand All @@ -16,29 +30,37 @@ async function handlePendingIncomingMessageParts() {

for (let i = 0; i < allPartsCount; i++) {
const part = allParts[i]
const serviceMessageId = part.service_message.messageId

const serviceMessageId = getMessageId(part)
const savedCount = await r.table('message')
.getAll(serviceMessageId, { index: 'service_message_ids' })
.count()

const lastMessage = await getLastMessage({ userNumber: part.user_number, contactNumber: part.contact_number })
const lastMessage = await getLastMessage({
userNumber: part.user_number,
contactNumber: part.contact_number,
service
})

const duplicateMessageToSaveExists = !!messagesToSave.find((message) => message.service_message_ids.indexOf(serviceMessageId) !== -1 )
if (!lastMessage) {
log.info('Received message part with no thread to attach to', part)
messagePartsToDelete.push(part)
} else if (savedCount > 0) {
log.info(`Found already saved message matching part service message ID ${part.service_message.messageId}`)
log.info(`Found already saved message matching part service message ID ${getMessageId(part)}`)
messagePartsToDelete.push(part)
} else if (duplicateMessageToSaveExists) {
log.info(`Found duplicate message to be saved matching part service message ID ${part.service_message.messageId}`)
log.info(`Found duplicate message to be saved matching part service message ID ${getMessageId(part)}`)
messagePartsToDelete.push(part)
} else {
const parentId = part.parent_id
if (parentId === '') {
messagesToSave.push(await convertMessagePartsToMessage([part]))
messagesToSave.push(await convertMessageParts([part]))
messagePartsToDelete.push(part)
} else {
if (part.service !== 'nexmo') {
throw new Error('should not have a parent ID for twilio')
}
const groupKey = [parentId, part.contact_number, part.user_number]

if (!concatMessageParts.hasOwnProperty(groupKey)){
Expand All @@ -65,7 +87,7 @@ async function handlePendingIncomingMessageParts() {

if (messageParts.filter((part) => part === null).length === 0) {
messagePartsToDelete = messagePartsToDelete.concat(messageParts)
const message = await convertMessagePartsToMessage(messageParts)
const message = await convertMessageParts(messageParts)
messagesToSave.push(message)
}
}
Expand All @@ -88,7 +110,8 @@ async function handlePendingIncomingMessageParts() {
while (true) {
try {
await sleep(100)
await handlePendingIncomingMessageParts()
await handleIncomingMessageParts('twilio')
await handleIncomingMessageParts('nexmo')
} catch (ex) {
log.error(ex)
}
Expand Down
Loading

0 comments on commit 09a30e5

Please sign in to comment.