From d6c9362c621a540f72b3ecd9c7ae3c8d833f9fa6 Mon Sep 17 00:00:00 2001 From: masterchief164 <63920595+masterchief164@users.noreply.github.com> Date: Sat, 17 Jun 2023 17:43:50 +0530 Subject: [PATCH 1/5] feat: added headers first sync --- bin/bcoin | 4 +- lib/blockchain/chain.js | 16 ++- lib/net/peer.js | 12 ++ lib/net/pool.js | 79 +++++++++-- lib/node/neutrino.js | 301 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 397 insertions(+), 15 deletions(-) create mode 100644 lib/node/neutrino.js diff --git a/bin/bcoin b/bin/bcoin index 0df74c54f..840958d7b 100755 --- a/bin/bcoin +++ b/bin/bcoin @@ -43,8 +43,8 @@ for arg in "$@"; do --daemon) daemon=1 ;; - --spv) - cmd='spvnode' + --spv) + cmd='spvnode' ;; esac done diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index 9cd0a312f..38201fe8e 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -2006,16 +2006,18 @@ class Chain extends AsyncEmitter { if (this.options.checkpoints) { if (this.height < this.network.lastCheckpoint) return; - } - - if (this.tip.time < util.now() - this.network.block.maxTipAge) + } else if (!this.options.neutrino && + this.tip.time < util.now() - this.network.block.maxTipAge) return; if (!this.hasChainwork()) return; this.synced = true; - this.emit('full'); + if (this.options.neutrino) + this.emit('headersFull'); + else + this.emit('full'); } /** @@ -2616,6 +2618,7 @@ class ChainOptions { this.compression = true; this.spv = false; + this.neutrino = false; this.bip91 = false; this.bip148 = false; this.prune = false; @@ -2662,6 +2665,11 @@ class ChainOptions { this.spv = options.spv; } + if (options.neutrino != null) { + assert(typeof options.neutrino === 'boolean'); + this.neutrino = options.neutrino; + } + if (options.prefix != null) { assert(typeof options.prefix === 'string'); this.prefix = options.prefix; diff --git a/lib/net/peer.js b/lib/net/peer.js index 2271e7896..dac2e265d 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -1449,6 +1449,12 @@ class Peer extends EventEmitter { if (!(this.services & services.NETWORK)) throw new Error('Peer does not support network services.'); + if (this.options.neutrino) { + if (!(this.services & services.NODE_COMPACT_FILTERS)) { + throw new Error('Peer does not support Compact Filters.'); + } + } + if (this.options.headers) { if (this.version < common.HEADERS_VERSION) throw new Error('Peer does not support getheaders.'); @@ -2080,6 +2086,7 @@ class PeerOptions { this.agent = common.USER_AGENT; this.noRelay = false; this.spv = false; + this.neutrino = false; this.compact = false; this.headers = false; this.banScore = common.BAN_SCORE; @@ -2143,6 +2150,11 @@ class PeerOptions { this.spv = options.spv; } + if (options.neutrino != null) { + assert(typeof options.neutrino === 'boolean'); + this.neutrino = options.neutrino; + } + if (options.compact != null) { assert(typeof options.compact === 'boolean'); this.compact = options.compact; diff --git a/lib/net/pool.js b/lib/net/pool.js index 234b23bc2..6af141ac9 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -79,6 +79,7 @@ class Pool extends EventEmitter { this.pendingRefill = null; this.checkpoints = false; + this.neutrino = false; this.headerChain = new List(); this.headerNext = null; this.headerTip = null; @@ -213,7 +214,10 @@ class Pool extends EventEmitter { this.headerNext = null; const tip = this.chain.tip; - + if (this.options.neutrino) { + this.headerChain.push(new HeaderEntry(tip.hash, tip.height)); + return; + } if (tip.height < this.network.lastCheckpoint) { this.checkpoints = true; this.headerTip = this.getNextTip(tip.height); @@ -650,7 +654,10 @@ class Pool extends EventEmitter { return; this.syncing = true; - this.resync(false); + if (this.options.neutrino) { + this.startHeadersSync(); + } else + this.resync(false); } /** @@ -704,6 +711,32 @@ class Pool extends EventEmitter { this.compactBlocks.clear(); } + /** + * Start the headers sync using getHeaders messages. + * @private + * @return {Promise} + */ + + async startHeadersSync() { + if (!this.syncing) + return; + let locator; + try { + locator = await this.chain.getLocator(); + } catch (e) { + this.emit('error', e); + return; + } + + const peer = this.peers.load; + if (!peer) { + this.logger.info('No loader peer.'); + return; + } + this.chain.synced = false; + peer.sendGetHeaders(locator); + } + /** * Send a sync to each peer. * @private @@ -814,7 +847,10 @@ class Pool extends EventEmitter { peer.syncing = true; peer.blockTime = Date.now(); - + if (this.options.neutrino) { + peer.sendGetHeaders(locator); + return true; + } if (this.checkpoints) { peer.sendGetHeaders(locator, this.headerTip.hash); return true; @@ -2027,6 +2063,9 @@ class Pool extends EventEmitter { if (this.options.selfish) return; + if (this.options.neutrino) + return; + if (this.chain.options.spv) return; @@ -2139,7 +2178,8 @@ class Pool extends EventEmitter { async _handleHeaders(peer, packet) { const headers = packet.items; - if (!this.checkpoints) + if (!this.checkpoints && !this.options.neutrino) + // todo add support for checkpoints return; if (!this.syncing) @@ -2179,13 +2219,14 @@ class Pool extends EventEmitter { this.logger.warning( 'Peer sent a bad header chain (%s).', peer.hostname()); - peer.destroy(); + peer.increaseBan(10); return; } node = new HeaderEntry(hash, height); - if (node.height === this.headerTip.height) { + if (!this.options.neutrino && node.height === this.headerTip.height) { + // todo add support for checkpoints if (!node.hash.equals(this.headerTip.hash)) { this.logger.warning( 'Peer sent an invalid checkpoint (%s).', @@ -2200,6 +2241,8 @@ class Pool extends EventEmitter { this.headerNext = node; this.headerChain.push(node); + if (this.options.neutrino) + await this._addBlock(peer, header, chainCommon.flags.VERIFY_POW); } this.logger.debug( @@ -2219,7 +2262,12 @@ class Pool extends EventEmitter { } // Request more headers. - peer.sendGetHeaders([node.hash], this.headerTip.hash); + if (this.chain.synced) + return; + if (this.options.neutrino) + peer.sendGetHeaders([node.hash]); + else + peer.sendGetHeaders([node.hash], this.headerTip.hash); } /** @@ -2293,7 +2341,7 @@ class Pool extends EventEmitter { const hash = block.hash(); - if (!this.resolveBlock(peer, hash)) { + if (!this.options.neutrino && !this.resolveBlock(peer, hash)) { this.logger.warning( 'Received unrequested block: %h (%s).', block.hash(), peer.hostname()); @@ -3690,6 +3738,7 @@ class PoolOptions { this.prefix = null; this.checkpoints = true; this.spv = false; + this.neutrino = false; this.bip37 = false; this.bip157 = false; this.listen = false; @@ -3772,12 +3821,18 @@ class PoolOptions { if (options.spv != null) { assert(typeof options.spv === 'boolean'); - assert(options.spv === this.chain.options.spv); this.spv = options.spv; } else { this.spv = this.chain.options.spv; } + if (options.neutrino != null) { + assert(typeof options.neutrino === 'boolean'); + this.neutrino = options.neutrino; + assert(options.compact === false || + options.compact === undefined, 'We cannot use compact blocks'); + } + if (options.bip37 != null) { assert(typeof options.bip37 === 'boolean'); this.bip37 = options.bip37; @@ -3953,6 +4008,12 @@ class PoolOptions { this.listen = false; } + if (this.neutrino) { + this.requiredServices |= common.services.NODE_COMPACT_FILTERS; + this.checkpoints = true; + this.compact = false; + } + if (this.selfish) { this.services &= ~common.services.NETWORK; this.bip37 = false; diff --git a/lib/node/neutrino.js b/lib/node/neutrino.js new file mode 100644 index 000000000..3f693dc19 --- /dev/null +++ b/lib/node/neutrino.js @@ -0,0 +1,301 @@ +/*! + * neutrino.js - spv node for bcoin + * Copyright (c) 2023-2021, Shaswat Gupta (MIT License). + * https://github.com/bcoin-org/bcoin + */ + +'use strict'; + +const assert = require('bsert'); +const Chain = require('../blockchain/chain'); +const Pool = require('../net/pool'); +const Node = require('./node'); +const HTTP = require('./http'); +const RPC = require('./rpc'); +const blockstore = require('../blockstore'); +const FilterIndexer = require('../indexer/filterindexer'); + +/** + * Neutrino Node + * Create a neutrino node which only maintains + * a chain, a pool, and an http server. + * @alias module:node.Neutrino + * @extends Node + */ + +class Neutrino extends Node { + /** + * Create Neutrino node. + * @constructor + * @param {Object?} options + * @param {Buffer?} options.sslKey + * @param {Buffer?} options.sslCert + * @param {Number?} options.httpPort + * @param {String?} options.httpHost + */ + + constructor(options) { + super('bcoin', 'bcoin.conf', 'debug.log', options); + + this.opened = false; + + // SPV flag. + this.spv = false; + this.neutrino = true; + + // Instantiate block storage. + this.blocks = blockstore.create({ + network: this.network, + logger: this.logger, + prefix: this.config.prefix, + cacheSize: this.config.mb('block-cache-size'), + memory: this.memory, + spv: this.spv, + neutrino: this.neutrino + }); + + this.chain = new Chain({ + blocks: this.blocks, + network: this.network, + logger: this.logger, + prefix: this.config.prefix, + memory: this.memory, + maxFiles: this.config.uint('max-files'), + cacheSize: this.config.mb('cache-size'), + entryCache: this.config.uint('entry-cache'), + forceFlags: this.config.bool('force-flags'), + checkpoints: this.config.bool('checkpoints'), + bip91: this.config.bool('bip91'), + bip148: this.config.bool('bip148'), + spv: true, + neutrino: this.neutrino + }); + + this.filterIndexers.set( + 'BASIC', + new FilterIndexer({ + network: this.network, + logger: this.logger, + blocks: this.blocks, + chain: this.chain, + memory: this.config.bool('memory'), + prefix: this.config.str('index-prefix', this.config.prefix), + filterType: 'BASIC' + }) + ); + + this.pool = new Pool({ + network: this.network, + logger: this.logger, + chain: this.chain, + prefix: this.config.prefix, + checkpoints: true, + proxy: this.config.str('proxy'), + onion: this.config.bool('onion'), + upnp: this.config.bool('upnp'), + seeds: this.config.array('seeds'), + nodes: this.config.array('nodes'), + only: this.config.array('only'), + maxOutbound: this.config.uint('max-outbound'), + createSocket: this.config.func('create-socket'), + memory: this.memory, + selfish: true, + listen: false, + neutrino: this.neutrino, + spv: this.spv + }); + + this.rpc = new RPC(this); + + this.http = new HTTP({ + network: this.network, + logger: this.logger, + node: this, + prefix: this.config.prefix, + ssl: this.config.bool('ssl'), + keyFile: this.config.path('ssl-key'), + certFile: this.config.path('ssl-cert'), + host: this.config.str('http-host'), + port: this.config.uint('http-port'), + apiKey: this.config.str('api-key'), + noAuth: this.config.bool('no-auth'), + cors: this.config.bool('cors') + }); + + this.init(); + } + + /** + * Initialize the node. + * @private + */ + + init() { + console.log('Initializing Neutrino Node.'); + // Bind to errors + this.chain.on('error', err => this.error(err)); + this.pool.on('error', err => this.error(err)); + + if (this.http) + this.http.on('error', err => this.error(err)); + + this.pool.on('tx', (tx) => { + this.emit('tx', tx); + }); + + this.chain.on('connect', async (entry, block) => { + this.emit('connect', entry, block); + }); + + this.chain.on('disconnect', (entry, block) => { + this.emit('disconnect', entry, block); + }); + + this.chain.on('reorganize', (tip, competitor) => { + this.emit('reorganize', tip, competitor); + }); + + this.chain.on('reset', (tip) => { + this.emit('reset', tip); + }); + + this.chain.on('headersFull', () => { + if (this.chain.height === 0) + return; + this.logger.info('Block Headers are fully synced'); + }); + + this.loadPlugins(); + } + + /** + * Open the node and all its child objects, + * wait for the database to load. + * @returns {Promise} + */ + + async open() { + assert(!this.opened, 'Neutrino Node is already open.'); + this.opened = true; + + await this.handlePreopen(); + await this.blocks.open(); + await this.chain.open(); + await this.pool.open(); + + await this.openPlugins(); + + await this.http.open(); + await this.handleOpen(); + + this.logger.info('Node is loaded.'); + } + + /** + * Close the node, wait for the database to close. + * @returns {Promise} + */ + + async close() { + assert(this.opened, 'Neutrino Node is not open.'); + this.opened = false; + + await this.handlePreclose(); + await this.http.close(); + + await this.closePlugins(); + + await this.pool.close(); + await this.chain.close(); + await this.handleClose(); + } + + /** + * Scan for any missed transactions. + * Note that this will replay the blockchain sync. + * @param {Number|Hash} start - Start block. + * @returns {Promise} + */ + + async scan(start) { + throw new Error('Not implemented.'); + } + + /** + * Broadcast a transaction (note that this will _not_ be verified + * by the mempool - use with care, lest you get banned from + * bitcoind nodes). + * @param {TX|Block} item + * @returns {Promise} + */ + + async broadcast(item) { + try { + await this.pool.broadcast(item); + } catch (e) { + this.emit('error', e); + } + } + + /** + * Broadcast a transaction (note that this will _not_ be verified + * by the mempool - use with care, lest you get banned from + * bitcoind nodes). + * @param {TX} tx + * @returns {Promise} + */ + + sendTX(tx) { + return this.broadcast(tx); + } + + /** + * Broadcast a transaction. Silence errors. + * @param {TX} tx + * @returns {Promise} + */ + + relay(tx) { + return this.broadcast(tx); + } + + /** + * Connect to the network. + * @returns {Promise} + */ + + connect() { + return this.pool.connect(); + } + + /** + * Disconnect from the network. + * @returns {Promise} + */ + + disconnect() { + return this.pool.disconnect(); + } + + /** + * Start the blockchain sync. + */ + + startSync() { + return this.pool.startSync(); + } + + /** + * Stop syncing the blockchain. + */ + + stopSync() { + return this.pool.stopSync(); + } +} + +/* + * Expose + */ + +module.exports = Neutrino; From d21341824285a07c3f517a86d9bea73d64f51bca Mon Sep 17 00:00:00 2001 From: masterchief164 <63920595+masterchief164@users.noreply.github.com> Date: Mon, 19 Jun 2023 00:07:22 +0530 Subject: [PATCH 2/5] test: added headers first sync tests --- lib/net/pool.js | 9 ++++-- test/neutrino-test.js | 64 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 test/neutrino-test.js diff --git a/lib/net/pool.js b/lib/net/pool.js index 6af141ac9..bebf8e90f 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -1674,6 +1674,12 @@ class Pool extends EventEmitter { if (this.checkpoints) return; + if (this.options.neutrino) { + const locator = await this.chain.getLocator(); + this.sendLocator(locator, peer); + return; + } + this.logger.debug( 'Received %d block hashes from peer (%s).', hashes.length, @@ -2179,7 +2185,6 @@ class Pool extends EventEmitter { const headers = packet.items; if (!this.checkpoints && !this.options.neutrino) - // todo add support for checkpoints return; if (!this.syncing) @@ -2255,7 +2260,7 @@ class Pool extends EventEmitter { peer.blockTime = Date.now(); // Request the blocks we just added. - if (checkpoint) { + if (checkpoint && !this.options.neutrino) { this.headerChain.shift(); this.resolveHeaders(peer); return; diff --git a/test/neutrino-test.js b/test/neutrino-test.js new file mode 100644 index 000000000..a73f2e61f --- /dev/null +++ b/test/neutrino-test.js @@ -0,0 +1,64 @@ +'use strict'; + +const FullNode = require('../lib/node/fullnode'); +const NeutrinoNode = require('../lib/node/neutrino'); +const {forValue} = require('./util/common'); +const assert = require('bsert'); +describe('neutrino', function () { + this.timeout(10000); + + const node1 = new NeutrinoNode({ + network: 'regtest', + memory: true, + port: 10000, + httpPort: 20000, + neutrino: true, + only: '127.0.0.1' + }); + + const node2 = new FullNode({ + network: 'regtest', + memory: true, + listen: true, + indexFilter: true, + bip157: true + }); + + async function mineBlocks(n) { + while (n) { + const block = await node2.miner.mineBlock(); + await node2.chain.add(block); + n--; + } + await forValue(node1.chain, 'height', node2.chain.height); + } + + before(async function () { + const waitForConnection = new Promise((resolve, reject) => { + node1.pool.once('peer open', async (peer) => { + resolve(peer); + }); + }); + + await node1.open(); + await node2.open(); + await node1.connect(); + await node2.connect(); + node1.startSync(); + node2.startSync(); + await mineBlocks(200); + await waitForConnection; + }); + + after(async () => { + await node1.close(); + await node2.close(); + }); + + describe('getheaders', () => { + it('should getheaders', async () => { + await mineBlocks(10); + assert.equal(node1.chain.height, node2.chain.height); + }); + }); +}); From 093b18f3ded5f8bcd13913ad30e9d1e1f6cf6e89 Mon Sep 17 00:00:00 2001 From: masterchief164 <63920595+masterchief164@users.noreply.github.com> Date: Sat, 17 Jun 2023 17:43:50 +0530 Subject: [PATCH 3/5] feat: added headers first sync --- lib/net/pool.js | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/lib/net/pool.js b/lib/net/pool.js index bebf8e90f..6af141ac9 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -1674,12 +1674,6 @@ class Pool extends EventEmitter { if (this.checkpoints) return; - if (this.options.neutrino) { - const locator = await this.chain.getLocator(); - this.sendLocator(locator, peer); - return; - } - this.logger.debug( 'Received %d block hashes from peer (%s).', hashes.length, @@ -2185,6 +2179,7 @@ class Pool extends EventEmitter { const headers = packet.items; if (!this.checkpoints && !this.options.neutrino) + // todo add support for checkpoints return; if (!this.syncing) @@ -2260,7 +2255,7 @@ class Pool extends EventEmitter { peer.blockTime = Date.now(); // Request the blocks we just added. - if (checkpoint && !this.options.neutrino) { + if (checkpoint) { this.headerChain.shift(); this.resolveHeaders(peer); return; From 90841467f4cee53a9503ec3a7acd00288585e828 Mon Sep 17 00:00:00 2001 From: masterchief164 <63920595+masterchief164@users.noreply.github.com> Date: Thu, 29 Jun 2023 02:05:58 +0530 Subject: [PATCH 4/5] feat: added neutrino mode client support --- bin/bcoin | 3 + bin/neutrino | 42 +++++++++ lib/bcoin-browser.js | 1 + lib/bcoin.js | 2 +- lib/blockchain/chain.js | 2 + lib/blockchain/chaindb.js | 51 +++++++++++ lib/blockchain/layout.js | 4 + lib/indexer/filterindexer.js | 42 +++++++++ lib/indexer/indexer.js | 1 + lib/net/peer.js | 66 ++++++++++++++ lib/net/pool.js | 161 +++++++++++++++++++++++++++++++++-- lib/node/neutrino.js | 9 ++ 12 files changed, 378 insertions(+), 6 deletions(-) create mode 100755 bin/neutrino diff --git a/bin/bcoin b/bin/bcoin index 840958d7b..56842efa5 100755 --- a/bin/bcoin +++ b/bin/bcoin @@ -43,6 +43,9 @@ for arg in "$@"; do --daemon) daemon=1 ;; + --neutrino) + cmd='neutrino' + ;; --spv) cmd='spvnode' ;; diff --git a/bin/neutrino b/bin/neutrino new file mode 100755 index 000000000..516f4207f --- /dev/null +++ b/bin/neutrino @@ -0,0 +1,42 @@ +#!/usr/bin/env node + +'use strict'; + +console.log('Starting bcoin'); +process.title = 'bcoin'; +const Neutrino = require('../lib/node/neutrino'); + +const node = new Neutrino({ + file: true, + argv: true, + env: true, + logFile: true, + logConsole: true, + logLevel: 'debug', + db: 'leveldb', + memory: false, + workers: true, + loader: require +}); + +(async () => { + await node.ensure(); + await node.open(); + await node.connect(); + node.startSync(); + + node.on('full', () => { + console.log('Full node'); + }); +})().catch((err) => { + console.error(err.stack); + process.exit(1); +}); + +process.on('unhandledRejection', (err, promise) => { + throw err; +}); + +process.on('SIGINT', async () => { + await node.close(); +}); diff --git a/lib/bcoin-browser.js b/lib/bcoin-browser.js index 1f7254be8..8b2d46cb5 100644 --- a/lib/bcoin-browser.js +++ b/lib/bcoin-browser.js @@ -89,6 +89,7 @@ bcoin.node = require('./node'); bcoin.Node = require('./node/node'); bcoin.FullNode = require('./node/fullnode'); bcoin.SPVNode = require('./node/spvnode'); +bcoin.Neutrino = require('./node/neutrino'); // Primitives bcoin.primitives = require('./primitives'); diff --git a/lib/bcoin.js b/lib/bcoin.js index 3e795f6f6..d8ae7e565 100644 --- a/lib/bcoin.js +++ b/lib/bcoin.js @@ -108,7 +108,7 @@ bcoin.define('node', './node'); bcoin.define('Node', './node/node'); bcoin.define('FullNode', './node/fullnode'); bcoin.define('SPVNode', './node/spvnode'); - +bcoin.define('Neutrino', './node/neutrino'); // Primitives bcoin.define('primitives', './primitives'); bcoin.define('Address', './primitives/address'); diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index 38201fe8e..ef55aeea4 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -23,6 +23,8 @@ const ChainEntry = require('./chainentry'); const CoinView = require('../coins/coinview'); const Script = require('../script/script'); const {VerifyError} = require('../protocol/errors'); +const {filters} = require('../blockstore/common'); +const {filtersByVal} = require('../net/common'); const thresholdStates = common.thresholdStates; /** diff --git a/lib/blockchain/chaindb.js b/lib/blockchain/chaindb.js index cb91accaa..a49cc75b2 100644 --- a/lib/blockchain/chaindb.js +++ b/lib/blockchain/chaindb.js @@ -46,6 +46,7 @@ class ChainDB { this.state = new ChainState(); this.pending = null; this.current = null; + this.neutrinoState = null; this.cacheHash = new LRU(this.options.entryCache, null, BufferMap); this.cacheHeight = new LRU(this.options.entryCache); @@ -90,6 +91,11 @@ class ChainDB { this.logger.info('ChainDB successfully initialized.'); } + if (this.options.neutrino) { + if (!this.neutrinoState) + this.neutrinoState = await this.getNeutrinoState(); + } + this.logger.info( 'Chain State: hash=%h tx=%d coin=%d value=%s.', this.state.tip, @@ -1670,6 +1676,29 @@ class ChainDB { b.put(layout.O.encode(), flags.toRaw()); return b.write(); } + + /** + * Get Neutrino State + * @returns {Promise} - Returns neutrino state + */ + + async getNeutrinoState() { + const data = await this.db.get(layout.N.encode()); + if (!data) + return new NeutrinoState(); + return NeutrinoState.fromRaw(data); + } + + /** + * Save Neutrino State + * @returns {void} + */ + async saveNeutrinoState() { + const state = this.neutrinoState.toRaw(); + const b = this.db.batch(); + b.put(layout.N.encode(), state); + return b.write(); + } } /** @@ -1952,6 +1981,28 @@ function fromU32(num) { return data; } +class NeutrinoState { + constructor() { // TODO: do we add support for multiple filters? + this.headerHeight = 0; + this.filterHeight = 0; + } + + toRaw() { + const bw = bio.write(8); + bw.writeU32(this.headerHeight); + bw.writeU32(this.filterHeight); + return bw.render(); + } + + static fromRaw(data) { + const state = new NeutrinoState(); + const br = bio.read(data); + state.headersHeight = br.readU32(); + state.filterHeight = br.readU32(); + return state; + } +} + /* * Expose */ diff --git a/lib/blockchain/layout.js b/lib/blockchain/layout.js index 337f95900..2877c7d82 100644 --- a/lib/blockchain/layout.js +++ b/lib/blockchain/layout.js @@ -14,6 +14,8 @@ const bdb = require('bdb'); * O -> chain options * R -> tip hash * D -> versionbits deployments + * N -> Neutrino Status + * F[hash] -> filterHeader * e[hash] -> entry * h[hash] -> height * H[height] -> hash @@ -33,6 +35,8 @@ const layout = { O: bdb.key('O'), R: bdb.key('R'), D: bdb.key('D'), + N: bdb.key('N'), + F: bdb.key('H', ['hash256']), e: bdb.key('e', ['hash256']), h: bdb.key('h', ['hash256']), H: bdb.key('H', ['uint32']), diff --git a/lib/indexer/filterindexer.js b/lib/indexer/filterindexer.js index 97265253b..ce25df7e6 100644 --- a/lib/indexer/filterindexer.js +++ b/lib/indexer/filterindexer.js @@ -85,6 +85,48 @@ class FilterIndexer extends Indexer { this.put(layout.f.encode(hash), gcsFilter.hash()); } + /** + * save filter header + * @param {Hash} blockHash + * @param {Hash} filterHeader + * @param {Hash} filterHash + * @returns {Promise} + */ + + async saveFilterHeader(blockHash, filterHeader, filterHash) { + assert(blockHash); + assert(filterHeader); + assert(filterHash); + + const filter = new Filter(); + filter.header = filterHeader; + + await this.blocks.writeFilter(blockHash, filter.toRaw(), this.filterType); + console.log(layout.f.encode(blockHash)); + this.put(layout.f.encode(blockHash), filterHash); + } + + /** + * Save filter + * @param {Hash} blockHash + * @param {BasicFilter} basicFilter + * @param {Hash} filterHeader + * @returns {Promise} + */ + + async saveFilter(blockHash, basicFilter, filterHeader) { + assert(blockHash); + assert(basicFilter); + assert(filterHeader); + + const filter = new Filter(); + filter.filter = basicFilter.toRaw(); + filter.header = filterHeader; + + await this.blocks.writeFilter(blockHash, filter.toRaw(), this.filterType); + this.put(layout.f.encode(blockHash), basicFilter.hash()); + } + /** * Prune compact filters. * @private diff --git a/lib/indexer/indexer.js b/lib/indexer/indexer.js index 97d85f76b..68f8487f9 100644 --- a/lib/indexer/indexer.js +++ b/lib/indexer/indexer.js @@ -76,6 +76,7 @@ class Indexer extends EventEmitter { */ put(key, value) { + console.log('put', key, value.toString('hex')); this.batch.put(key, value); } diff --git a/lib/net/peer.js b/lib/net/peer.js index dac2e265d..154c83e52 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -1009,6 +1009,12 @@ class Peer extends EventEmitter { case packetTypes.GETHEADERS: this.request(packetTypes.HEADERS, timeout * 2); break; + case packetTypes.GETCFHEADERS: + this.request(packetTypes.CFHEADERS, timeout); + break; + case packetTypes.GETCFILTERS: + this.request(packetTypes.CFILTER, timeout); + break; case packetTypes.GETDATA: this.request(packetTypes.DATA, timeout * 2); break; @@ -1751,6 +1757,26 @@ class Peer extends EventEmitter { this.send(packet); } + /** + * @param {Number} filterType - `0` = basic + * @param {Number} startHeight - Height to start at. + * @param {Hash} stopHash - Hash to stop at. + * @returns {void} + * @description Send `getcfilters` to peer. + */ + sendGetCFilters(filterType, startHeight, stopHash) { + const packet = new packets.GetCFiltersPacket( + filterType, + startHeight, + stopHash); + + this.logger.debug( + 'Sending getcfilters (type=%d, startHeight=%d, stopHash=%h).', + filterType, startHeight, stopHash); + + this.send(packet); + } + /** * Send `cfheaders` to peer. * @param {Number} filterType @@ -1773,6 +1799,27 @@ class Peer extends EventEmitter { this.send(packet); } + /** + * @param {Number} filterType + * @param {Number} startHeight + * @param {Hash} stopHash + * @returns {void} + * @description Send `getcfheaders` to peer. + */ + + sendGetCFHeaders(filterType, startHeight, stopHash) { + const packet = new packets.GetCFHeadersPacket( + filterType, + startHeight, + stopHash); + + this.logger.debug( + 'Sending getcfheaders (type=%d, start=%h, stop=%h).', + filterType, startHeight, stopHash); + + this.send(packet); + } + /** * send `cfcheckpt` to peer. * @param {Number} filterType @@ -1793,6 +1840,25 @@ class Peer extends EventEmitter { this.send(packet); } + /** + * Send `getcfcheckpt` to peer. + * @param {Number} filterType + * @param {Hash} stopHash + * @returns {void} + */ + + sendGetCFCheckpt(filterType, stopHash) { + const packet = new packets.GetCFCheckptPacket( + filterType, + stopHash); + + this.logger.debug( + 'Sending getcfcheckpt (type=%d, stop=%h).', + filterType, stopHash); + + this.send(packet); + } + /** * Send `mempool` to peer. */ diff --git a/lib/net/pool.js b/lib/net/pool.js index 6af141ac9..2a1bd7d9e 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -35,6 +35,7 @@ const packetTypes = packets.types; const scores = HostList.scores; const {inspectSymbol} = require('../utils'); const {consensus} = require('../protocol'); +const BasicFilter = require('../golomb/basicFilter'); /** * Pool @@ -79,7 +80,7 @@ class Pool extends EventEmitter { this.pendingRefill = null; this.checkpoints = false; - this.neutrino = false; + this.neutrino = this.options.neutrino; this.headerChain = new List(); this.headerNext = null; this.headerTip = null; @@ -216,6 +217,8 @@ class Pool extends EventEmitter { const tip = this.chain.tip; if (this.options.neutrino) { this.headerChain.push(new HeaderEntry(tip.hash, tip.height)); + this.cfHeaderChain = new List(); + this.cfHeaderChain.push(new CFHeaderEntry(consensus.ZERO_HASH, 0)); return; } if (tip.height < this.network.lastCheckpoint) { @@ -711,6 +714,46 @@ class Pool extends EventEmitter { this.compactBlocks.clear(); } + /** + * Start the filters headers sync. + */ + + async startFilterHeadersSync() { + this.logger.info('Starting filter headers sync (%s).', + this.chain.options.network); + if (!this.opened || !this.connected) + return; + + const startHeight = this.chain.db.neutrinoState.headerHeight + 1; + const chainHeight = await this.chain.tip.height; + const stopHeight = chainHeight > 2000 ? 2000 : chainHeight; + const stopHash = await this.chain.getHash(stopHeight); + this.peers.load.sendGetCFHeaders( + common.FILTERS.BASIC, + startHeight, + stopHash); + } + + /** + * Start the filters sync. + */ + + async startFiltersSync() { + this.logger.info('Starting filter sync (%s).', + this.chain.options.network); + if (!this.opened || !this.connected) + return; + + const startHeight = this.chain.db.neutrinoState.filterHeight + 1; + const chainHeight = await this.chain.tip.height; + const stopHeight = chainHeight > 1000 ? 1000 : chainHeight; + const stopHash = await this.chain.getHash(stopHeight); + this.peers.load.sendGetCFilters( + common.FILTERS.BASIC, + startHeight, + stopHash); + } + /** * Start the headers sync using getHeaders messages. * @private @@ -849,6 +892,8 @@ class Pool extends EventEmitter { peer.blockTime = Date.now(); if (this.options.neutrino) { peer.sendGetHeaders(locator); + if (!this.syncing) + this.startFilterHeadersSync(); return true; } if (this.checkpoints) { @@ -1230,6 +1275,12 @@ class Pool extends EventEmitter { case packetTypes.GETCFCHECKPT: await this.handleGetCFCheckpt(peer, packet); break; + case packetTypes.CFHEADERS: + await this.handleCFHeaders(peer, packet); + break; + case packetTypes.CFILTER: + await this.handleCFilter(peer, packet); + break; case packetTypes.GETBLOCKS: await this.handleGetBlocks(peer, packet); break; @@ -1281,8 +1332,6 @@ class Pool extends EventEmitter { case packetTypes.BLOCKTXN: await this.handleBlockTxn(peer, packet); break; - case packetTypes.CFILTER: - case packetTypes.CFHEADERS: case packetTypes.CFCHECKPT: case packetTypes.UNKNOWN: await this.handleUnknown(peer, packet); @@ -1674,6 +1723,14 @@ class Pool extends EventEmitter { if (this.checkpoints) return; + if (this.options.neutrino) { + const locator = await this.chain.getLocator(); + this.sendLocator(locator, peer); + if (!this.syncing) + this.startFilterHeadersSync(); + return; + } + this.logger.debug( 'Received %d block hashes from peer (%s).', hashes.length, @@ -2048,6 +2105,87 @@ class Pool extends EventEmitter { peer.sendCFCheckpt(packet.filterType, packet.stopHash, filterHeaders); } + /** + * Handle peer `cfheaders` packet. + * @method + * @private + * @param {Peer} peer + * @param {CFHeadersPacket} packet + */ + + async handleCFHeaders(peer, packet) { + if (!this.chain.synced) + return; + if (!this.options.neutrino) + return; + + this.logger.info('Received cfheaders (%s).', peer.hostname()); + const filterType = packet.filterType; + const stopHash = packet.stopHash; + let previousFilterHeader = packet.previousFilterHeader; + const filterHashes = packet.filterHashes; + let blockHeight = await this.chain.getHeight(stopHash) + - filterHashes.length; + const stopHeight = await this.chain.getHeight(stopHash); + + for (const filterHash of filterHashes) { + const basicFilter = new BasicFilter(); + basicFilter._hash = filterHash; + const filterHeader = basicFilter.header(previousFilterHeader); + const lastFilterHeader = this.cfHeaderChain.tail; + const cfHeaderEntry = new CFHeaderEntry( + filterHash, lastFilterHeader.height + 1); + this.cfHeaderChain.push(cfHeaderEntry); + // todo: verify the filterHeader + const blockHash = await this.chain.getHash(blockHeight); + const indexer = this.getFilterIndexer(filtersByVal[filterType]); + await indexer.saveFilterHeader(blockHash, filterHeader, filterHash); + previousFilterHeader = filterHeader; + // todo: add a function for this in chain.js + this.chain.db.neutrinoState.headerHeight = blockHeight; + blockHeight++; + } + await this.chain.db.saveNeutrinoState(); + console.log(this.headerChain.tail.height, stopHeight); + if (this.headerChain.tail.height <= stopHeight) { + this.logger.info('CFHeaders sync complete.'); + this.emit('cfheaders'); + } else { + const nextStopHeight = stopHeight + 2000 < this.chain.height + ? stopHeight + 2000 : this.chain.height; + const nextStopHash = await this.chain.getHash(nextStopHeight); + peer.sendGetCFHeaders(filterType, stopHeight + 1, nextStopHash); + } + } + + /** + * Handle peer `cfilter` packet. + * @method + * @private + * @param {Peer} peer + * @param {CFilterPacket} packet + * @returns {Promise} + */ + + async handleCFilter(peer, packet) { + if (!this.chain.synced) + return; + if (!this.options.neutrino) + return; + + this.logger.info('Received cfilter (%s).', peer.hostname()); + const filterType = packet.filterType; + const blockHash = packet.blockHash; + const filterBytes = packet.filterBytes; + const indexer = this.getFilterIndexer(filtersByVal[filterType]); + const basicFilter = new BasicFilter().fromRaw(filterBytes); + const filterHeader = indexer.getFilterHeader(blockHash); + await indexer.saveFilter(blockHash, basicFilter, filterHeader); + this.chain.db.neutrinoState.headerHeight = + await this.chain.getHeight(blockHash); + await this.chain.db.saveNeutrinoState(); + } + /** * Handle `getblocks` packet. * @method @@ -2179,7 +2317,6 @@ class Pool extends EventEmitter { const headers = packet.items; if (!this.checkpoints && !this.options.neutrino) - // todo add support for checkpoints return; if (!this.syncing) @@ -2255,7 +2392,7 @@ class Pool extends EventEmitter { peer.blockTime = Date.now(); // Request the blocks we just added. - if (checkpoint) { + if (checkpoint && !this.options.neutrino) { this.headerChain.shift(); this.resolveHeaders(peer); return; @@ -4555,6 +4692,20 @@ class HeaderEntry { } } +class CFHeaderEntry { + /** + * Create cfheader entry. + * @constructor + */ + + constructor(hash, height) { + this.hash = hash; + this.height = height; + this.prev = null; + this.next = null; + } +} + /* * Expose */ diff --git a/lib/node/neutrino.js b/lib/node/neutrino.js index 3f693dc19..684384b84 100644 --- a/lib/node/neutrino.js +++ b/lib/node/neutrino.js @@ -90,6 +90,7 @@ class Neutrino extends Node { chain: this.chain, prefix: this.config.prefix, checkpoints: true, + filterIndexers: this.filterIndexers, proxy: this.config.str('proxy'), onion: this.config.bool('onion'), upnp: this.config.bool('upnp'), @@ -163,6 +164,14 @@ class Neutrino extends Node { if (this.chain.height === 0) return; this.logger.info('Block Headers are fully synced'); + this.pool.startFilterHeadersSync(); + }); + + this.pool.on('cfheaders', () => { + if (this.chain.height === 0) + return; + this.logger.info('Filter Headers are fully synced'); + this.pool.startFiltersSync(); }); this.loadPlugins(); From 4c3681bbbbe8c4fd7bf619e681b17d1a85f580f8 Mon Sep 17 00:00:00 2001 From: masterchief164 <63920595+masterchief164@users.noreply.github.com> Date: Tue, 4 Jul 2023 12:18:36 +0530 Subject: [PATCH 5/5] feat: cfilters sync complete --- lib/blockchain/chain.js | 2 -- lib/indexer/filterindexer.js | 1 + lib/indexer/indexer.js | 1 - lib/net/pool.js | 21 +++++++++++++++++++-- lib/node/neutrino.js | 8 ++++++++ 5 files changed, 28 insertions(+), 5 deletions(-) diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index ef55aeea4..38201fe8e 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -23,8 +23,6 @@ const ChainEntry = require('./chainentry'); const CoinView = require('../coins/coinview'); const Script = require('../script/script'); const {VerifyError} = require('../protocol/errors'); -const {filters} = require('../blockstore/common'); -const {filtersByVal} = require('../net/common'); const thresholdStates = common.thresholdStates; /** diff --git a/lib/indexer/filterindexer.js b/lib/indexer/filterindexer.js index ce25df7e6..15ddfb231 100644 --- a/lib/indexer/filterindexer.js +++ b/lib/indexer/filterindexer.js @@ -124,6 +124,7 @@ class FilterIndexer extends Indexer { filter.header = filterHeader; await this.blocks.writeFilter(blockHash, filter.toRaw(), this.filterType); + console.log(layout.f.encode(blockHash)); this.put(layout.f.encode(blockHash), basicFilter.hash()); } diff --git a/lib/indexer/indexer.js b/lib/indexer/indexer.js index 68f8487f9..97d85f76b 100644 --- a/lib/indexer/indexer.js +++ b/lib/indexer/indexer.js @@ -76,7 +76,6 @@ class Indexer extends EventEmitter { */ put(key, value) { - console.log('put', key, value.toString('hex')); this.batch.put(key, value); } diff --git a/lib/net/pool.js b/lib/net/pool.js index 2a1bd7d9e..687f3a982 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -2156,6 +2156,8 @@ class Pool extends EventEmitter { const nextStopHash = await this.chain.getHash(nextStopHeight); peer.sendGetCFHeaders(filterType, stopHeight + 1, nextStopHash); } + this.logger.info(`CFilterHeaders sync height + ${this.chain.db.neutrinoState.headerHeight}`); } /** @@ -2179,11 +2181,26 @@ class Pool extends EventEmitter { const filterBytes = packet.filterBytes; const indexer = this.getFilterIndexer(filtersByVal[filterType]); const basicFilter = new BasicFilter().fromRaw(filterBytes); - const filterHeader = indexer.getFilterHeader(blockHash); + const filterHeader = await indexer.getFilterHeader(blockHash); await indexer.saveFilter(blockHash, basicFilter, filterHeader); - this.chain.db.neutrinoState.headerHeight = + this.chain.db.neutrinoState.filterHeight = await this.chain.getHeight(blockHash); await this.chain.db.saveNeutrinoState(); + if (this.chain.db.neutrinoState.filterHeight <= this.chain.height) { + this.logger.info('CFilter sync complete.'); + this.emit('cfilter'); + } else { + const startHeight = this.chain.db.neutrinoState.filterHeight + 1; + const chainHeight = await this.chain.tip.height; + const stopHeight = chainHeight > 1000 ? 1000 : chainHeight; + const stopHash = await this.chain.getHash(stopHeight); + peer.sendGetCFilters( + common.FILTERS.BASIC, + startHeight, + stopHash); + } + this.logger.info(`CFilter sync height + ${this.chain.db.neutrinoState.filterHeight}`); } /** diff --git a/lib/node/neutrino.js b/lib/node/neutrino.js index 684384b84..0e9291f38 100644 --- a/lib/node/neutrino.js +++ b/lib/node/neutrino.js @@ -197,6 +197,10 @@ class Neutrino extends Node { await this.http.open(); await this.handleOpen(); + for (const filterindex of this.filterIndexers.values()) { + await filterindex.open(); + } + this.logger.info('Node is loaded.'); } @@ -217,6 +221,10 @@ class Neutrino extends Node { await this.pool.close(); await this.chain.close(); await this.handleClose(); + + for (const filterindex of this.filterIndexers.values()) { + await filterindex.close(); + } } /**