Skip to content

Commit

Permalink
feat: error handling for processor and tester
Browse files Browse the repository at this point in the history
Signed-off-by: Jingfu Wang <[email protected]>
  • Loading branch information
GeekArthur committed Sep 13, 2022
1 parent e7a9ebd commit f957616
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 135 deletions.
37 changes: 23 additions & 14 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ var (
ErrConstructionConfigMissing = errors.New("construction configuration is missing")

// Data check errors
ErrDataCheckHalt = errors.New("data check halted")
ErrReconciliationFailure = errors.New("reconciliation failure")
ErrDataCheckHalt = errors.New("data check halted")
ErrReconciliationFailure = errors.New("reconciliation failure")
ErrInitDataTester = errors.New("unexpected error occurred while trying to initialize data tester")
ErrReconcilerDrainHalt = errors.New("reconciler queue drain halted")
ErrMissingOps = errors.New("search for block with missing ops halted")
ErrUnableToFindMissingOps = errors.New("unable to find missing ops")

// Spec check errors
ErrErrorEmptyMessage = errors.New("error object can't have empty message")
Expand All @@ -50,18 +54,23 @@ var (
ErrBlockTip = errors.New("unspecified block_identifier doesn't give the tip block")

// Construction check errors
ErrConstructionCheckHalt = errors.New("construction check halted")
ErrConstructionCheckHalt = errors.New("construction check halted")
ErrBalanceExemptionsWithInitialBalanceFetchDisabled = errors.New("found balance exemptions but initial balance fetch disabled")

// Command errors
ErrBlockNotFound = errors.New("block not found")
ErrNoAvailableNetwork = errors.New("no networks available")
ErrNetworkOptionsAllowlistIsNil = errors.New("network options allowlist is nil")
ErrAsserterConfigurationIsNil = errors.New("asserter configuration is nil")
ErrTimestampStartIndexMismatch = errors.New("timestamp start index mismatch")
ErrOperationTypeLengthMismatch = errors.New("operation type length mismatch")
ErrOperationTypeMismatch = errors.New("operation type mismatch")
ErrOperationStatusLengthMismatch = errors.New("operation status length mismatch")
ErrOperationStatusMismatch = errors.New("operation status mismatch")
ErrErrorLengthMismatch = errors.New("error length mismatch")
ErrErrorMismatch = errors.New("error mismatch")
ErrBlockNotFound = errors.New("block not found")
ErrNoAvailableNetwork = errors.New("no networks available")
ErrNetworkOptionsAllowlistIsNil = errors.New("network options allowlist is nil")
ErrAsserterConfigurationIsNil = errors.New("asserter configuration is nil")
ErrTimestampStartIndexMismatch = errors.New("timestamp start index mismatch")
ErrOperationTypeLengthMismatch = errors.New("operation type length mismatch")
ErrOperationTypeMismatch = errors.New("operation type mismatch")
ErrOperationStatusLengthMismatch = errors.New("operation status length mismatch")
ErrOperationStatusMismatch = errors.New("operation status mismatch")
ErrErrorLengthMismatch = errors.New("error length mismatch")
ErrErrorMismatch = errors.New("error mismatch")
ErrAsserterConfigError = errors.New("asserter configuration validation failed")
ErrNoHeadBlock = errors.New("no head block")
ErrBlockBenchmarkTimeout = errors.New("/block endpoint benchmarking timed out")
ErrAccountBalanceBenchmarkTimeout = errors.New("/account/balance endpoint benchmarking timed out")
)
5 changes: 3 additions & 2 deletions pkg/processor/balance_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package processor

import (
"context"
"fmt"
"math/big"

"github.com/coinbase/rosetta-cli/pkg/logger"
Expand Down Expand Up @@ -124,7 +125,7 @@ func (h *BalanceStorageHandler) AccountsReconciled(
modules.ReconciledAccounts,
big.NewInt(int64(count)),
)
return err
return fmt.Errorf("failed to update the total accounts reconciled by count: %w", err)
}

// AccountsSeen updates the total accounts seen by count.
Expand All @@ -139,5 +140,5 @@ func (h *BalanceStorageHandler) AccountsSeen(
modules.SeenAccounts,
big.NewInt(int64(count)),
)
return err
return fmt.Errorf("failed to update the total accounts seen by count: %w", err)
}
2 changes: 1 addition & 1 deletion pkg/processor/balance_storage_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (h *BalanceStorageHelper) AccountBalance(
lookupBlock.Index,
)
if err != nil {
return nil, fmt.Errorf("%w: unable to get currency balance", err)
return nil, fmt.Errorf("unable to get balance of currency %s for account %s: %w", types.PrintStruct(currency), types.PrintStruct(account), err)
}

