forked from PipedreamHQ/pipedream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchanges-to-collection.js
80 lines (73 loc) · 2.61 KB
/
changes-to-collection.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
const fauna = require("../../faunadb.app.js");
const maxBy = require("lodash.maxby");
module.exports = {
key: "faunadb-changes-to-collection",
name: "New or Removed Documents in a Collection",
description:
"This source tracks add and remove events to documents in a specific collection. Each time you add or remove a document from this collection, this event source emits an event with the details of the document.",
version: "0.0.3",
dedupe: "unique", // Dedupe events based on the concatenation of event + document ref id
props: {
timer: {
type: "$.interface.timer",
default: {
intervalSeconds: 5 * 60,
},
},
db: "$.service.db",
fauna,
collection: {
propDefinition: [
fauna,
"collection",
],
},
emitEventsInBatch: {
type: "boolean",
label: "Emit changes as a single event",
description:
"If `true`, all events are emitted as an array, within a single Pipedream event. Defaults to `false`, emitting each event in Fauna as its own event in Pipedream",
optional: true,
default: false,
},
},
async run() {
// As soon as the script runs, mark the start time so we can fetch changes
// since this time on the next run. Fauna expects epoch ms as its cursor.
const ts = +new Date() * 1000;
const cursor = this.db.get("cursor") || ts;
const events = await this.fauna.getEventsInCollectionAfterTs(
this.collection,
cursor,
);
if (!events.length) {
console.log(`No new events in collection ${this.collection}`);
this.db.set("cursor", ts);
return;
}
console.log(`${events.length} new events in collection ${this.collection}`);
// Batched emits do not take advantage of the built-in deduper
if (this.emitEventsInBatch) {
this.$emit({
events,
}, {
summary: `${events.length} new event${events.length > 1
? "s"
: ""}`,
id: cursor,
});
} else {
for (const event of events) {
this.$emit(event, {
summary: `${event.action.toUpperCase()} - ${event.instance.id}`,
id: `${event.action}-${event.instance.id}`, // dedupes events based on this ID
});
}
}
// Finally, set cursor for the next run to the max timestamp of the changed events, ensuring we
// get all events after that on the next run. We need to add 1 since the timestamp filter in
// Fauna is inclusive: https://docs.fauna.com/fauna/current/api/fql/functions/paginate
const maxEventTs = maxBy(events, (event) => event.ts).ts + 1;
this.db.set("cursor", maxEventTs);
},
};