Skip to content

Commit 6d85bab

Browse files
authored
Order update (#4)
* order_update: add event-store-db to docker-compose start instruction * order_update: enable monitoring on nats server * order_update: add node-nats-streaming library in order service for communication on nats network * order_update: add name option in nats connection option so such each services can be accurately identified in the http monitoring * order_update: use prefix to determine pre approved payment in payment service instead of exact keyword in the order description * order_update: merge mocked random payment provider with configurable payment provider and set non production environment to always use the mocked payment provider * order_update: propagate aggregate_id in event_data so that the subsequent processes have the correct order id for further processing * order_update: add nats-streaming client in order service * order_update: implement update order status by id in order service * order_update: add nats-worker * order_update: wrap application with nats-streaming connection and activate nats-worker * order_update: rename aggregate_id to event_id in event_data published onto nats-network instead * order_update: process confirmed order * order_update: fix incorrect event data json stringification in nats worker in payment service * order_update: fix predeclined payment prefixes is not correctly checked against * order_update: process declined payments
1 parent a10ef69 commit 6d85bab

15 files changed

+255
-32
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ Event Sourcing Order-Payment with NATS
99
**IMPORTANT**: Due to the docker nats-streaming image not having a shell, it is not possible to wait for the db connection before start itself. Hence the docker services have to be started exactly in the following order:
1010

1111
```shell
12-
docker-compose up -d order-db nats-streaming-db
12+
docker-compose up -d order-db event-store-db nats-streaming-db
1313
docker-compose up -d nats-streaming
1414
```

docker-compose.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,5 @@ services:
3737
- "--sql_driver=postgres"
3838
- "--sql_source=postgres://postgres@nats-streaming-db/postgres?sslmode=disable"
3939
- "--sql_no_caching=true"
40-
- "--sql_max_open_conns=80"
40+
- "--sql_max_open_conns=80"
41+
- "--http_port=8222"

event-store/clients/nats-streaming.js

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const natsUrl = process.env.NATS_STREAMING_SERVER_URL || "nats://0.0.0.0:4222";
77
const connectionOptions = {
88
url: natsUrl,
99
ackTimeout: 3000,
10+
name: clientId,
1011
};
1112

1213
const sc = connect(clusterId, clientId, connectionOptions);

order/clients/nats-streaming.js

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { connect } from "node-nats-streaming";
2+
3+
const clusterId = process.env.NATS_STREAMING_CLUSTER_ID || "dev";
4+
const clientId = process.env.NATS_STREAMING_CLIENT_ID || "order";
5+
const natsUrl = process.env.NATS_STREAMING_SERVER_URL || "nats://0.0.0.0:4222";
6+
7+
const connectionOptions = {
8+
url: natsUrl,
9+
ackTimeout: 3000,
10+
name: clientId,
11+
};
12+
13+
const sc = connect(clusterId, clientId, connectionOptions);
14+
15+
export default sc;

order/package-lock.json

+42
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

order/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"grpc": "^1.24.2",
2020
"http-status-codes": "^1.4.0",
2121
"morgan": "~1.9.1",
22+
"node-nats-streaming": "^0.3.2",
2223
"pg": "^8.2.1",
2324
"uuid": "^8.1.0"
2425
},

order/src/app.js

+10-4
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@ import express from "express";
22
import path from "path";
33
import logger from "morgan";
44
import orderRouter from "./routes/orders";
5+
import sc from "../clients/nats-streaming";
6+
import NATSStreamingWorker from "./workers/nats-streaming";
57

68
const app = express();
9+
const natsStreamingWorker = new NATSStreamingWorker(sc);
10+
sc.on("connect", ()=>{
11+
app.use(logger("dev"));
12+
app.use(express.json());
13+
app.use(express.urlencoded({ extended: false }));
714

8-
app.use(logger("dev"));
9-
app.use(express.json());
10-
app.use(express.urlencoded({ extended: false }));
15+
app.use("/", orderRouter);
1116

12-
app.use("/", orderRouter);
17+
natsStreamingWorker.activate();
18+
});
1319

1420
export default app;

order/src/services/order.js

+11
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,17 @@ class OrderService {
4949
return e;
5050
}
5151
}
52+
53+
static async updateOrderStatusById({ id, status }) {
54+
try {
55+
await pool.query(
56+
"UPDATE orders SET status=$1, updated_at=$2 WHERE id=$3",
57+
[ status, new Date().toISOString(), id ]
58+
);
59+
} catch (e) {
60+
return e;
61+
}
62+
}
5263
}
5364

5465
export default OrderService;

order/src/workers/nats-streaming.js

