Skip to content
This repository has been archived by the owner on Mar 2, 2023. It is now read-only.

Make accounter schedule transactions explicitly #17

Merged
merged 1 commit into from
Oct 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Make accounter schedule transactions explicitly
This should:
  a) simplify backend logic. There are now AddrRequest<->AddrResponses and
     TxRequest<->TxResponses pipelines. Backends don't have to track
     transactions that are seen in order to avoid sending duplicate
     entries to the accounter.
  b) make sure accounter stops when all the transactions are received
     from the backend. The accounter keeps track of derived and processed
     addresses. It can now see how many transactions each address has
     and then schedule those transactions accordingly with the backend.
     Then it knows, how many (non-duplicate) transactions it has to receive
     before completing the fetchTransactions step and moving to
     processing all the transaction information.
  • Loading branch information
mbyczkowski committed Oct 16, 2018
commit 27a8002fc91c533f7d5938492910e74a8fc88071
18 changes: 1 addition & 17 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 29 additions & 8 deletions accounter/accounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ type Accounter struct {
deriver *deriver.AddressDeriver
lookahead uint32

countMu sync.Mutex // protects lastAddresses, derivedCount and checkedCount
lastAddresses [2]uint32
derivedCount uint32
checkedCount uint32
countMu sync.Mutex // protects lastAddresses, derivedAddrCount and processedAddrCount
lastAddresses [2]uint32
derivedAddrCount uint32
processedAddrCount uint32
seenTxCount uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could use a single sync.Wg for all the backend channels here. Increment it before we write to any channel, decrement it when we read data.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this give us anything useful? We keep generating more addresses, so at some point WaitGroup can be done, but there might be more data in-flight. Maybe there's an elegant solution I cannot yet see 🤔

processedTxCount uint32

addrResponses <-chan *backend.AddrResponse
txResponses <-chan *backend.TxResponse
Expand Down Expand Up @@ -112,10 +114,12 @@ func (a *Accounter) processTransactions() {
for hash, tx := range a.transactions {
// remove transactions which are too recent
if tx.height > int64(a.blockHeight) {
reporter.GetInstance().Logf("transaction %s has height %d > BLOCK HEIGHT (%d)", hash, tx.height, a.blockHeight)
delete(a.transactions, hash)
}
// remove transactions which haven't been mined
if tx.height <= 0 {
reporter.GetInstance().Logf("transaction %s has not been mined, yet (height=%d)", hash, tx.height)
delete(a.transactions, hash)
}
}
Expand Down Expand Up @@ -223,7 +227,7 @@ func (a *Accounter) sendWork() {
// increment the number of addresses which have been derived
addr := a.deriver.Derive(change, indexes[change])
a.countMu.Lock()
a.derivedCount++
a.derivedAddrCount++
a.countMu.Unlock()
a.backend.AddrRequest(addr)
indexes[change]++
Expand All @@ -248,15 +252,24 @@ func (a *Accounter) recvWork() {
reporter.GetInstance().IncAddressesFetched()

a.countMu.Lock()
a.checkedCount++
a.processedAddrCount++
a.countMu.Unlock()

a.addresses[resp.Address.Script()] = address{
path: resp.Address,
txHashes: resp.TxHashes,
}

reporter.GetInstance().Log(fmt.Sprintf("address %s has %d transactions", resp.Address, len(resp.TxHashes)))
a.countMu.Lock()
for _, txHash := range resp.TxHashes {
if _, exists := a.transactions[txHash]; !exists {
a.backend.TxRequest(txHash)
a.seenTxCount++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you want to increment a.seenTxCount before making the request?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. sloppy coding 😅

}
}
a.countMu.Unlock()

reporter.GetInstance().Logf("address %s has %d transactions", resp.Address, len(resp.TxHashes))

if resp.HasTransactions() {
a.countMu.Lock()
Expand All @@ -272,6 +285,10 @@ func (a *Accounter) recvWork() {

reporter.GetInstance().IncTxFetched()

a.countMu.Lock()
a.processedTxCount++
a.countMu.Unlock()

tx := transaction{
height: resp.Height,
hex: resp.Hex,
Expand Down Expand Up @@ -303,6 +320,10 @@ func (a *Accounter) complete() bool {
defer a.countMu.Unlock()

// We are done when the right number of addresses were scheduled, fetched and processed
// *and* all the transactions that were seen have been scheduled, fetched and processed.
indexes := a.lastAddresses[0] + a.lastAddresses[1]
return a.derivedCount == indexes && a.checkedCount == indexes
addrsDone := a.derivedAddrCount == indexes && a.processedAddrCount == indexes
txsDone := a.seenTxCount == a.processedTxCount

return addrsDone && txsDone
}
1 change: 1 addition & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
type Backend interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'll have to eventually update the comment above this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mostly addressed the comment for the Backend interface in other PRs, right?

AddrRequest(addr *deriver.Address)
AddrResponses() <-chan *AddrResponse
TxRequest(txHash string)
TxResponses() <-chan *TxResponse

ChainHeight() uint32
Expand Down
134 changes: 92 additions & 42 deletions backend/btcd_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ type BtcdBackend struct {
// channels used to communicate with the Accounter
addrRequests chan *deriver.Address
addrResponses chan *AddrResponse
txRequests chan string
txResponses chan *TxResponse

// internal channels
transactionsMu sync.Mutex // mutex to guard read/writes to transactions map
transactions map[string]int64

chainHeight uint32
// internal channels
transactionsMu sync.Mutex // mutex to guard read/writes to transactions map
cachedTransactions map[string]*TxResponse
doneCh chan bool
}

const (
Expand Down Expand Up @@ -79,14 +80,16 @@ func NewBtcdBackend(hostPort, user, pass string, network utils.Network) (*BtcdBa
}

b := &BtcdBackend{
client: client,
network: network,
addrRequests: make(chan *deriver.Address, addrRequestsChanSize),
addrResponses: make(chan *AddrResponse, addrRequestsChanSize),
txResponses: make(chan *TxResponse, 2*maxTxsPerAddr),
blockHeightLookup: make(map[string]int64),
transactions: make(map[string]int64),
chainHeight: uint32(height),
client: client,
network: network,
chainHeight: uint32(height),
addrRequests: make(chan *deriver.Address, addrRequestsChanSize),
addrResponses: make(chan *AddrResponse, addrRequestsChanSize),
txRequests: make(chan string, 2*maxTxsPerAddr),
txResponses: make(chan *TxResponse, 2*maxTxsPerAddr),
blockHeightLookup: make(map[string]int64),
cachedTransactions: make(map[string]*TxResponse),
doneCh: make(chan bool),
}

// launch
Expand All @@ -100,7 +103,7 @@ func NewBtcdBackend(hostPort, user, pass string, network utils.Network) (*BtcdBa
// to the given address.
func (b *BtcdBackend) AddrRequest(addr *deriver.Address) {
reporter.GetInstance().IncAddressesScheduled()
reporter.GetInstance().Log(fmt.Sprintf("scheduling address: %s", addr))
reporter.GetInstance().Logf("scheduling address: %s", addr)
b.addrRequests <- addr
}

Expand All @@ -110,6 +113,14 @@ func (b *BtcdBackend) AddrResponses() <-chan *AddrResponse {
return b.addrResponses
}

// TxRequest schedules a request to the backend to lookup information related
// to the given transaction hash.
func (b *BtcdBackend) TxRequest(txHash string) {
reporter.GetInstance().IncTxScheduled()
reporter.GetInstance().Logf("scheduling tx: %s", txHash)
b.txRequests <- txHash
}

// TxResponses exposes a channel that allows to consume backend's responses to
// address requests created with addrrequest().
// if an address has any transactions then they will be sent to this channel by the
Expand All @@ -120,7 +131,7 @@ func (b *BtcdBackend) TxResponses() <-chan *TxResponse {

// Finish informs the backend to stop doing its work.
func (b *BtcdBackend) Finish() {
close(b.addrResponses)
close(b.doneCh)
b.client.Disconnect()
}

Expand All @@ -129,10 +140,20 @@ func (b *BtcdBackend) ChainHeight() uint32 {
}

func (b *BtcdBackend) processRequests() {
for addr := range b.addrRequests {
err := b.processAddrRequest(addr)
if err != nil {
panic(fmt.Sprintf("processAddrRequest failed: %+v", err))
for {
select {
case addr := <-b.addrRequests:
err := b.processAddrRequest(addr)
if err != nil {
panic(fmt.Sprintf("processAddrRequest failed: %+v", err))
}
case tx := <-b.txRequests:
err := b.processTxRequest(tx)
if err != nil {
panic(fmt.Sprintf("processTxRequest failed: %+v", err))
}
case <-b.doneCh:
break
}
}
}
Expand All @@ -147,7 +168,8 @@ func (b *BtcdBackend) processAddrRequest(address *deriver.Address) error {
// the address doesn't exist in the blockchain - either because it was not used
// or given backend doesn't have a complete blockchain
b.addrResponses <- &AddrResponse{
Address: address,
Address: address,
TxHashes: []string{},
}
return nil
}
Expand All @@ -164,7 +186,7 @@ func (b *BtcdBackend) processAddrRequest(address *deriver.Address) error {
txHashes = append(txHashes, tx.Txid)
}

go b.scheduleTx(txs)
b.cacheTxs(txs)

b.addrResponses <- &AddrResponse{
Address: address,
Expand All @@ -174,10 +196,48 @@ func (b *BtcdBackend) processAddrRequest(address *deriver.Address) error {
return nil
}

func (b *BtcdBackend) scheduleTx(txs []*btcjson.SearchRawTransactionsResult) {
func (b *BtcdBackend) processTxRequest(txHash string) error {
b.transactionsMu.Lock()
tx, exists := b.cachedTransactions[txHash]
b.transactionsMu.Unlock()

if exists {
b.txResponses <- tx

return nil
}

hash, err := chainhash.NewHashFromStr(txHash)
if err != nil {
return err
}
txResp, err := b.client.GetRawTransactionVerbose(hash)
if err != nil {
if jerr, ok := err.(*btcjson.RPCError); ok {
switch jerr.Code {
case btcjson.ErrRPCInvalidAddressOrKey:
return errors.Wrap(err, "blockchain doesn't have transaction "+txHash)
}
}
return errors.Wrap(err, "could not fetch transaction "+txHash)
}
height, err := b.getBlockHeight(txResp.BlockHash)
if err != nil {
return err
}

b.txResponses <- &TxResponse{
Hash: txHash,
Height: height,
Hex: txResp.Hex,
}
return nil
}

func (b *BtcdBackend) cacheTxs(txs []*btcjson.SearchRawTransactionsResult) {
for _, tx := range txs {
b.transactionsMu.Lock()
_, exists := b.transactions[tx.Txid]
_, exists := b.cachedTransactions[tx.Txid]
b.transactionsMu.Unlock()

if exists {
Expand All @@ -190,31 +250,15 @@ func (b *BtcdBackend) scheduleTx(txs []*btcjson.SearchRawTransactionsResult) {
}

b.transactionsMu.Lock()
b.transactions[tx.Txid] = height
b.transactionsMu.Unlock()

reporter.GetInstance().IncTxScheduled()
reporter.GetInstance().Log(fmt.Sprintf("scheduling tx: %s", tx.Txid))

b.txResponses <- &TxResponse{
b.cachedTransactions[tx.Txid] = &TxResponse{
Hash: tx.Txid,
Height: b.getTxHeight(tx.Txid),
Height: height,
Hex: tx.Hex,
}
b.transactionsMu.Unlock()
}
}

func (b *BtcdBackend) getTxHeight(txHash string) int64 {
b.transactionsMu.Lock()
defer b.transactionsMu.Unlock()

height, exists := b.transactions[txHash]
if !exists {
panic(fmt.Sprintf("inconsistent cache: %s", txHash))
}
return height
}

// getBlockHeight returns a block height for a given block hash or returns an error
func (b *BtcdBackend) getBlockHeight(hash string) (int64, error) {
b.blockHeightMu.Lock()
Expand All @@ -230,7 +274,13 @@ func (b *BtcdBackend) getBlockHeight(hash string) (int64, error) {
}
resp, err := b.client.GetBlockVerbose(h)
if err != nil {
return -1, err
if jerr, ok := err.(*btcjson.RPCError); ok {
switch jerr.Code {
case btcjson.ErrRPCInvalidAddressOrKey:
return -1, errors.Wrap(err, "blockchain doesn't have block "+hash)
}
}
return -1, errors.Wrap(err, "could not fetch block "+hash)
}

b.blockHeightMu.Lock()
Expand Down
Loading