Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: refactor messages #1357

Merged
merged 8 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
updates
  • Loading branch information
SamSalvatico committed Dec 19, 2024
commit 47ca8ff5c98093ad29cd6d513f6bd7d091af4faa
2 changes: 1 addition & 1 deletion apps/messaging-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"@fastify/type-provider-typebox": "^5.1.0",
"@fastify/under-pressure": "^9.0.2",
"@ogcio/api-auth": "^5.1.1",
"@ogcio/building-blocks-sdk": "^0.1.12",
"@ogcio/building-blocks-sdk": "^0.1.13",
"@ogcio/fastify-error-handler": "^5.1.0",
"@ogcio/fastify-logging-wrapper": "^5.1.2",
"@sinclair/typebox": "^0.34.11",
Expand Down
17 changes: 8 additions & 9 deletions apps/messaging-api/src/routes/messages/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { ensureUserCanAccessUser } from "@ogcio/api-auth";
import type { FastifyInstance } from "fastify";
import { processMessages } from "../../services/messages/messages.js";
import { Permissions } from "../../types/permissions.js";
import { ensureOrganizationIdIsSet, ensureUserIdIsSet } from "../../utils/authentication-factory.js";
import {
Expand All @@ -21,6 +20,7 @@ import {
import {
listMessages,
getMessage,
processMessages,
} from "../../services/messages/message-service.js";

export const prefix = "/messages";
Expand Down Expand Up @@ -105,29 +105,28 @@ export default async function messages(app: FastifyInstance) {
request.userData,
request.body.recipientUserId,
);
const userOrganizationId = ensureOrganizationIdIsSet
const senderUser = {
profileId: ensureUserIdIsSet(request),
organizationId: ensureOrganizationIdIsSet(request),
isM2MApplication: request.userData?.isM2MApplication ?? false,
};
const messages = await processMessages({
inputMessages: [
messages: [
{
receiverUserId: request.body.recipientUserId,
...request.body,
...request.body.message,
organisationId: userData.organizationId ?? "",
senderUserProfileId: ensureUserIdIsSet(request),
attachments: request.body.attachments ?? [],
},
senderApplicationId: request.userData?.isM2MApplication ? userData.userId : null
}
],
scheduleAt: request.body.scheduleAt,
pgPool: app.pg.pool,
pool: app.pg.pool,
logger: request.log,
senderUser,
isM2MApplicationSender: request.userData?.isM2MApplication ?? false,
allOrNone: true,
organizationId: userData.organizationId,
sender: senderUser,
throwAtFirstError: true,
});
if (messages.errors.length > 0) {
throw messages.errors[0];
Expand Down
150 changes: 123 additions & 27 deletions apps/messaging-api/src/services/messages/message-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ import {
} from "./eventLogger.js";
import { MessagingService, newMessagingService } from "./messaging.js";
import { FastifyBaseLogger } from "fastify";
import {
Profile,
Upload,
} from "@ogcio/building-blocks-sdk/dist/types/index.js";
import {
getUserProfiles,
MessagingUserProfile,
} from "../users/shared-users.js";
import { BuildingBlocksSDK } from "@ogcio/building-blocks-sdk";

export type CreateMessageParams = {
receiverUserId: string;
Expand Down Expand Up @@ -129,7 +126,12 @@ export async function getMessage(params: {
return data.rows[0];
}

export async function createMessages(params: {
type CreateMessagesOutput = {
scheduledMessages: { jobId: string; userId: string; entityId: string }[];
errors: HttpError[];
};

export async function processMessages(params: {
pool: Pool;
sender: {
profileId: string;
Expand All @@ -138,8 +140,11 @@ export async function createMessages(params: {
};
messages: CreateMessageParams[];
logger: FastifyBaseLogger;
}): Promise<string> {
const { pool, sender, messages, logger } = params;
throwAtFirstError: boolean;
scheduleAt: string;
}): Promise<CreateMessagesOutput> {
const { pool, sender, messages, logger, throwAtFirstError, scheduleAt } =
params;
await ensureCanSendMessagesWithoutBypassingConsent({
organizationId: sender.organizationId,
inputMessages: messages,
Expand All @@ -149,16 +154,16 @@ export async function createMessages(params: {
const client = await pool.connect();
const messageService = newMessagingService(client);
const eventLogger = newMessagingEventLogger(pool, logger);
const toScheduleMessages = [];
const eventLoggingEntries = [];
const outputMessages: {
scheduledMessages: { jobId: string; userId: string; entityId: string }[];
errors: HttpError[];
} = { scheduledMessages: [], errors: [] };

const uploadSdk = await getUploadSdk(logger, sender.organizationId);
const senderData = await getSenderData(sender, logger);
let categorizedMessages: {
toBeScheduled: { messageId: string; userId: string }[];
withError: HttpError[];
eventLoggingEntries: { messageId: string }[];
} = { toBeScheduled: [], withError: [], eventLoggingEntries: [] };
try {
await client.query("BEGIN");
const insertedMessages = await insertMessages({
client,
messageService,
Expand All @@ -167,10 +172,43 @@ export async function createMessages(params: {
senderData,
eventLogger,
});
categorizedMessages = await categorizeMessages({
messages: insertedMessages,
throwAtFirstError,
});
await client.query("COMMIT");
} catch (error) {
await client.query("ROLLBACK");
httpErrors.createError(500, "Message creation failed", { parent: error });
throw httpErrors.createError(500, "Message creation failed", {
parent: error,
});
} finally {
client.release();
}

const output: CreateMessagesOutput = {
scheduledMessages: [],
errors: categorizedMessages.withError,
};
if (categorizedMessages.toBeScheduled.length === 0) {
return output;
}

await eventLogger.log(
MessagingEventType.scheduleMessage,
categorizedMessages.eventLoggingEntries,
);

output.scheduledMessages = await scheduleMessagesWithLog({
messageService,
eventLogger,
organizationId: sender.organizationId,
toScheduleMessages: categorizedMessages.toBeScheduled,
logger,
scheduleAt,
});

return output;
}

type SenderData = { organizationId: string } & (
Expand Down Expand Up @@ -236,26 +274,24 @@ async function getSenderData(
}

type InsertedMessage = {
createdMessage?: {
id: string;
user_id: string;
profile: MessagingUserProfile & {
fullName: string;
};
createdMessage?: {
id: string;
user_id: string;
profile: MessagingUserProfile & {
fullName: string;
};
error?: HttpError;
};
error?: HttpError;
};

async function insertMessages(params: {
client: PoolClient;
messageService: Readonly<MessagingService>;
eventLogger: MessagingEventLogger;
uploadSdk: Upload;
uploadSdk: BuildingBlocksSDK["upload"];
messages: CreateMessageParams[];
senderData: SenderData;
}): Promise<
InsertedMessage[]
> {
}): Promise<InsertedMessage[]> {
const createPromises = [];
for (const toCreate of params.messages) {
createPromises.push(
Expand All @@ -274,6 +310,39 @@ InsertedMessage[]
return Promise.all(createPromises);
}

async function categorizeMessages(params: {
messages: InsertedMessage[];
throwAtFirstError: boolean;
}): Promise<{
toBeScheduled: { messageId: string; userId: string }[];
withError: HttpError[];
eventLoggingEntries: { messageId: string }[];
}> {
const eventLoggingEntries = [];
const toBeScheduled = [];
const errors = [];
for (const createdMessage of params.messages) {
const messageData = createdMessage.createdMessage;
if (!messageData) {
// if all or none is set to true
// fails if one creation fails
if (params.throwAtFirstError) {
throw createdMessage.error;
}
errors.push(createdMessage.error ?? httpErrors.internalServerError());
continue;
}

eventLoggingEntries.push({ messageId: messageData.id });

toBeScheduled.push({
messageId: messageData.id,
userId: messageData.user_id,
});
}
return { toBeScheduled, eventLoggingEntries, withError: errors };
}

async function ensureCanSendMessagesWithoutBypassingConsent(params: {
organizationId: string;
pool: Pool;
Expand Down Expand Up @@ -527,7 +596,7 @@ const createMessageWithLog = async (params: {
eventLogger: MessagingEventLogger;
createMessageParams: Omit<CreateMessageParams, "senderApplicationId">;
poolClient: PoolClient;
uploadClient: Upload;
uploadClient: BuildingBlocksSDK["upload"];
organizationId?: string;
}): Promise<{
createdMessage?: {
Expand Down Expand Up @@ -617,7 +686,7 @@ const createMessageWithLog = async (params: {
};

async function checkAttachments(params: {
uploadClient: Upload;
uploadClient: BuildingBlocksSDK["upload"];
userProfileId: string;
attachmentIds: string[];
organizationId?: string;
Expand Down Expand Up @@ -653,3 +722,30 @@ async function checkAttachments(params: {
}
}
}

async function scheduleMessagesWithLog(params: {
messageService: MessagingService;
eventLogger: MessagingEventLogger;
organizationId: string;
scheduleAt: string;
toScheduleMessages: { messageId: string; userId: string }[];
logger: FastifyBaseLogger;
}): Promise<{ jobId: string; userId: string; entityId: string }[]> {
try {
return await params.messageService.scheduleMessages(
params.toScheduleMessages,
params.scheduleAt,
params.organizationId,
params.logger,
);
} catch (error) {
await params.eventLogger.log(
MessagingEventType.scheduleMessageError,
params.toScheduleMessages,
);

throw httpErrors.createError(503, "Error scheduling messages", {
parent: error,
});
}
}
Loading