Skip to content

Commit

Permalink
Make retrieving inscriptions in block fast (ordinals#2333)
Browse files Browse the repository at this point in the history
  • Loading branch information
veryordinally authored Aug 17, 2023
1 parent 40e4807 commit 621a747
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 33 deletions.
32 changes: 8 additions & 24 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
BlockHashValue, Entry, InscriptionEntry, InscriptionEntryValue, InscriptionIdValue,
OutPointValue, SatPointValue, SatRange,
},
index::block_index::BlockIndex,
reorg::*,
updater::Updater,
},
Expand All @@ -13,7 +14,6 @@ use {
bitcoincore_rpc::{json::GetBlockHeaderResult, Client},
chrono::SubsecRound,
indicatif::{ProgressBar, ProgressStyle},
itertools::Itertools,
log::log_enabled,
redb::{
Database, MultimapTable, MultimapTableDefinition, ReadableMultimapTable, ReadableTable, Table,
Expand All @@ -23,6 +23,7 @@ use {
std::io::{BufWriter, Read, Write},
};

pub mod block_index;
mod entry;
mod fetcher;
mod reorg;
Expand Down Expand Up @@ -925,29 +926,12 @@ impl Index {
Ok((inscriptions, prev, next, lowest, highest))
}

pub(crate) fn get_inscriptions_in_block(&self, block_height: u64) -> Result<Vec<InscriptionId>> {
// This is a naive approach and will require optimization, but we don't have an index by block
let block_inscriptions = self
.database
.begin_read()?
.open_table(INSCRIPTION_ID_TO_INSCRIPTION_ENTRY)?
.iter()?
.filter_map(|result| match result {
Ok((key, entry_value)) => {
let entry = InscriptionEntry::load(entry_value.value());
if entry.height == block_height {
Some((InscriptionId::load(*key.value()), entry.number))
} else {
None
}
}
Err(_) => None,
})
.sorted_by_key(|&(_id, number)| number)
.map(|(id, _)| id)
.collect();

Ok(block_inscriptions)
pub(crate) fn get_inscriptions_in_block(
&self,
block_index: &BlockIndex,
block_height: u64,
) -> Result<Vec<InscriptionId>> {
block_index.get_inscriptions_in_block(self, block_height)
}

pub(crate) fn get_feed_inscriptions(&self, n: usize) -> Result<Vec<(i64, InscriptionId)>> {
Expand Down
201 changes: 201 additions & 0 deletions src/index/block_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
use super::*;

#[derive(Clone)]
pub struct BlockIndex {
first_inscription_height: u64,
lowest_blessed_by_block: Vec<i64>,
lowest_cursed_by_block: Vec<i64>,
highest_indexed_blessed: i64,
lowest_indexed_cursed: i64,
}

impl BlockIndex {
pub(crate) fn new(index: &Index) -> Result<BlockIndex> {
Ok(BlockIndex {
first_inscription_height: index.options.first_inscription_height(),
lowest_blessed_by_block: Vec::new(),
lowest_cursed_by_block: Vec::new(),
highest_indexed_blessed: i64::MIN,
lowest_indexed_cursed: i64::MAX,
})
}

pub(crate) fn update(&mut self, index: &Index) -> Result {
let index_height = index.block_count()?;
let inscribed_block_count = index_height.saturating_sub(self.first_inscription_height);
let indexed_up_to: isize = self
.lowest_blessed_by_block
.len()
.try_into()
.unwrap_or(isize::MAX);

let gap = inscribed_block_count.try_into().unwrap_or(isize::MAX) - indexed_up_to;
if gap <= 0 {
return Ok(());
}

log::debug!(
"Updating block index for {} new blocks ({} to {})",
gap,
indexed_up_to,
inscribed_block_count
);

self
.lowest_blessed_by_block
.resize(usize::try_from(inscribed_block_count)?, i64::MAX);

self
.lowest_cursed_by_block
.resize(usize::try_from(inscribed_block_count)?, i64::MAX);

let rtx = index.database.begin_read()?;

// Use a more efficient approach for the initial indexing - since we have
// to traverse all inscriptions, it is most efficient to do so using one table.
if indexed_up_to == 0 {
for result in rtx
.open_table(INSCRIPTION_ID_TO_INSCRIPTION_ENTRY)?
.iter()?
{
let (_, entry) = result?;
let entry = InscriptionEntry::load(entry.value());
let height_index: usize = entry
.height
.try_into()
.unwrap_or(usize::MAX)
.saturating_sub(self.first_inscription_height.try_into().unwrap());

if entry.number < 0 {
self.lowest_cursed_by_block[height_index] =
cmp::min(self.lowest_cursed_by_block[height_index], entry.number);
self.lowest_indexed_cursed = cmp::min(self.lowest_indexed_cursed, entry.number);
} else {
self.lowest_blessed_by_block[height_index] =
cmp::min(self.lowest_blessed_by_block[height_index], entry.number);
self.highest_indexed_blessed = cmp::max(self.highest_indexed_blessed, entry.number);
}
}
} else {
// Use default approach where we iterate in order of inscription number
// so we can easily skip over already indexed inscriptions.
let mut prev_block_height = usize::MAX;

for result in rtx
.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)?
.iter()?
{
let (number, id) = result?;

if number.value() >= self.lowest_indexed_cursed
&& number.value() <= self.highest_indexed_blessed
{
continue;
}

let inscription_id = InscriptionId::load(*id.value());

if let Some(entry) = index.get_inscription_entry(inscription_id)? {
let current_height = entry.height.try_into().unwrap_or(usize::MAX);

if prev_block_height != current_height {
prev_block_height = current_height;

if number.value() < 0 {
self.lowest_cursed_by_block[prev_block_height
.saturating_sub(usize::try_from(self.first_inscription_height)?)] = number.value();
self.lowest_indexed_cursed = cmp::min(self.lowest_indexed_cursed, number.value());
} else {
self.lowest_blessed_by_block[prev_block_height
.saturating_sub(usize::try_from(self.first_inscription_height)?)] = number.value();
self.highest_indexed_blessed = cmp::max(self.highest_indexed_blessed, number.value());
}
}
}
}
}

log::debug!(
"Updated block index for {} new blocks ({} to {})",
gap,
indexed_up_to,
inscribed_block_count
);

Ok(())
}

// Return all consecutively numbered inscriptions in the block at the given height, starting from the given number
fn get_inscriptions_in_block_from(
&self,
index: &Index,
block_height: u64,
from_number: i64,
cursed: bool,
) -> Result<Vec<InscriptionId>> {
let mut block_inscriptions = Vec::new();

let rtx = index.database.begin_read()?;
let inscription_id_by_number = rtx.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)?;

let highest = if cursed {
-1
} else {
match inscription_id_by_number.iter()?.next_back() {
Some(Ok((number, _id))) => number.value(),
Some(Err(err)) => return Err(err.into()),
None => i64::MIN,
}
};

for number in from_number..=highest {
match inscription_id_by_number.get(number)? {
Some(inscription_id) => {
let inscription_id = InscriptionId::load(*inscription_id.value());
if let Some(entry) = index.get_inscription_entry(inscription_id)? {
if entry.height != block_height {
break;
}
block_inscriptions.push(inscription_id);
}
}
None => break,
}
}

Ok(block_inscriptions)
}

pub(crate) fn get_inscriptions_in_block(
&self,
index: &Index,
block_height: u64,
) -> Result<Vec<InscriptionId>> {
if block_height >= index.block_count()? || block_height < self.first_inscription_height {
return Ok(Vec::new());
}
let lowest_cursed = self.lowest_cursed_by_block
[usize::try_from(block_height.saturating_sub(self.first_inscription_height))?];
let lowest_blessed = self.lowest_blessed_by_block
[usize::try_from(block_height.saturating_sub(self.first_inscription_height))?];

let mut inscriptions =
self.get_inscriptions_in_block_from(index, block_height, lowest_cursed, true)?;
inscriptions.extend(self.get_inscriptions_in_block_from(
index,
block_height,
lowest_blessed,
false,
)?);

log::debug!(
"Got {} inscriptions in block {} ({} - {})",
inscriptions.len(),
block_height,
lowest_cursed,
lowest_blessed
);

Ok(inscriptions)
}
}
38 changes: 31 additions & 7 deletions src/subcommand/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
error::{OptionExt, ServerError, ServerResult},
},
super::*,
crate::index::block_index::BlockIndex,
crate::page_config::PageConfig,
crate::templates::{
BlockHtml, ClockSvg, HomeHtml, InputHtml, InscriptionHtml, InscriptionJson, InscriptionsHtml,
Expand All @@ -29,7 +30,7 @@ use {
caches::DirCache,
AcmeConfig,
},
std::{cmp::Ordering, str, sync::Arc},
std::{cmp::Ordering, str, sync::Arc, sync::RwLock},
tokio_stream::StreamExt,
tower_http::{
compression::CompressionLayer,
Expand All @@ -46,6 +47,10 @@ pub struct ServerConfig {
pub is_json_api_enabled: bool,
}

struct BlockIndexState {
block_index: RwLock<BlockIndex>,
}

enum BlockQuery {
Height(u64),
Hash(BlockHash),
Expand Down Expand Up @@ -134,18 +139,38 @@ pub(crate) struct Server {
impl Server {
pub(crate) fn run(self, options: Options, index: Arc<Index>, handle: Handle) -> Result {
Runtime::new()?.block_on(async {
let block_index_state = BlockIndexState {
block_index: RwLock::new(BlockIndex::new(&index)?),
};

let block_index_state = Arc::new(block_index_state);

let index_clone = index.clone();
let block_index_clone = block_index_state.clone();

let index_thread = thread::spawn(move || loop {
if SHUTTING_DOWN.load(atomic::Ordering::Relaxed) {
break;
}
if let Err(error) = index_clone.update() {
log::warn!("{error}");
log::warn!("Updating index: {error}");
}
if let Err(error) = block_index_clone
.block_index
.write()
.unwrap()
.update(&index_clone)
{
log::warn!("Updating block index: {error}");
}
thread::sleep(Duration::from_millis(5000));
});
INDEXER.lock().unwrap().replace(index_thread);

let server_config = Arc::new(ServerConfig {
is_json_api_enabled: index.is_json_api_enabled(),
});

let config = options.load_config()?;
let acme_domains = self.acme_domains()?;

Expand All @@ -154,10 +179,6 @@ impl Server {
domain: acme_domains.first().cloned(),
});

let server_config = Arc::new(ServerConfig {
is_json_api_enabled: index.is_json_api_enabled(),
});

let router = Router::new()
.route("/", get(Self::home))
.route("/block/:query", get(Self::block))
Expand Down Expand Up @@ -193,6 +214,7 @@ impl Server {
.layer(Extension(index))
.layer(Extension(page_config))
.layer(Extension(Arc::new(config)))
.layer(Extension(block_index_state))
.layer(SetResponseHeaderLayer::if_not_present(
header::CONTENT_SECURITY_POLICY,
HeaderValue::from_static("default-src 'self'"),
Expand Down Expand Up @@ -1007,10 +1029,12 @@ impl Server {
async fn inscriptions_in_block(
Extension(page_config): Extension<Arc<PageConfig>>,
Extension(index): Extension<Arc<Index>>,
Extension(block_index_state): Extension<Arc<BlockIndexState>>,
Path(block_height): Path<u64>,
accept_json: AcceptJson,
) -> ServerResult<Response> {
let inscriptions = index.get_inscriptions_in_block(block_height)?;
let inscriptions = index
.get_inscriptions_in_block(&block_index_state.block_index.read().unwrap(), block_height)?;
Ok(if accept_json.0 {
Json(InscriptionsJson::new(inscriptions, None, None, None, None)).into_response()
} else {
Expand Down
11 changes: 9 additions & 2 deletions tests/json_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,15 @@ fn get_inscriptions_in_block() {
}
rpc_server.mine_blocks(1);

let server = TestServer::spawn_with_args(&rpc_server, &["--index-sats", "--enable-json-api"]);
let server = TestServer::spawn_with_args(
&rpc_server,
&[
"--index-sats",
"--enable-json-api",
"--first-inscription-height",
"0",
],
);

// get all inscriptions from block 11
let response = server.json_request(format!("/inscriptions/block/{}", 11));
Expand All @@ -330,7 +338,6 @@ fn get_inscriptions_in_block() {
let inscriptions_json: InscriptionsJson =
serde_json::from_str(&response.text().unwrap()).unwrap();

assert_eq!(inscriptions_json.inscriptions.len(), 3);
pretty_assert_eq!(
inscriptions_json.inscriptions,
vec![
Expand Down

0 comments on commit 621a747

Please sign in to comment.