Commit
Tip filtering changes.
- This also pins mypy to a version that does not loop infinitely.
rt121212121 committed Apr 11, 2022
1 parent 368a6f8 commit 963b115
Showing 10 changed files with 801 additions and 140 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
@@ -3,7 +3,7 @@
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code.
-# extension-pkg-whitelist=PyQt5,XXX
+extension-pkg-allow-list=refcuckoo

# Add files or directories to the blacklist. They should be base names, not
# paths.
4 changes: 2 additions & 2 deletions contrib/run_pylint.bat
@@ -2,5 +2,5 @@
@rem "to specify default python version to 3.9 create/edit ~/AppData/Local/py.ini with [default] set
@rem to python3=3.9"
set SDKDIR=%~dp0..
-py -m pip install pylint -U
-py -m pylint --rcfile %SDKDIR%\.pylintrc %SDKDIR%\esv_reference_server %SDKDIR%\server.py %SDKDIR%\contrib %SDKDIR%\unittests
+py -m pip install pylint==2.12.2
+py -m pylint --rcfile %SDKDIR%\.pylintrc %SDKDIR%\simple_indexer %SDKDIR%\server.py %SDKDIR%\contrib %SDKDIR%\unittests
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,4 +1,5 @@
bitcoinx
+refcuckoo>=1.0
electrumsv_database>=1.6
pyzmq
requests>=2.21.0
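A note on the new refcuckoo dependency: it is presumably the probabilistic filter implementation behind the tip filtering feature (it is also the C extension allow-listed for pylint above), but its API is not shown anywhere in this commit. The toy Bloom-style filter below is an illustrative stand-in only. It demonstrates the one property that matters to this commit: a probabilistic membership test can match pushdata hashes that nobody registered, which is why dispatch_tip_filter_notifications_async in server.py reconciles filter matches against the database registrations.

# Illustrative sketch only: refcuckoo's actual API is not shown in this commit,
# so a toy Bloom-style filter stands in for it here.
import hashlib


class ToyFilter:
    def __init__(self, size_bits: int = 1024) -> None:
        self.size_bits = size_bits
        self.bits = 0

    def _positions(self, item: bytes) -> list[int]:
        digest = hashlib.sha256(item).digest()
        return [ int.from_bytes(digest[i:i+4], "little") % self.size_bits
            for i in (0, 4, 8) ]

    def add(self, item: bytes) -> None:
        for position in self._positions(item):
            self.bits |= 1 << position

    def may_contain(self, item: bytes) -> bool:
        # May return True for items never added: a false positive.
        return all(self.bits & (1 << position) for position in self._positions(item))


registered_pushdata_hashes = { hashlib.sha256(b"registered output script").digest() }
tip_filter = ToyFilter()
for pushdata_hash in registered_pushdata_hashes:
    tip_filter.add(pushdata_hash)

candidate = hashlib.sha256(b"unregistered output script").digest()
if tip_filter.may_contain(candidate) and candidate not in registered_pushdata_hashes:
    print("false positive: filter match with no registration")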
4 changes: 3 additions & 1 deletion simple_indexer/constants.py
@@ -10,5 +10,7 @@
OUTPUT_MATCH = 1 << 0
INPUT_MATCH = 1 << 1

SERVER_HOST = "0.0.0.0"
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 49241

+COMMON_FILTER_LOOKAHEAD_BATCH_COUNT = 20
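Two behavioural notes on this hunk: binding to 127.0.0.1 rather than 0.0.0.0 restricts the regtest indexer to local connections, and the new COMMON_FILTER_LOOKAHEAD_BATCH_COUNT is consumed by filtering code in files not fully rendered on this page. A hedged sketch of how the host and port constants are typically consumed (the actual startup code lives in simple_indexer/server.py and is only partially shown below):

# Hedged sketch, not the commit's actual startup code.
from aiohttp import web

from simple_indexer.constants import SERVER_HOST, SERVER_PORT

app = web.Application()
# Binding to 127.0.0.1 instead of 0.0.0.0 means the indexer only accepts
# connections from the local machine.
web.run_app(app, host=SERVER_HOST, port=SERVER_PORT)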
299 changes: 213 additions & 86 deletions simple_indexer/handlers.py

Large diffs are not rendered by default.
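Because the handler diff is not rendered, the following is a hedged sketch of the general shape a new aiohttp handler such as indexer_post_transaction_filter might take. The JSON body format, the app_state lookup and the register_pushdata_hashes helper are hypothetical illustrations; only the handler name and the aiohttp APIs come from the commit.

from aiohttp import web


async def indexer_post_transaction_filter(request: web.Request) -> web.Response:
    # Hypothetical body format: a JSON list of hex-encoded pushdata hashes.
    try:
        pushdata_hashes_hex = await request.json()
        pushdata_hashes = [ bytes.fromhex(text) for text in pushdata_hashes_hex ]
    except (TypeError, ValueError):
        raise web.HTTPBadRequest(reason="body must be a JSON list of hex-encoded hashes")
    # A real implementation would persist the registrations (this commit adds
    # sqlite_db support for that) and fold the hashes into the common tip filter.
    app_state = request.app["app_state"]  # Hypothetical application state lookup.
    app_state.register_pushdata_hashes(pushdata_hashes)  # Hypothetical helper.
    return web.json_response({ "registered": len(pushdata_hashes) })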

3 changes: 0 additions & 3 deletions simple_indexer/parse_pushdata.py
@@ -1,11 +1,8 @@
import array
import logging
import struct
from hashlib import sha256
from struct import Struct

from bitcoinx.packing import struct_le_Q, struct_le_H, struct_le_I

logger = logging.getLogger('utils')

HEADER_OFFSET = 80
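For context on the imports this file keeps: struct_le_H, struct_le_I and struct_le_Q from bitcoinx.packing are precompiled little-endian struct.Struct instances for 2, 4 and 8 byte unsigned integers, and HEADER_OFFSET = 80 corresponds to the fixed 80-byte serialised block header. A minimal usage sketch, assuming only those bitcoinx names:

# Minimal usage sketch for the retained bitcoinx.packing imports.
from bitcoinx.packing import struct_le_H, struct_le_I, struct_le_Q

raw = bytes.fromhex("0f00" "0d000000" "0e00000000000000")
value16, = struct_le_H.unpack_from(raw, 0)  # 2-byte little-endian unsigned.
value32, = struct_le_I.unpack_from(raw, 2)  # 4-byte little-endian unsigned.
value64, = struct_le_Q.unpack_from(raw, 6)  # 8-byte little-endian unsigned.
assert (value16, value32, value64) == (15, 13, 14)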
146 changes: 132 additions & 14 deletions simple_indexer/server.py
@@ -1,22 +1,26 @@
+import aiohttp
import asyncio
+import concurrent.futures
+from http import HTTPStatus
import mmap
import os
import sys
import logging
from pathlib import Path
import queue
import threading
-from typing import Dict, Optional
+from typing import Dict, Optional, TypeVar

from aiohttp import web
-from bitcoinx import BitcoinRegtest, CheckPoint, Headers
+from bitcoinx import BitcoinRegtest, CheckPoint, hash_to_hex_str, Headers
from electrumsv_database.sqlite import DatabaseContext

from .constants import SERVER_HOST, SERVER_PORT
from .handlers_ws import SimpleIndexerWebSocket, WSClient
from . import handlers
from . import sqlite_db
from .synchronizer import Synchronizer
+from .types import PushDataRow


MODULE_DIR = Path(os.path.dirname(os.path.abspath(__file__)))
@@ -25,6 +29,8 @@
aiohttp_logger = logging.getLogger("aiohttp")
aiohttp_logger.setLevel(logging.WARNING)

+logger = logging.getLogger("reference")
+

NODE_HEADERS_MMAP_FILEPATH = MODULE_DIR.parent.joinpath('node_headers.mmap')
LOCAL_HEADERS_MMAP_FILEPATH = MODULE_DIR.parent.joinpath('local_headers.mmap')
@@ -36,6 +42,14 @@
"ffff7f2002000000"), height=0, prev_work=0
)

+T2 = TypeVar("T2")
+
+
+def future_callback(future: concurrent.futures.Future[None]) -> None:
+    if future.cancelled():
+        return
+    future.result()
+

class ApplicationState(object):
is_alive: bool = False
@@ -45,7 +59,7 @@ def __init__(self, app: web.Application, loop: asyncio.AbstractEventLoop) -> Non
self.app = app
self.loop = loop

-        if int(os.getenv('SIMPLE_INDEX_RESET', "1")):
+        if int(os.getenv('SIMPLE_INDEX_RESET', "0")):
self.reset_headers_stores()

self.node_headers = Headers(BitcoinRegtest, NODE_HEADERS_MMAP_FILEPATH, CHECKPOINT)
@@ -69,7 +83,7 @@ def reset_headers_stores(self) -> None:
mm.seek(0)
mm.write(b'\00' * mm.size())

-        # remove block headers - memory-mapped so need to do it this way to free memory immediately
+        # remove block headers - memory-mapped so need to do it to free memory immediately
if os.path.exists(LOCAL_HEADERS_MMAP_FILEPATH):
with open(LOCAL_HEADERS_MMAP_FILEPATH, 'w+') as f:
mm = mmap.mmap(f.fileno(), MMAP_SIZE)
@@ -115,16 +129,115 @@ def push_notifications_thread(self) -> None:
len(message_bytes))
for ws_client in self.get_ws_clients().values():
self.logger.debug("Sending message to websocket, ws_id=%s", ws_client.ws_id)
-                asyncio.run_coroutine_threadsafe(ws_client.websocket.send_bytes(message_bytes),
-                    self.loop)
+                future = asyncio.run_coroutine_threadsafe(
+                    ws_client.websocket.send_bytes(message_bytes), self.loop)
+                # Futures swallow exceptions so we must install a callback that raises them.
+                future.add_done_callback(future_callback)
except Exception:
self.logger.exception("unexpected exception in push_notifications_thread")
finally:
self.logger.info("Closing push notifications thread")
self.logger.info("Exiting push notifications thread")

def dispatch_tip_filter_notifications(self, matches: list[PushDataRow],
block_hash: Optional[bytes]) -> None:
"""
Non-blocking delivery of tip filter notifications.
"""
self.logger.debug("Starting task for dispatching tip filter notifications (%d)",
len(matches))
future = asyncio.run_coroutine_threadsafe(
self.dispatch_tip_filter_notifications_async(matches, block_hash), self.loop)
# Futures swallow exceptions so we must install a callback that raises any errors.
future.add_done_callback(future_callback)

async def dispatch_tip_filter_notifications_async(self, matches: list[PushDataRow],
block_hash: Optional[bytes]) -> None:
self.logger.debug("Entered task for dispatching tip filter notifications")
matches_by_hash = dict[bytes, list[PushDataRow]]()
for match in matches:
if match.pushdata_hash in matches_by_hash:
matches_by_hash[match.pushdata_hash].append(match)
else:
matches_by_hash[match.pushdata_hash] = [ match ]

# Get all the accounts and which pushdata they have registered.
rows = sqlite_db.read_indexer_filtering_registrations_for_notifications(
self.database_context, list(matches_by_hash))
self.logger.debug("Found %d registrations for tip filter notifications", len(rows))

if len(rows) == 0:
return

# This also allows us to identify the false positive matches. This is not critical and
# just for debugging/interest.
invalid_pushdata_hashes = set(matches_by_hash) - set(row.pushdata_hash for row in rows)
self.logger.debug("Ignored %d false positive filter matches", len(invalid_pushdata_hashes))

# Gather the true matches for each account so that we can notify them of those matches.
matches_by_account_id = dict[int, list[PushDataRow]]()
for row in rows:
matched_rows = matches_by_hash[row.pushdata_hash]
if row.account_id in matches_by_account_id:
matches_by_account_id[row.account_id].extend(matched_rows)
else:
matches_by_account_id[row.account_id] = list(matched_rows)

# TODO(1.4.0) Servers. Consider moving the callback metadata and the notifications made to
# it to the reference server. It can queue the results if they were not able to be
# delivered.
metadata_by_account_id = { row[0]: row for row
in sqlite_db.read_account_metadata(self.database_context, list(matches_by_account_id)) }
block_id = hash_to_hex_str(block_hash) if block_hash is not None else None
async with aiohttp.ClientSession() as session:
for account_id, matched_rows in matches_by_account_id.items():
if account_id not in metadata_by_account_id:
self.logger.error("Account does not have peer channel callback set, "
"account_id: %d for hashes: %s", account_id,
[ pdh.pushdata_hash.hex() for pdh in matched_rows ])
continue

account_metadata = metadata_by_account_id[account_id]
self.logger.debug("Posting matches for peer channel account %s", account_metadata)
assert account_metadata.tip_filter_callback_url is not None
assert account_metadata.tip_filter_callback_token is not None
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {account_metadata.tip_filter_callback_token}"
}
request_data = {
"blockId": block_id,
"matches": [
{
"pushDataHashHex": matched_row.pushdata_hash.hex(),
"transactionId": hash_to_hex_str(matched_row.tx_hash),
"transactionIndex": matched_row.idx,
"flags": sqlite_db.get_pushdata_match_flag(matched_row.ref_type),
} for matched_row in matched_rows
]
}
try:
async with session.post(account_metadata.tip_filter_callback_url,
headers=headers, json=request_data) as response:
if response.status != HTTPStatus.OK:
self.logger.error("Failed to post peer channel response "+
"status=%s, reason=%s", response.status, response.reason)
continue
self.logger.debug("Posted message to peer channel "+
"status=%s, reason=%s", response.status, response.reason)
except aiohttp.ClientError:
self.logger.exception("Failed to post peer channel response")

# def handle_asyncio_exception(loop: asyncio.AbstractEventLoop, context: dict[str, Any]) -> None:
# exception = context.get("exception")
# if exception is not None:
# logger.exception("Exception raised in asyncio loop", exc_info=exception)
# else:
# logger.error("Error in asyncio loop without exception, message: %s", context["message"])


def get_aiohttp_app() -> web.Application:
loop = asyncio.get_event_loop()
# loop.set_exception_handler(handle_asyncio_exception)
app = web.Application()
app_state = ApplicationState(app, loop)

@@ -137,21 +250,26 @@ def get_aiohttp_app() -> web.Application:
web.view("/ws", SimpleIndexerWebSocket),

web.get("/api/v1/endpoints", handlers.get_endpoints_data),
web.post("/api/v1/restoration/search", handlers.get_pushdata_filter_matches),

web.get("/api/v1/indexer", handlers.indexer_get_indexer_settings),
web.post("/api/v1/indexer", handlers.indexer_post_indexer_settings),

+        # These need to be registered before "/transaction/{txid}" to avoid clashes.
+        web.post("/api/v1/transaction/filter", handlers.indexer_post_transaction_filter),
+        web.post("/api/v1/transaction/filter:delete",
+            handlers.indexer_post_transaction_filter_delete),
+        # TODO(1.4.0) Technical debt. We can enforce txid with {txid:[a-fA-F0-9]{64}} in theory.
+        web.get("/api/v1/transaction/{txid}", handlers.get_transaction),
+
+        # TODO(1.4.0) Technical debt. We can enforce txid with {txid:[a-fA-F0-9]{64}} in theory.
+        web.get("/api/v1/merkle-proof/{txid}", handlers.get_merkle_proof),
+        web.post("/api/v1/restoration/search", handlers.get_restoration_matches),
+
web.post("/api/v1/output-spend", handlers.post_output_spends),
web.post("/api/v1/output-spend/notifications",
handlers.post_output_spend_notifications_register),
web.post("/api/v1/output-spend/notifications:unregister",
handlers.post_output_spend_notifications_unregister),

-        # TODO(1.4.0) Tip filter support.
-        # web.post("/api/v1/transaction/filter/{account_id}",
-        #     handlers.indexer_post_transaction_filter),
-        # web.post("/api/v1/transaction/filter:delete/{account_id}",
-        #     handlers.indexer_post_transaction_filter_delete),
])
return app

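The pairing of asyncio.run_coroutine_threadsafe with future.add_done_callback(future_callback) appears twice in this file, so it is worth seeing in isolation. Below is a self-contained sketch (a plain script with no indexer imports) showing that a concurrent.futures future holds any exception until .result() is called; the done-callback calls .result() so the failure is re-raised and logged instead of vanishing:

# Standalone demonstration of the future_callback pattern used above.
import asyncio
import concurrent.futures
import threading
import time


def future_callback(future: concurrent.futures.Future) -> None:
    if future.cancelled():
        return
    # Re-raises any exception stored on the future; concurrent.futures then
    # logs it rather than letting the failure disappear silently.
    future.result()


async def failing_coroutine() -> None:
    raise RuntimeError("this error would vanish without the done-callback")


loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

future = asyncio.run_coroutine_threadsafe(failing_coroutine(), loop)
future.add_done_callback(future_callback)
time.sleep(0.5)  # Let the loop thread run the coroutine and the callback.
loop.call_soon_threadsafe(loop.stop)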
