This repository has been archived by the owner on Feb 14, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Joseph Schultz
committed
Jun 26, 2018
0 parents
commit e48d489
Showing
12 changed files
with
2,834 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Firebase config | ||
FB_URL="" | ||
FB_ES_COLLECTION="search" | ||
FB_REQ="request" | ||
FB_RES="response" | ||
FB_ACC="service-account.json" | ||
|
||
BONSAI_URL="" | ||
|
||
# Optional if BONSAI_URL is set | ||
ES_HOST="" | ||
ES_PORT="" | ||
ES_USER="" | ||
ES_PASS="" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
node_modules | ||
lib | ||
*-error.log | ||
service-account.json | ||
.env |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# Elasticsearch + CloudFirestore = Elasticstore | ||
----------------- | ||
|
||
A pluggable integration with ElasticSearch to provide advanced content searches in Firestore. | ||
|
||
This script can: | ||
|
||
- monitor multiple firestore collections and add/modify/remove indexed elasticsearch data in real time | ||
- communicates with client completely via Firebase (no elasticsearch client required, though a query builder is recommended) | ||
- clean up old, outdated requests (WIP) | ||
|
||
Heavily Inspired by the Realtime Database implementation (Flashlight) by the [Firebase Team](https://github.com/firebase/flashlight) | ||
|
||
|
||
## Getting Started: | ||
|
||
*Coming Soon* |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
{ | ||
"name": "elasticstore", | ||
"version": "0.0.1", | ||
"description": "A pluggable union between Firebase CloudFirestore + ElasticSearch", | ||
"main": "lib/index.js", | ||
"repository": "https://github.com/acupajoe/elasticstore", | ||
"author": "Joseph Schultz <[email protected]>", | ||
"license": "MIT", | ||
"private": false, | ||
"keywords": [ | ||
"elasticsearch", | ||
"firestore", | ||
"cloudfirestore", | ||
"firebase" | ||
], | ||
"scripts": { | ||
"build": "tsc", | ||
"develop": "tsc --watch", | ||
"start": "npm run build && node ." | ||
}, | ||
"dependencies": { | ||
"colors": "^1.3.0", | ||
"dotenv": "^6.0.0", | ||
"elasticsearch": "^15.0.0", | ||
"firebase-admin": "^5.12.1", | ||
"typescript": "^2.9.2" | ||
}, | ||
"devDependencies": { | ||
"@types/dotenv": "^4.0.3", | ||
"@types/elasticsearch": "^5.0.24", | ||
"@types/node": "^10.3.5" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import * as path from 'path' | ||
import * as dotenv from 'dotenv' | ||
import { Record } from './types'; | ||
|
||
const envPath = path.resolve(__dirname, '..', '.env') | ||
dotenv.config({ path: envPath }) | ||
|
||
export interface ElasticSearchOptions { | ||
requestTimeout: number | ||
maxSockets: number | ||
log: string | ||
} | ||
|
||
function processBonsaiUrl(url: string) { | ||
var matches = url.match(/^https?:\/\/([^:]+):([^@]+)@([^/]+)\/?$/) | ||
process.env.ES_HOST = matches[3] | ||
process.env.ES_PORT = "80" | ||
process.env.ES_USER = matches[1] | ||
process.env.ES_PASS = matches[2] | ||
} | ||
|
||
if (process.env.BONSAI_URL) { | ||
processBonsaiUrl(process.env.BONSAI_URL) | ||
} | ||
|
||
// Records should be added here to be indexed / made searchable | ||
const records: Array<Record> = [ | ||
{ | ||
collection: 'users', | ||
type: 'users', | ||
index: 'firestore', | ||
include: ['firstName', 'lastName', 'email'] | ||
} | ||
] | ||
|
||
class Config { | ||
public FB_URL: string = process.env.FB_URL | ||
public FB_ES_COLLECTION: string = process.env.FB_ES_COLLECTION | ||
public FB_REQ: string = process.env.FB_REQ | ||
public FB_RES: string = process.env.FB_RES | ||
public FB_SERVICE_ACCOUNT: string = process.env.FB_ACC | ||
public ES_HOST: string = process.env.ES_HOST || 'localhost' | ||
public ES_PORT: string = process.env.ES_PORT || '9200' | ||
public ES_USER: string = process.env.ES_USER || null | ||
public ES_PASS: string = process.env.ES_PASS || null | ||
public ES_OPTS: ElasticSearchOptions = { | ||
requestTimeout: 60000, | ||
maxSockets: 100, | ||
log: 'error' | ||
} | ||
public CLEANUP_INTERVAL: number = process.env.NODE_ENV === 'production' ? 3600 * 1000 /* once an hour */ : 60 * 1000 /* once a minute */ | ||
records: Array<Record> = records | ||
} | ||
|
||
export default new Config() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
import Config from './config' | ||
import * as elasticsearch from 'elasticsearch' | ||
import * as colors from 'colors' | ||
import * as admin from 'firebase-admin' | ||
import Worker from './util/Worker'; | ||
|
||
// Create Elasticsearch Client | ||
const elasticsearchClient = new elasticsearch.Client({ | ||
hosts: [{ | ||
host: Config.ES_HOST, | ||
port: Config.ES_PORT, | ||
auth: (Config.ES_USER && Config.ES_PASS) ? Config.ES_USER + ':' + Config.ES_PASS : null | ||
}], | ||
requestTimeout: Config.ES_OPTS.requestTimeout, | ||
maxSockets: Config.ES_OPTS.maxSockets, | ||
log: Config.ES_OPTS.log | ||
}) | ||
|
||
console.log(colors.grey('Connecting to ElasticSearch host %s:%s'), Config.ES_HOST, Config.ES_PORT); | ||
|
||
// Verify we are connected to Elasticsearch before continuing | ||
const retryInterval = 5000; | ||
const timeout = setInterval(async () => { | ||
try { | ||
await elasticsearchClient.ping(null) | ||
console.log(colors.green('Connected to ElasticSearch host %s:%s'), Config.ES_HOST, Config.ES_PORT); | ||
clearInterval(timeout) | ||
elasticstore(); | ||
} catch (e) { | ||
console.log(colors.red('Failed to connect to ElasticSearch host %s:%s'), Config.ES_HOST, Config.ES_PORT) | ||
console.log(colors.yellow('Retrying in... %sms'), retryInterval) | ||
} | ||
}, retryInterval) | ||
|
||
|
||
// This is the bread and butter | ||
function elasticstore() { | ||
console.log(colors.grey('Connecting to Firebase %s'), Config.FB_URL); | ||
try { | ||
// Initialize firebase | ||
admin.initializeApp({ | ||
credential: admin.credential.cert(Config.FB_SERVICE_ACCOUNT), | ||
databaseURL: Config.FB_URL | ||
}); | ||
console.log(colors.green(`Connected to Firestore: ${Config.FB_URL}`)) | ||
console.log(colors.grey('Registering worker...')) | ||
Worker.register(elasticsearchClient) | ||
} catch (e) { | ||
console.log(colors.red(e.message)) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
import * as admin from 'firebase-admin' | ||
|
||
export type FirebaseDocChangeType = "added" | "modified" | "removed" | ||
|
||
export interface Record { | ||
collection: string | admin.firestore.Query | ||
type: string | ||
index: string // "firestore" | ||
include?: Array<string> | ||
exclude?: Array<string> | ||
filter?: (data: { [key: string]: any }) => boolean | null | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
import { Record, FirebaseDocChangeType } from "../types"; | ||
import { Client } from "elasticsearch"; | ||
import * as colors from 'colors' | ||
import * as admin from 'firebase-admin' | ||
|
||
|
||
/** | ||
* FirestoreCollectionHandler | ||
* This acts as the "state-keeper" between firestore and elasticsearch. | ||
* | ||
* A collection's children are watched for event changes and their corresponding | ||
* elasticsearch records are updated. | ||
* | ||
* Firestore fires the onSnapshot listener for *EVERY* document on bind. | ||
* THIS IS EXPENSIVE. | ||
*/ | ||
export default class FirestoreCollectionHandler { | ||
private record: Record | ||
private client: Client | ||
private unsubscribe: () => void | ||
|
||
constructor(client: Client, record: Record) { | ||
this.record = record | ||
this.client = client | ||
|
||
console.log(colors.grey(` | ||
Begin listening to changes for collection: '${this.record.collection} | ||
include: [ ${this.record.include ? this.record.include.join(', ') : ''} ] | ||
exclude: [ ${this.record.exclude ? this.record.exclude.join(', ') : ''} ] | ||
`)) | ||
|
||
if (this.record.collection instanceof admin.firestore.Query) { | ||
this.unsubscribe = this.record.collection.onSnapshot(this.handleSnapshot) | ||
} else { | ||
this.unsubscribe = admin.firestore().collection(this.record.collection as string).onSnapshot(this.handleSnapshot) | ||
} | ||
} | ||
|
||
private handleSnapshot = (snap: admin.firestore.QuerySnapshot) => { | ||
for (const change of snap.docChanges) { | ||
const type: FirebaseDocChangeType = change.type | ||
switch (type) { | ||
case "added": | ||
this.handleAdded(change.doc) | ||
break; | ||
case "modified": | ||
this.handleModified(change.doc) | ||
break; | ||
case "removed": | ||
this.handleRemoved(change.doc) | ||
break; | ||
} | ||
} | ||
} | ||
|
||
private handleAdded = async (doc: admin.firestore.DocumentSnapshot) => { | ||
let body: any = this.filter(doc.data()) | ||
|
||
// Filtering has excluded this record | ||
if (!body) return | ||
|
||
try { | ||
const exists = await this.client.exists({ id: doc.id, index: this.record.index, type: this.record.type }) | ||
if (exists) { | ||
await this.client.update({ id: doc.id, index: this.record.index, type: this.record.type, body: { doc: body } }) | ||
} else { | ||
await this.client.index({ id: doc.id, index: this.record.index, type: this.record.type, body: body }) | ||
} | ||
} catch (e) { | ||
console.error(`Error on FS_ADDED handler [doc@${doc.id}]: ${e.message}`) | ||
} | ||
} | ||
|
||
private handleModified = async (doc: admin.firestore.DocumentSnapshot) => { | ||
const body = this.filter(doc.data()) | ||
|
||
// Filtering has excluded this record | ||
if (!body) return | ||
|
||
try { | ||
await this.client.update({ id: doc.id, index: this.record.index, type: this.record.type, body: { doc: body } }) | ||
} catch (e) { | ||
console.error(`Error on FS_MODIFIED handler [doc@${doc.id}]: ${e.message}`) | ||
} | ||
} | ||
|
||
private handleRemoved = async (doc: admin.firestore.DocumentSnapshot) => { | ||
try { | ||
await this.client.delete({ id: doc.id, index: this.record.index, type: this.record.type }) | ||
} catch (e) { | ||
console.error(`Error on FS_REMOVE handler [doc@${doc.id}]: ${e.message}`) | ||
} | ||
} | ||
|
||
private filter = (data: any) => { | ||
let shouldInsert = true | ||
if (this.record.filter) { | ||
shouldInsert = this.record.filter.apply(this, data) | ||
} | ||
|
||
if (!shouldInsert) { | ||
return null | ||
} | ||
|
||
if (this.record.include) { | ||
for (const key of Object.keys(data)) { | ||
if (this.record.include.indexOf(key) === -1) { | ||
delete data[key] | ||
} | ||
} | ||
} | ||
|
||
if (this.record.exclude) { | ||
for (const key of this.record.exclude) { | ||
if (data[key]) { | ||
delete data[key] | ||
} | ||
} | ||
} | ||
|
||
return data | ||
} | ||
} |
Oops, something went wrong.