// If the returned balance block does not match the intended
Expand Down
8 changes: 4 additions & 4 deletions pkg/processor/broadcast_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (h *BroadcastStorageHandler) TransactionConfirmed(
) error {
_, _, relatedTransactions, err := h.blockStorage.FindRelatedTransactions(ctx, transaction.TransactionIdentifier, dbTx)
if err != nil {
return fmt.Errorf("%w: could not find related transactions", err)
return fmt.Errorf("failed to find related transactions %s: %w", types.PrintStruct(transaction.TransactionIdentifier), err)
}

observed := transaction.Operations
Expand All @@ -81,7 +81,7 @@ func (h *BroadcastStorageHandler) TransactionConfirmed(
}

if err := h.parser.ExpectedOperations(intent, observed, false, true); err != nil {
return fmt.Errorf("%w: confirmed transaction did not match intent", err)
return fmt.Errorf("confirmed transaction did not match intent: %w", err)
}

// Validate destination memo if it's needed
Expand All @@ -108,7 +108,7 @@ func (h *BroadcastStorageHandler) TransactionConfirmed(
identifier,
transaction,
); err != nil {
return fmt.Errorf("%w: coordinator could not handle transaction", err)
return fmt.Errorf("coordinator could not handle transaction: %w", err)
}

return nil
Expand Down Expand Up @@ -155,7 +155,7 @@ func (h *BroadcastStorageHandler) BroadcastFailed(
identifier,
nil,
); err != nil {
return fmt.Errorf("%w: coordinator could not handle transaction", err)
return fmt.Errorf("coordinator could not handle transaction: %w", err)
}

if h.config.Construction.IgnoreBroadcastFailures {
Expand Down
9 changes: 5 additions & 4 deletions pkg/processor/broadcast_storage_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package processor
import (
"context"
"fmt"

"github.com/coinbase/rosetta-sdk-go/utils"

"github.com/coinbase/rosetta-sdk-go/fetcher"
Expand Down Expand Up @@ -55,7 +56,7 @@ func (h *BroadcastStorageHelper) AtTip(
) (bool, error) {
atTip, _, err := utils.CheckStorageTip(ctx, h.network, tipDelay, h.fetcher, h.blockStorage)
if err != nil {
return false, err
return false, fmt.Errorf("failed to check storage tip: %w", err)
}

return atTip, nil
Expand All @@ -68,7 +69,7 @@ func (h *BroadcastStorageHelper) CurrentBlockIdentifier(
) (*types.BlockIdentifier, error) {
blockIdentifier, err := h.blockStorage.GetHeadBlockIdentifier(ctx)
if err != nil {
return nil, fmt.Errorf("%w: unable to get head block identifier", err)
return nil, fmt.Errorf("unable to get head block identifier: %w", err)
}

return blockIdentifier, nil
Expand All @@ -84,7 +85,7 @@ func (h *BroadcastStorageHelper) FindTransaction(
) (*types.BlockIdentifier, *types.Transaction, error) {
newestBlock, transaction, err := h.blockStorage.FindTransaction(ctx, transactionIdentifier, txn)
if err != nil {
return nil, nil, fmt.Errorf("%w: unable to perform transaction search", err)
return nil, nil, fmt.Errorf("unable to perform transaction search for transaction %s: %w", types.PrintStruct(transactionIdentifier), err)
}

return newestBlock, transaction, nil
Expand All @@ -103,7 +104,7 @@ func (h *BroadcastStorageHelper) BroadcastTransaction(
networkTransaction,
)
if fetchErr != nil {
return nil, fmt.Errorf("%w: unable to broadcast transaction", fetchErr.Err)
return nil, fmt.Errorf("unable to broadcast transaction %s: %w", networkTransaction, fetchErr.Err)
}

return transactionIdentifier, nil
Expand Down
22 changes: 11 additions & 11 deletions pkg/processor/coordinator_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package processor

import (
"context"
"errors"
"fmt"
"log"
"math/big"

cliErrs "github.com/coinbase/rosetta-cli/pkg/errors"
"github.com/coinbase/rosetta-sdk-go/constructor/coordinator"
"github.com/coinbase/rosetta-sdk-go/fetcher"
"github.com/coinbase/rosetta-sdk-go/keys"
Expand Down Expand Up @@ -153,7 +153,7 @@ func (c *CoordinatorHelper) Derive(
)
if fetchErr != nil {
c.verboseLog(reqerror, constructionDerive, arg{argError, fetchErr})
return nil, nil, fetchErr.Err
return nil, nil, fmt.Errorf("/construction/derive call is failed: %w", fetchErr.Err)
}

c.verboseLog(response, constructionDerive,
Expand Down Expand Up @@ -185,7 +185,7 @@ func (c *CoordinatorHelper) Preprocess(

if fetchErr != nil {
c.verboseLog(reqerror, constructionPreprocess, arg{argError, fetchErr})
return nil, nil, fetchErr.Err
return nil, nil, fmt.Errorf("/construction/preprocess call is failed: %w", fetchErr.Err)
}

c.verboseLog(response, constructionPreprocess,
Expand Down Expand Up @@ -217,7 +217,7 @@ func (c *CoordinatorHelper) Metadata(

if fetchErr != nil {
c.verboseLog(reqerror, constructionMetadata, arg{argError, fetchErr})
return nil, nil, fetchErr.Err
return nil, nil, fmt.Errorf("/construction/metadata call is failed: %w", fetchErr.Err)
}

c.verboseLog(response, constructionMetadata,
Expand Down Expand Up @@ -251,7 +251,7 @@ func (c *CoordinatorHelper) Payloads(

if fetchErr != nil {
c.verboseLog(reqerror, constructionPayloads, arg{argError, fetchErr})
return "", nil, fetchErr.Err
return "", nil, fmt.Errorf("/construction/payloads call is failed: %w", fetchErr.Err)
}

c.verboseLog(response, constructionPayloads,
Expand Down Expand Up @@ -283,7 +283,7 @@ func (c *CoordinatorHelper) Parse(

if fetchErr != nil {
c.verboseLog(reqerror, constructionParse, arg{argError, fetchErr})
return nil, nil, nil, fetchErr.Err
return nil, nil, nil, fmt.Errorf("/construction/parse call is failed: %w", fetchErr.Err)
}

c.verboseLog(response, constructionParse,
Expand Down Expand Up @@ -316,7 +316,7 @@ func (c *CoordinatorHelper) Combine(

if fetchErr != nil {
c.verboseLog(reqerror, constructionCombine, arg{argError, fetchErr})
return "", fetchErr.Err
return "", fmt.Errorf("/construction/combine call is failed: %w", fetchErr.Err)
}

c.verboseLog(response, constructionCombine, arg{argNetworkTransaction, res})
Expand All @@ -342,7 +342,7 @@ func (c *CoordinatorHelper) Hash(

if fetchErr != nil {
c.verboseLog(reqerror, constructionHash, arg{argError, fetchErr})
return nil, fetchErr.Err
return nil, fmt.Errorf("/construction/hash call is failed: %w", fetchErr.Err)
}

c.verboseLog(response, constructionHash, arg{argTransactionIdentifier, res})
Expand Down Expand Up @@ -400,10 +400,10 @@ func (c *CoordinatorHelper) Balance(
) (*types.Amount, error) {
headBlock, err := c.blockStorage.GetHeadBlockIdentifier(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get head block identifier: %w", err)
}
if headBlock == nil {
return nil, errors.New("no blocks synced")
return nil, cliErrs.ErrNoHeadBlock
}

return c.balanceStorage.GetOrSetBalanceTransactional(
Expand All @@ -429,7 +429,7 @@ func (c *CoordinatorHelper) Coins(
accountIdentifier,
)
if err != nil {
return nil, fmt.Errorf("%w: unable to get coins", err)
return nil, fmt.Errorf("unable to get coins for account %s: %w", types.PrintStruct(accountIdentifier), err)
}

coinsToReturn := []*types.Coin{}
Expand Down
16 changes: 8 additions & 8 deletions pkg/processor/reconciler_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (h *ReconcilerHandler) UpdateCounts(ctx context.Context) error {
}

if _, err := h.counterStorage.Update(ctx, key, big.NewInt(count)); err != nil {
return err
return fmt.Errorf("failed to key %s in counter storage: %w", key, err)
}
}

Expand Down Expand Up @@ -145,7 +145,7 @@ func (h *ReconcilerHandler) ReconciliationFailed(
block,
)
if err != nil {
return err
return fmt.Errorf("failed to log reconciliation checks when reconciliation is failed: %w", err)
}

if h.haltOnReconciliationError {
Expand All @@ -161,28 +161,28 @@ func (h *ReconcilerHandler) ReconciliationFailed(
}
h.InactiveFailureBlock = block
return fmt.Errorf(
"%w: inactive reconciliation error for %s at %d (computed: %s%s, live: %s%s)",
cliErrs.ErrReconciliationFailure,
"inactive reconciliation error for account address %s at block index %d (computed: %s%s, live: %s%s): %w",
account.Address,
block.Index,
computedBalance,
currency.Symbol,
liveBalance,
currency.Symbol,
cliErrs.ErrReconciliationFailure,
)
}

// If we halt on an active reconciliation error, store in the handler.
h.ActiveFailureBlock = block
return fmt.Errorf(
"%w: active reconciliation error for %s at %d (computed: %s%s, live: %s%s)",
cliErrs.ErrReconciliationFailure,
"active reconciliation error for account address %s at block index %d (computed: %s%s, live: %s%s): %w",
account.Address,
block.Index,
computedBalance,
currency.Symbol,
liveBalance,
currency.Symbol,
cliErrs.ErrReconciliationFailure,
)
}

Expand All @@ -209,7 +209,7 @@ func (h *ReconcilerHandler) ReconciliationExempt(
// we still mark the account as being reconciled because the balance was in the range
// specified by exemption.
if err := h.balanceStorage.Reconciled(ctx, account, currency, block); err != nil {
return fmt.Errorf("%w: unable to store updated reconciliation", err)
return fmt.Errorf("unable to store updated reconciliation currency %s of account %s at block %s: %w", types.PrintStruct(currency), types.PrintStruct(account), types.PrintStruct(block), err)
}

return nil
Expand Down Expand Up @@ -250,7 +250,7 @@ func (h *ReconcilerHandler) ReconciliationSucceeded(
h.counterLock.Unlock()

if err := h.balanceStorage.Reconciled(ctx, account, currency, block); err != nil {
return fmt.Errorf("%w: unable to store updated reconciliation", err)
return fmt.Errorf("unable to store updated reconciliation currency %s of account %s at block %s: %w", types.PrintStruct(currency), types.PrintStruct(account), types.PrintStruct(block), err)
}

return h.logger.ReconcileSuccessStream(
Expand Down
3 changes: 2 additions & 1 deletion pkg/processor/reconciler_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package processor

import (
"context"
"fmt"

"github.com/coinbase/rosetta-cli/configuration"

Expand Down Expand Up @@ -138,7 +139,7 @@ func (h *ReconcilerHelper) LiveBalance(
index,
)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to get current balance of currency %s of account %s: %w", types.PrintStruct(currency), types.PrintStruct(account), err)
}
return amt, block, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/results/construction_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *CheckConstructionResults) Output(path string) {
if len(path) > 0 {
writeErr := utils.SerializeAndWrite(path, c)
if writeErr != nil {
log.Printf("%s: unable to save results\n", writeErr.Error())
log.Printf("unable to save results: %s\n", writeErr.Error())
}
}
}
Expand Down
Loading

0 comments on commit f957616

Please sign in to comment.