Skip to content

Commit

Permalink
Mempool: Retrive stable state of the mempool. optimze get mempool ent…
Browse files Browse the repository at this point in the history
…ries by addresses (kaspanet#2111)

* fix mempool accessing, rewrite get_mempool_entries_by_addresses

* fix counter, add verbose

* fmt

* addresses as string

* Define error in case utxoEntry is missing.

* fix error variable to string

* stop tests from failing (see in code comment)

* access both pools in the same state via parameters

* get rid of todo message

* fmt - very important!

* perf: scriptpublickey in mempool, no txscript.

* address reveiw

* fmt fix

* mixed up isorphan bool, pass tests now

* do map preallocation in mempoolbyaddresses

* no proallocation for orphanpool sending.

Co-authored-by: Ori Newman <[email protected]>
  • Loading branch information
D-Stacks and someone235 authored Jul 26, 2022
1 parent 7a61c63 commit eb693c4
Show file tree
Hide file tree
Showing 21 changed files with 345 additions and 272 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (flow *handleRelayedTransactionsFlow) requestInvTransactions(
func (flow *handleRelayedTransactionsFlow) isKnownTransaction(txID *externalapi.DomainTransactionID) bool {
// Ask the transaction memory pool if the transaction is known
// to it in any form (main pool or orphan).
if _, ok := flow.Domain().MiningManager().GetTransaction(txID); ok {
if _, _, ok := flow.Domain().MiningManager().GetTransaction(txID, true, true); ok {
return true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (flow *handleRequestedTransactionsFlow) start() error {
}

for _, transactionID := range msgRequestTransactions.IDs {
tx, ok := flow.Domain().MiningManager().GetTransaction(transactionID)
tx, _, ok := flow.Domain().MiningManager().GetTransaction(transactionID, true, false)

if !ok {
msgTransactionNotFound := appmessage.NewMsgTransactionNotFound(transactionID)
Expand All @@ -40,7 +40,6 @@ func (flow *handleRequestedTransactionsFlow) start() error {
}
continue
}

err := flow.outgoingRoute.Enqueue(appmessage.DomainTransactionToMsgTx(tx))
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion app/rpc/rpccontext/notificationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/kaspanet/kaspad/domain/dagconfig"

"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/utils/txscript"

"github.com/kaspanet/kaspad/app/appmessage"
Expand Down Expand Up @@ -421,7 +422,7 @@ func (nl *NotificationListener) convertUTXOChangesToUTXOsChangedNotification(
}

func (nl *NotificationListener) scriptPubKeyStringToAddressString(scriptPublicKeyString utxoindex.ScriptPublicKeyString) (string, error) {
scriptPubKey := utxoindex.ConvertStringToScriptPublicKey(scriptPublicKeyString)
scriptPubKey := externalapi.NewScriptPublicKeyFromString(string(scriptPublicKeyString))

// ignore error because it is often returned when the script is of unknown type
scriptType, address, err := txscript.ExtractScriptPubKeyAddress(scriptPubKey, nl.params)
Expand Down
18 changes: 1 addition & 17 deletions app/rpc/rpccontext/utxos_by_addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,6 @@ func ConvertUTXOOutpointEntryPairsToUTXOsByAddressesEntries(address string, pair
return utxosByAddressesEntries
}

// convertUTXOOutpointsToUTXOsByAddressesEntries converts
// UTXOOutpoints to a slice of UTXOsByAddressesEntry
func convertUTXOOutpointsToUTXOsByAddressesEntries(address string, outpoints utxoindex.UTXOOutpoints) []*appmessage.UTXOsByAddressesEntry {
utxosByAddressesEntries := make([]*appmessage.UTXOsByAddressesEntry, 0, len(outpoints))
for outpoint := range outpoints {
utxosByAddressesEntries = append(utxosByAddressesEntries, &appmessage.UTXOsByAddressesEntry{
Address: address,
Outpoint: &appmessage.RPCOutpoint{
TransactionID: outpoint.TransactionID.String(),
Index: outpoint.Index,
},
})
}
return utxosByAddressesEntries
}

// ConvertAddressStringsToUTXOsChangedNotificationAddresses converts address strings
// to UTXOsChangedNotificationAddresses
func (ctx *Context) ConvertAddressStringsToUTXOsChangedNotificationAddresses(
Expand All @@ -63,7 +47,7 @@ func (ctx *Context) ConvertAddressStringsToUTXOsChangedNotificationAddresses(
if err != nil {
return nil, errors.Errorf("Could not create a scriptPublicKey for address '%s': %s", addressString, err)
}
scriptPublicKeyString := utxoindex.ConvertScriptPublicKeyToString(scriptPublicKey)
scriptPublicKeyString := utxoindex.ScriptPublicKeyString(scriptPublicKey.String())
addresses[i] = &UTXOsChangedNotificationAddress{
Address: addressString,
ScriptPublicKeyString: scriptPublicKeyString,
Expand Down
2 changes: 1 addition & 1 deletion app/rpc/rpchandlers/get_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func HandleGetInfo(context *rpccontext.Context, _ *router.Router, _ appmessage.M

response := appmessage.NewGetInfoResponseMessage(
context.NetAdapter.ID().String(),
uint64(context.Domain.MiningManager().TransactionCount()),
uint64(context.Domain.MiningManager().TransactionCount(true, false)),
version.Version(),
context.Config.UTXOIndex,
context.ProtocolManager.Context().HasPeers() && isNearlySynced,
Expand Down
70 changes: 24 additions & 46 deletions app/rpc/rpchandlers/get_mempool_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,58 +12,36 @@ func HandleGetMempoolEntries(context *rpccontext.Context, _ *router.Router, requ

entries := make([]*appmessage.MempoolEntry, 0)

transactionPoolTransactions, orphanPoolTransactions := context.Domain.MiningManager().AllTransactions(!getMempoolEntriesRequest.FilterTransactionPool, getMempoolEntriesRequest.IncludeOrphanPool)

if !getMempoolEntriesRequest.FilterTransactionPool {
transactionPoolEntries, err := getTransactionPoolMempoolEntries(context)
if err != nil {
return nil, err
for _, transaction := range transactionPoolTransactions {
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction)
err := context.PopulateTransactionWithVerboseData(rpcTransaction, nil)
if err != nil {
return nil, err
}
entries = append(entries, &appmessage.MempoolEntry{
Fee: transaction.Fee,
Transaction: rpcTransaction,
IsOrphan: false,
})
}

entries = append(entries, transactionPoolEntries...)
}

if getMempoolEntriesRequest.IncludeOrphanPool {
orphanPoolEntries, err := getOrphanPoolMempoolEntries(context)
if err != nil {
return nil, err
for _, transaction := range orphanPoolTransactions {
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction)
err := context.PopulateTransactionWithVerboseData(rpcTransaction, nil)
if err != nil {
return nil, err
}
entries = append(entries, &appmessage.MempoolEntry{
Fee: transaction.Fee,
Transaction: rpcTransaction,
IsOrphan: true,
})
}
entries = append(entries, orphanPoolEntries...)
}

return appmessage.NewGetMempoolEntriesResponseMessage(entries), nil
}

func getTransactionPoolMempoolEntries(context *rpccontext.Context) ([]*appmessage.MempoolEntry, error) {
transactions := context.Domain.MiningManager().AllTransactions()
entries := make([]*appmessage.MempoolEntry, 0, len(transactions))
for _, transaction := range transactions {
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction)
err := context.PopulateTransactionWithVerboseData(rpcTransaction, nil)
if err != nil {
return nil, err
}
entries = append(entries, &appmessage.MempoolEntry{
Fee: transaction.Fee,
Transaction: rpcTransaction,
IsOrphan: false,
})
}
return entries, nil
}

func getOrphanPoolMempoolEntries(context *rpccontext.Context) ([]*appmessage.MempoolEntry, error) {
orphanTransactions := context.Domain.MiningManager().AllOrphanTransactions()
entries := make([]*appmessage.MempoolEntry, 0, len(orphanTransactions))
for _, orphanTransaction := range orphanTransactions {
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(orphanTransaction)
err := context.PopulateTransactionWithVerboseData(rpcTransaction, nil)
if err != nil {
return nil, err
}
entries = append(entries, &appmessage.MempoolEntry{
Fee: orphanTransaction.Fee,
Transaction: rpcTransaction,
IsOrphan: true,
})
}
return entries, nil
}
179 changes: 75 additions & 104 deletions app/rpc/rpchandlers/get_mempool_entries_by_addresses.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package rpchandlers

import (
"errors"

"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/rpc/rpccontext"

"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing"
"github.com/kaspanet/kaspad/domain/consensus/utils/txscript"

"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/kaspanet/kaspad/util"
)
Expand All @@ -20,132 +16,107 @@ func HandleGetMempoolEntriesByAddresses(context *rpccontext.Context, _ *router.R

mempoolEntriesByAddresses := make([]*appmessage.MempoolEntryByAddress, 0)

if !getMempoolEntriesByAddressesRequest.FilterTransactionPool {
transactionPoolTransactions := context.Domain.MiningManager().AllTransactions()
transactionPoolEntriesByAddresses, err := extractMempoolEntriesByAddressesFromTransactions(
context,
getMempoolEntriesByAddressesRequest.Addresses,
transactionPoolTransactions,
false,
)
if err != nil {
rpcError := &appmessage.RPCError{}
if !errors.As(err, &rpcError) {
return nil, err
}
errorMessage := &appmessage.GetUTXOsByAddressesResponseMessage{}
errorMessage.Error = rpcError
return errorMessage, nil
}
mempoolEntriesByAddresses = append(mempoolEntriesByAddresses, transactionPoolEntriesByAddresses...)
sendingInTransactionPool, receivingInTransactionPool, sendingInOrphanPool, receivingInOrphanPool, err := context.Domain.MiningManager().GetTransactionsByAddresses(!getMempoolEntriesByAddressesRequest.FilterTransactionPool, getMempoolEntriesByAddressesRequest.IncludeOrphanPool)
if err != nil {
return nil, err
}

if getMempoolEntriesByAddressesRequest.IncludeOrphanPool {
for _, addressString := range getMempoolEntriesByAddressesRequest.Addresses {

orphanPoolTransactions := context.Domain.MiningManager().AllOrphanTransactions()
orphanPoolEntriesByAddress, err := extractMempoolEntriesByAddressesFromTransactions(
context,
getMempoolEntriesByAddressesRequest.Addresses,
orphanPoolTransactions,
true,
)
address, err := util.DecodeAddress(addressString, context.Config.NetParams().Prefix)
if err != nil {
rpcError := &appmessage.RPCError{}
if !errors.As(err, &rpcError) {
return nil, err
}
errorMessage := &appmessage.GetUTXOsByAddressesResponseMessage{}
errorMessage.Error = rpcError
errorMessage := &appmessage.GetMempoolEntriesByAddressesResponseMessage{}
errorMessage.Error = appmessage.RPCErrorf("Could not decode address '%s': %s", addressString, err)
return errorMessage, nil
}

mempoolEntriesByAddresses = append(mempoolEntriesByAddresses, orphanPoolEntriesByAddress...)
}

return appmessage.NewGetMempoolEntriesByAddressesResponseMessage(mempoolEntriesByAddresses), nil
}
sending := make([]*appmessage.MempoolEntry, 0)
receiving := make([]*appmessage.MempoolEntry, 0)

//TO DO: optimize extractMempoolEntriesByAddressesFromTransactions
func extractMempoolEntriesByAddressesFromTransactions(context *rpccontext.Context, addresses []string, transactions []*externalapi.DomainTransaction, areOrphans bool) ([]*appmessage.MempoolEntryByAddress, error) {
mempoolEntriesByAddresses := make([]*appmessage.MempoolEntryByAddress, 0)
for _, addressString := range addresses {
_, err := util.DecodeAddress(addressString, context.Config.ActiveNetParams.Prefix)
scriptPublicKey, err := txscript.PayToAddrScript(address)
if err != nil {
return nil, appmessage.RPCErrorf("Could not decode address '%s': %s", addressString, err)
errorMessage := &appmessage.GetMempoolEntriesByAddressesResponseMessage{}
errorMessage.Error = appmessage.RPCErrorf("Could not extract scriptPublicKey from address '%s': %s", addressString, err)
return errorMessage, nil
}

sending := make([]*appmessage.MempoolEntry, 0)
receiving := make([]*appmessage.MempoolEntry, 0)
if !getMempoolEntriesByAddressesRequest.FilterTransactionPool {

for _, transaction := range transactions {

for i, input := range transaction.Inputs {
if input.UTXOEntry == nil {
if !areOrphans { // Orphans can legitimately have `input.UTXOEntry == nil`
// TODO: Fix the underlying cause of the bug for non-orphan entries
log.Debugf(
"Couldn't find UTXO entry for input %d in mempool transaction %s. This is a bug and should be fixed.",
i, consensushashing.TransactionID(transaction))
}
continue
if transaction, found := sendingInTransactionPool[scriptPublicKey.String()]; found {
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction)
err := context.PopulateTransactionWithVerboseData(rpcTransaction, nil)
if err != nil {
return nil, err
}

_, transactionSendingAddress, err := txscript.ExtractScriptPubKeyAddress(
input.UTXOEntry.ScriptPublicKey(),
context.Config.ActiveNetParams)
sending = append(sending, &appmessage.MempoolEntry{
Fee: transaction.Fee,
Transaction: rpcTransaction,
IsOrphan: false,
},
)
}

if transaction, found := receivingInTransactionPool[scriptPublicKey.String()]; found {
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction)
err := context.PopulateTransactionWithVerboseData(rpcTransaction, nil)
if err != nil {
return nil, err
}
if addressString == transactionSendingAddress.String() {
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction)
sending = append(
sending,
&appmessage.MempoolEntry{
Fee: transaction.Fee,
Transaction: rpcTransaction,
IsOrphan: areOrphans,
},
)
break //one input is enough
}
}

for _, output := range transaction.Outputs {
_, transactionReceivingAddress, err := txscript.ExtractScriptPubKeyAddress(
output.ScriptPublicKey,
context.Config.ActiveNetParams,
receiving = append(receiving, &appmessage.MempoolEntry{
Fee: transaction.Fee,
Transaction: rpcTransaction,
IsOrphan: false,
},
)
}
}
if getMempoolEntriesByAddressesRequest.IncludeOrphanPool {

if transaction, found := sendingInOrphanPool[scriptPublicKey.String()]; found {
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction)
err := context.PopulateTransactionWithVerboseData(rpcTransaction, nil)
if err != nil {
return nil, err
}
if addressString == transactionReceivingAddress.String() {
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction)
receiving = append(
receiving,
&appmessage.MempoolEntry{
Fee: transaction.Fee,
Transaction: rpcTransaction,
IsOrphan: areOrphans,
},
)
break //one output is enough
}

sending = append(sending, &appmessage.MempoolEntry{
Fee: transaction.Fee,
Transaction: rpcTransaction,
IsOrphan: true,
},
)
}

//Only append mempoolEntriesByAddress, if at least 1 mempoolEntry for the address is found.
//This mimics the behaviour of GetUtxosByAddresses RPC call.
if len(sending) > 0 || len(receiving) > 0 {
mempoolEntriesByAddresses = append(
mempoolEntriesByAddresses,
&appmessage.MempoolEntryByAddress{
Address: addressString,
Sending: sending,
Receiving: receiving,
},
if transaction, found := receivingInOrphanPool[scriptPublicKey.String()]; found {
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(transaction)
err := context.PopulateTransactionWithVerboseData(rpcTransaction, nil)
if err != nil {
return nil, err
}

receiving = append(receiving, &appmessage.MempoolEntry{
Fee: transaction.Fee,
Transaction: rpcTransaction,
IsOrphan: true,
},
)
}

}

if len(sending) > 0 || len(receiving) > 0 {
mempoolEntriesByAddresses = append(
mempoolEntriesByAddresses,
&appmessage.MempoolEntryByAddress{
Address: address.String(),
Sending: sending,
Receiving: receiving,
},
)
}
}
return mempoolEntriesByAddresses, nil

return appmessage.NewGetMempoolEntriesByAddressesResponseMessage(mempoolEntriesByAddresses), nil
}
Loading

0 comments on commit eb693c4

Please sign in to comment.