Skip to content

Commit

Permalink
Add support for an optional address-based transaction index.
Browse files Browse the repository at this point in the history
* Address index is built up concurrently with the `--addrindex` flag.
* Entire index can be deleted with `--dropaddrindex`.
* New RPC call: `searchrawtransaction`
  * Returns all transacitons related to a particular address
  * Includes mempool transactions
  * Requires `--addrindex` to be activated and fully caught up.
* New `blockLogger` struct has been added to factor our common logging
  code
* Wiki and docs updated with new features.
  • Loading branch information
Roasbeef committed Feb 5, 2015
1 parent 86cbf27 commit ecdffda
Show file tree
Hide file tree
Showing 13 changed files with 983 additions and 90 deletions.
76 changes: 76 additions & 0 deletions blocklogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"sync"
"time"

"github.com/btcsuite/btclog"
"github.com/btcsuite/btcutil"
)

// blockProgressLogger provides periodic logging for other services in order
// to show users progress of certain "actions" involving some or all current
// blocks. Ex: syncing to best chain, indexing all blocks, etc.
type blockProgressLogger struct {
receivedLogBlocks int64
receivedLogTx int64
lastBlockLogTime time.Time

subsystemLogger btclog.Logger
progressAction string
sync.Mutex
}

// newBlockProgressLogger returns a new block progress logger.
// The progress message is templated as follows:
// {progressAction} {numProcessed} {blocks|block} in the last {timePeriod}
// ({numTxs}, height {lastBlockHeight}, {lastBlockTimeStamp})
func newBlockProgressLogger(progressMessage string, logger btclog.Logger) *blockProgressLogger {
return &blockProgressLogger{
lastBlockLogTime: time.Now(),
progressAction: progressMessage,
subsystemLogger: logger,
}
}

// LogBlockHeight logs a new block height as an information message to show
// progress to the user. In order to prevent spam, it limits logging to one
// message every 10 seconds with duration and totals included.
func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block) {
b.Lock()
defer b.Unlock()

b.receivedLogBlocks++
b.receivedLogTx += int64(len(block.MsgBlock().Transactions))

now := time.Now()
duration := now.Sub(b.lastBlockLogTime)
if duration < time.Second*10 {
return
}

// Truncate the duration to 10s of milliseconds.
durationMillis := int64(duration / time.Millisecond)
tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)

// Log information about new block height.
blockStr := "blocks"
if b.receivedLogBlocks == 1 {
blockStr = "block"
}
txStr := "transactions"
if b.receivedLogTx == 1 {
txStr = "transaction"
}
b.subsystemLogger.Infof("%s %d %s in the last %s (%d %s, height %d, %s)",
b.progressAction, b.receivedLogBlocks, blockStr, tDuration, b.receivedLogTx,
txStr, block.Height(), block.MsgBlock().Header.Timestamp)

b.receivedLogBlocks = 0
b.receivedLogTx = 0
b.lastBlockLogTime = now
}

func (b *blockProgressLogger) SetLastLogTime(time time.Time) {
b.lastBlockLogTime = time
}
64 changes: 18 additions & 46 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ type blockManager struct {
blockChain *blockchain.BlockChain
requestedTxns map[wire.ShaHash]struct{}
requestedBlocks map[wire.ShaHash]struct{}
progressLogger *blockProgressLogger
receivedLogBlocks int64
receivedLogTx int64
lastBlockLogTime time.Time
processingReqs bool
syncPeer *peer
msgChan chan interface{}
Expand Down Expand Up @@ -436,41 +436,6 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) {
}
}

// logBlockHeight logs a new block height as an information message to show
// progress to the user. In order to prevent spam, it limits logging to one
// message every 10 seconds with duration and totals included.
func (b *blockManager) logBlockHeight(block *btcutil.Block) {
b.receivedLogBlocks++
b.receivedLogTx += int64(len(block.MsgBlock().Transactions))

now := time.Now()
duration := now.Sub(b.lastBlockLogTime)
if duration < time.Second*10 {
return
}

// Truncate the duration to 10s of milliseconds.
durationMillis := int64(duration / time.Millisecond)
tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)

// Log information about new block height.
blockStr := "blocks"
if b.receivedLogBlocks == 1 {
blockStr = "block"
}
txStr := "transactions"
if b.receivedLogTx == 1 {
txStr = "transaction"
}
bmgrLog.Infof("Processed %d %s in the last %s (%d %s, height %d, %s)",
b.receivedLogBlocks, blockStr, tDuration, b.receivedLogTx,
txStr, block.Height(), block.MsgBlock().Header.Timestamp)

b.receivedLogBlocks = 0
b.receivedLogTx = 0
b.lastBlockLogTime = now
}

// handleTxMsg handles transaction messages from all peers.
func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// NOTE: BitcoinJ, and possibly other wallets, don't follow the spec of
Expand Down Expand Up @@ -628,7 +593,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// When the block is not an orphan, log information about it and
// update the chain state.

b.logBlockHeight(bmsg.block)
b.progressLogger.LogBlockHeight(bmsg.block)

// Query the db for the latest best block since the block
// that was processed could be on a side chain or have caused
Expand Down Expand Up @@ -834,7 +799,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
b.headerList.Remove(b.headerList.Front())
bmgrLog.Infof("Received %v block headers: Fetching blocks",
b.headerList.Len())
b.lastBlockLogTime = time.Now()
b.progressLogger.SetLastLogTime(time.Now())
b.fetchHeaderBlocks()
return
}
Expand Down Expand Up @@ -1167,6 +1132,12 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
r.ntfnMgr.NotifyBlockConnected(block)
}

// If we're maintaing the address index, and it is up to date
// then update it based off this new block.
if cfg.AddrIndex && b.server.addrIndexer.IsCaughtUp() {
b.server.addrIndexer.UpdateAddressIndex(block)
}

// A block has been disconnected from the main block chain.
case blockchain.NTBlockDisconnected:
block, ok := notification.Data.(*btcutil.Block)
Expand Down Expand Up @@ -1344,14 +1315,15 @@ func newBlockManager(s *server) (*blockManager, error) {
}

bm := blockManager{
server: s,
requestedTxns: make(map[wire.ShaHash]struct{}),
requestedBlocks: make(map[wire.ShaHash]struct{}),
lastBlockLogTime: time.Now(),
msgChan: make(chan interface{}, cfg.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
}
server: s,
requestedTxns: make(map[wire.ShaHash]struct{}),
requestedBlocks: make(map[wire.ShaHash]struct{}),
progressLogger: newBlockProgressLogger("Processed", bmgrLog),
msgChan: make(chan interface{}, cfg.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
}
bm.progressLogger = newBlockProgressLogger("Processed", bmgrLog)
bm.blockChain = blockchain.New(s.db, s.netParams, bm.handleNotifyMsg)
bm.blockChain.DisableCheckpoints(cfg.DisableCheckpoints)
if !cfg.DisableCheckpoints {
Expand Down
11 changes: 11 additions & 0 deletions btcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ func btcdMain(serverChan chan<- *server) error {
}
defer db.Close()

if cfg.DropAddrIndex {
btcdLog.Info("Deleting entire addrindex.")
err := db.DeleteAddrIndex()
if err != nil {
btcdLog.Errorf("Unable to delete the addrindex: %v", err)
return err
}
btcdLog.Info("Successfully deleted addrindex, exiting")
return nil
}

// Ensure the database is sync'd and closed on Ctrl+C.
addInterruptHandler(func() {
btcdLog.Infof("Gracefully shutting down the database...")
Expand Down
Loading

0 comments on commit ecdffda

Please sign in to comment.