+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import OrderService from "../services/order";
2+
import eventStoreService from "../../clients/event-store";
3+
4+
class NATSStreamingWorker {
5+
#client;
6+
7+
constructor(client) {
8+
this.#client = client;
9+
}
10+
11+
activate() {
12+
this.processConfirmedPayment();
13+
this.processDeclinedPayment();
14+
this.processConfirmedOrder();
15+
}
16+
17+
processConfirmedPayment() {
18+
const opts = this.#client.subscriptionOptions();
19+
opts.setDeliverAllAvailable();
20+
opts.setDurableName("durable_sub_payment_confirmed");
21+
22+
const durableSub = this.#client.subscribe("payment.confirmed", opts);
23+
durableSub.on("message", async (msg)=> {
24+
const eventData = JSON.parse(msg.getData());
25+
26+
const eventPayload = {
27+
event: "order_confirmed",
28+
aggregate_id: eventData.event_id,
29+
aggregate_type: "order",
30+
event_data: JSON.stringify({ event_id: eventData.event_id }),
31+
};
32+
33+
try {
34+
// update order to confirmed
35+
const res = OrderService.updateOrderStatusById({ id: eventData.event_id, status: "confirmed" });
36+
if (res instanceof Error) {
37+
console.log(res);
38+
// TODO: publish to error channel and requeue to be reprocessed (not within assessment scope)
39+
}
40+
41+
await new Promise((resolve, reject) => {
42+
eventStoreService.createEvent(eventPayload, async (err, resp) => {
43+
if (err) {
44+
reject(err);
45+
}
46+
47+
resolve();
48+
});
49+
});
50+
} catch (e) {
51+
console.log(e);
52+
// TODO: publish to error channel and requeue to be reprocessed (not within assessment scope)
53+
}
54+
});
55+
}
56+
57+
processDeclinedPayment() {
58+
const opts = this.#client.subscriptionOptions();
59+
opts.setDeliverAllAvailable();
60+
opts.setDurableName("durable_sub_order_confirmed");
61+
62+
const durableSub = this.#client.subscribe("payment.declined", opts);
63+
durableSub.on("message", async (msg)=> {
64+
const eventData = JSON.parse(msg.getData());
65+
66+
const eventPayload = {
67+
event: "order_cancelled",
68+
aggregate_id: eventData.event_id,
69+
aggregate_type: "order",
70+
event_data: JSON.stringify({ event_id: eventData.event_id }),
71+
};
72+
73+
try {
74+
// update order to confirmed
75+
const res = OrderService.updateOrderStatusById({ id:eventData.event_id, status: "cancelled" });
76+
if (res instanceof Error) {
77+
console.log(res);
78+
// TODO: publish to error channel and requeue to be reprocessed (not within assessment scope)
79+
}
80+
81+
await new Promise((resolve, reject) => {
82+
eventStoreService.createEvent(eventPayload, async (err, resp) => {
83+
if (err) {
84+
reject(err);
85+
}
86+
87+
resolve();
88+
});
89+
});
90+
} catch (e) {
91+
console.log(e);
92+
// TODO: publish to error channel and requeue to be reprocessed (not within assessment scope)
93+
}
94+
});
95+
}
96+
97+
processConfirmedOrder() {
98+
const opts = this.#client.subscriptionOptions();
99+
opts.setDeliverAllAvailable();
100+
opts.setDurableName("durable_sub_order_confirmed");
101+
102+
const durableSub = this.#client.subscribe("order.confirmed", opts);
103+
durableSub.on("message", async (msg)=> {
104+
const eventData = JSON.parse(msg.getData());
105+
106+
const eventPayload = {
107+
event: "order_delivered",
108+
aggregate_id: eventData.event_id,
109+
aggregate_type: "order",
110+
event_data: JSON.stringify({ event_id: eventData.event_id }),
111+
};
112+
113+
try {
114+
// update order to confirmed
115+
const res = OrderService.updateOrderStatusById({ id:eventData.event_id, status: "delivered" });
116+
if (res instanceof Error) {
117+
console.log(res);
118+
// TODO: publish to error channel and requeue to be reprocessed (not within assessment scope)
119+
}
120+
121+
await new Promise((resolve, reject) => {
122+
eventStoreService.createEvent(eventPayload, async (err, resp) => {
123+
if (err) {
124+
reject(err);
125+
}
126+
127+
resolve();
128+
});
129+
});
130+
} catch (e) {
131+
console.log(e);
132+
// TODO: publish to error channel and requeue to be reprocessed (not within assessment scope)
133+
}
134+
});
135+
}
136+
}
137+
138+
export default NATSStreamingWorker;
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { v4 as uuidV4 } from "uuid";
2+
3+
class MockedPaymentProvider {
4+
#preApprovedPaymentsPrefixes = ["test"];
5+
#preDeclinedPaymentsPrefixes = ["xxx"]
6+
7+
pay(eventData) {
8+
// check if payment description starts with pre approved keyword
9+
for (let i=0; i<this.#preApprovedPaymentsPrefixes.length; i++) {
10+
if (eventData.description.startsWith(this.#preApprovedPaymentsPrefixes[i])) {
11+
return { payment_ref: uuidV4() };
12+
}
13+
}
14+
15+
// check if payment description starts with pre declined keyword
16+
for (let i=0; i<this.#preDeclinedPaymentsPrefixes.length; i++) {
17+
if (eventData.description.startsWith(this.#preDeclinedPaymentsPrefixes[i])) {
18+
return Error("payment declined");
19+
}
20+
}
21+
22+
// else return random result
23+
return Math.random() > 0.5 ? Error("payment declined") : { payment_ref: uuidV4() };
24+
}
25+
}
26+
27+
export default MockedPaymentProvider;

payment/clients/nats-streaming.js

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const natsUrl = process.env.NATS_STREAMING_SERVER_URL || "nats://0.0.0.0:4222";
66

77
const connectionOptions = {
88
url: natsUrl,
9+
name: clientId,
910
};
1011

1112
const sc = connect(clusterId, clientId, connectionOptions);

payment/clients/payment-configurable.js

-11
This file was deleted.

payment/clients/payment-random.js

-9
This file was deleted.

payment/services/payment.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import PaymentConfigurable from "../clients/payment-configurable";
2-
import PaymentRandom from "../clients/payment-random";
1+
import MockedPaymentProvider from "../clients/mocked-payment-provider";
32

4-
const PaymentClient = process.env.NODE_ENV == "dev" ? PaymentRandom : PaymentConfigurable;
3+
// TODO: if prod env use real payment provider client (not within the assessment scope)
4+
const PaymentClient = process.env.NODE_ENV != "prod" ? MockedPaymentProvider : null;
55

66
class PaymentService {
77
#client;

0 commit comments

Comments
 (0)