Skip to content

Commit

Permalink
Algod : merge unlimited assets to master (algorand#3652)
Browse files Browse the repository at this point in the history
## Summary

This PR adds support for unlimited assets.

Done:
- [x] Update this title
- [x] Complete & Merge the implementation of the accounts prefetched during the validation (algorand#3666)
- [x] Create & Merge a spec update ( algorandfoundation/specs#62 (comment) )
- [x] Create & Merge a consensus upgrade (algorand#3674)

## Test Plan

Extend, update and add new unit tests.
  • Loading branch information
tsachiherman authored Mar 2, 2022
1 parent 89ffcca commit e1ae888
Show file tree
Hide file tree
Showing 129 changed files with 17,386 additions and 3,260 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ GOLDFLAGS := $(GOLDFLAGS_BASE) \
UNIT_TEST_SOURCES := $(sort $(shell GOPATH=$(GOPATH) && GO111MODULE=off && go list ./... | grep -v /go-algorand/test/ ))
ALGOD_API_PACKAGES := $(sort $(shell GOPATH=$(GOPATH) && GO111MODULE=off && cd daemon/algod/api; go list ./... ))

MSGP_GENERATE := ./protocol ./protocol/test ./crypto ./crypto/merklearray ./crypto/merklesignature ./crypto/compactcert ./data/basics ./data/transactions ./data/committee ./data/bookkeeping ./data/hashable ./agreement ./rpcs ./node ./ledger ./ledger/ledgercore ./compactcert ./data/account
MSGP_GENERATE := ./protocol ./protocol/test ./crypto ./crypto/merklearray ./crypto/merklesignature ./crypto/compactcert ./data/basics ./data/transactions ./data/committee ./data/bookkeeping ./data/hashable ./agreement ./rpcs ./node ./ledger ./ledger/ledgercore ./compactcert ./data/account ./daemon/algod/api/spec/v2

default: build

Expand Down
30 changes: 19 additions & 11 deletions catchup/catchpointService.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ type CatchpointCatchupNodeServices interface {

// CatchpointCatchupStats is used for querying and reporting the current state of the catchpoint catchup process
type CatchpointCatchupStats struct {
// NOTE(review): this span is a rendered diff with the +/- markers stripped —
// the field list appears twice: the pre-change set first, then the post-change
// set which adds TotalAccountHashes. Reconcile to a single field list (keep the
// second set) before compiling; as-is the duplicate fields will not build.
CatchpointLabel string
TotalAccounts uint64
ProcessedAccounts uint64
VerifiedAccounts uint64
TotalBlocks uint64
AcquiredBlocks uint64
VerifiedBlocks uint64
ProcessedBytes uint64
StartTime time.Time
CatchpointLabel string
TotalAccounts uint64
ProcessedAccounts uint64
VerifiedAccounts uint64
TotalBlocks uint64
AcquiredBlocks uint64
VerifiedBlocks uint64
ProcessedBytes uint64
// TotalAccountHashes is new in this change; it is populated from the ledger
// fetcher's progress (see updateLedgerFetcherProgress) and used as the
// denominator when estimating VerifiedAccounts during trie building.
TotalAccountHashes uint64
StartTime time.Time
}

// CatchpointCatchupService represents the catchpoint catchup service.
Expand Down Expand Up @@ -287,10 +288,14 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
return cs.abort(err)
}
peer := psp.Peer
start := time.Now()
err = ledgerFetcher.downloadLedger(cs.ctx, peer, round)
if err == nil {
cs.log.Infof("ledger downloaded in %d seconds", time.Since(start)/time.Second)
start = time.Now()
err = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedAccounts)
if err == nil {
cs.log.Infof("built merkle trie in %d seconds", time.Since(start)/time.Second)
break
}
// failed to build the merkle trie for the above catchpoint file.
Expand Down Expand Up @@ -321,10 +326,12 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
}

// updateVerifiedAccounts updates the user's statistics for the given verified accounts.
// NOTE(review): rendered diff — both the old signature (verifiedAccounts) and the
// new one (addedTrieHashes) are present below; keep only the second, and the
// guarded assignment, when reconciling.
func (cs *CatchpointCatchupService) updateVerifiedAccounts(verifiedAccounts uint64) {
func (cs *CatchpointCatchupService) updateVerifiedAccounts(addedTrieHashes uint64) {
// statsMu serializes access to cs.stats with concurrent stat readers.
cs.statsMu.Lock()
defer cs.statsMu.Unlock()
cs.stats.VerifiedAccounts = verifiedAccounts
// Scale added trie hashes into an estimated verified-account count; the
// TotalAccountHashes > 0 check avoids a divide-by-zero before the fetcher
// has reported totals.
if cs.stats.TotalAccountHashes > 0 {
cs.stats.VerifiedAccounts = cs.stats.TotalAccounts * addedTrieHashes / cs.stats.TotalAccountHashes
}
}

// processStageLastestBlockDownload is the third catchpoint catchup stage. It downloads the latest block and verifies it against the previously downloaded ledger.
Expand Down Expand Up @@ -701,6 +708,7 @@ func (cs *CatchpointCatchupService) updateLedgerFetcherProgress(fetcherStats *le
cs.stats.TotalAccounts = fetcherStats.TotalAccounts
cs.stats.ProcessedAccounts = fetcherStats.ProcessedAccounts
cs.stats.ProcessedBytes = fetcherStats.ProcessedBytes
cs.stats.TotalAccountHashes = fetcherStats.TotalAccountHashes
}

// GetStatistics returns a copy of the current catchpoint catchup statistics
Expand Down
18 changes: 18 additions & 0 deletions catchup/ledgerFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,25 @@ func (lf *ledgerFetcher) getPeerLedger(ctx context.Context, peer network.HTTPPee
defer watchdogReader.Close()
tarReader := tar.NewReader(watchdogReader)
var downloadProgress ledger.CatchpointCatchupAccessorProgress
var writeDuration time.Duration

printLogsFunc := func() {
lf.log.Infof(
"writing balances to disk took %d seconds, "+
"writing creatables to disk took %d seconds, "+
"writing hashes to disk took %d seconds, "+
"total duration is %d seconds",
downloadProgress.BalancesWriteDuration/time.Second,
downloadProgress.CreatablesWriteDuration/time.Second,
downloadProgress.HashesWriteDuration/time.Second,
writeDuration/time.Second)
}

for {
header, err := tarReader.Next()
if err != nil {
if err == io.EOF {
printLogsFunc()
return nil
}
return err
Expand All @@ -166,15 +181,18 @@ func (lf *ledgerFetcher) getPeerLedger(ctx context.Context, peer network.HTTPPee
return err
}
}
start := time.Now()
err = lf.processBalancesBlock(ctx, header.Name, balancesBlockBytes, &downloadProgress)
if err != nil {
return err
}
writeDuration += time.Since(start)
if lf.reporter != nil {
lf.reporter.updateLedgerFetcherProgress(&downloadProgress)
}
if err = watchdogReader.Reset(); err != nil {
if err == io.EOF {
printLogsFunc()
return nil
}
err = fmt.Errorf("getPeerLedger received the following error while reading the catchpoint file : %v", err)
Expand Down
122 changes: 80 additions & 42 deletions cmd/catchpointdump/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var excludedFields *cmdutil.CobraStringSliceValue = cmdutil.MakeCobraStringSlice
func init() {
fileCmd.Flags().StringVarP(&tarFile, "tar", "t", "", "Specify the tar file to process")
fileCmd.Flags().StringVarP(&outFileName, "output", "o", "", "Specify an outfile for the dump ( i.e. tracker.dump.txt )")
fileCmd.Flags().BoolVarP(&loadOnly, "load", "l", false, "Load only, do not dump")
fileCmd.Flags().VarP(excludedFields, "exclude-fields", "e", "List of fields to exclude from the dump: ["+excludedFields.AllowedString()+"]")
}

Expand All @@ -65,6 +66,7 @@ var fileCmd = &cobra.Command{
if err != nil {
reportErrorf("Unable to stat '%s' : %v", tarFile, err)
}

tarSize := stats.Size()
if tarSize == 0 {
reportErrorf("Empty file '%s' : %v", tarFile, err)
Expand All @@ -87,9 +89,11 @@ var fileCmd = &cobra.Command{
defer os.Remove("./ledger.block.sqlite")
defer os.Remove("./ledger.block.sqlite-shm")
defer os.Remove("./ledger.block.sqlite-wal")
defer os.Remove("./ledger.tracker.sqlite")
defer os.Remove("./ledger.tracker.sqlite-shm")
defer os.Remove("./ledger.tracker.sqlite-wal")
if !loadOnly {
defer os.Remove("./ledger.tracker.sqlite")
defer os.Remove("./ledger.tracker.sqlite-shm")
defer os.Remove("./ledger.tracker.sqlite-wal")
}
defer l.Close()

catchupAccessor := ledger.MakeCatchpointCatchupAccessor(l, logging.Base())
Expand All @@ -110,18 +114,20 @@ var fileCmd = &cobra.Command{
reportErrorf("Unable to load catchpoint file into in-memory database : %v", err)
}

outFile := os.Stdout
if outFileName != "" {
outFile, err = os.OpenFile(outFileName, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0755)
if err != nil {
reportErrorf("Unable to create file '%s' : %v", outFileName, err)
if !loadOnly {
outFile := os.Stdout
if outFileName != "" {
outFile, err = os.OpenFile(outFileName, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0755)
if err != nil {
reportErrorf("Unable to create file '%s' : %v", outFileName, err)
}
defer outFile.Close()
}
defer outFile.Close()
}

err = printAccountsDatabase("./ledger.tracker.sqlite", fileHeader, outFile, excludedFields.GetSlice())
if err != nil {
reportErrorf("Unable to print account database : %v", err)
err = printAccountsDatabase("./ledger.tracker.sqlite", fileHeader, outFile, excludedFields.GetSlice())
if err != nil {
reportErrorf("Unable to print account database : %v", err)
}
}
},
}
Expand Down Expand Up @@ -290,8 +296,10 @@ func printAccountsDatabase(databaseName string, fileHeader ledger.CatchpointFile
}

balancesTable := "accountbase"
resourcesTable := "resources"
if fileHeader.Version != 0 {
balancesTable = "catchpointbalances"
resourcesTable = "catchpointresources"
}

var rowsCount int64
Expand All @@ -300,49 +308,79 @@ func printAccountsDatabase(databaseName string, fileHeader ledger.CatchpointFile
return
}

rows, err := tx.Query(fmt.Sprintf("SELECT address, data FROM %s order by address", balancesTable))
if err != nil {
return
}
defer rows.Close()

for rows.Next() {
var addrbuf []byte
var buf []byte
err = rows.Scan(&addrbuf, &buf)
printer := func(addr basics.Address, data interface{}, progress uint64) (err error) {
jsonData, err := json.Marshal(data)
if err != nil {
return
return err
}

fmt.Fprintf(fileWriter, "%v : %s\n", addr, string(jsonData))

if time.Since(lastProgressUpdate) > 50*time.Millisecond && rowsCount > 0 {
lastProgressUpdate = time.Now()
printDumpingCatchpointProgressLine(int(float64(progress)*50.0/float64(rowsCount)), 50, int64(progress))
}
return nil
}

var data basics.AccountData
err = protocol.Decode(buf, &data)
if fileHeader.Version < ledger.CatchpointFileVersionV6 {
var rows *sql.Rows
rows, err = tx.Query(fmt.Sprintf("SELECT address, data FROM %s order by address", balancesTable))
if err != nil {
return
}
defer rows.Close()

for rows.Next() {
var addrbuf []byte
var buf []byte
err = rows.Scan(&addrbuf, &buf)
if err != nil {
return
}

var addr basics.Address
if len(addrbuf) != len(addr) {
err = fmt.Errorf("Account DB address length mismatch: %d != %d", len(addrbuf), len(addr))
return
var addr basics.Address
if len(addrbuf) != len(addr) {
err = fmt.Errorf("account DB address length mismatch: %d != %d", len(addrbuf), len(addr))
return
}
copy(addr[:], addrbuf)

var data basics.AccountData
err = protocol.Decode(buf, &data)
if err != nil {
return
}

err = printer(addr, data, progress)
if err != nil {
return
}

progress++
}
copy(addr[:], addrbuf)
jsonData, err := json.Marshal(data)
err = rows.Err()
} else {
acctCount := 0
acctCb := func(addr basics.Address, data basics.AccountData) {
err = printer(addr, data, progress)
if err != nil {
return
}
progress++
acctCount++
}
_, err = ledger.LoadAllFullAccounts(context.Background(), tx, balancesTable, resourcesTable, acctCb)
if err != nil {
return err
return
}

fmt.Fprintf(fileWriter, "%v : %s\n", addr, string(jsonData))

if time.Now().Sub(lastProgressUpdate) > 50*time.Millisecond && rowsCount > 0 {
lastProgressUpdate = time.Now()
printDumpingCatchpointProgressLine(int(float64(progress)*50.0/float64(rowsCount)), 50, int64(progress))
if acctCount != int(rowsCount) {
return fmt.Errorf("expected %d accounts but got only %d", rowsCount, acctCount)
}
progress++
}

err = rows.Err()
// increase the deadline warning to disable the warning message.
db.ResetTransactionWarnDeadline(ctx, tx, time.Now().Add(5*time.Second))
return nil
return err
})
}
44 changes: 24 additions & 20 deletions cmd/catchpointdump/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var networkName string
var round int
var relayAddress string
var singleCatchpoint bool
var downloadOnly bool
var loadOnly bool

const (
escapeCursorUp = string("\033[A") // Cursor Up
Expand All @@ -57,7 +57,7 @@ func init() {
netCmd.Flags().IntVarP(&round, "round", "r", 0, "Specify the round number ( i.e. 7700000 )")
netCmd.Flags().StringVarP(&relayAddress, "relay", "p", "", "Relay address to use ( i.e. r-ru.algorand-mainnet.network:4160 )")
netCmd.Flags().BoolVarP(&singleCatchpoint, "single", "s", true, "Download/process only from a single relay")
netCmd.Flags().BoolVarP(&downloadOnly, "download", "l", false, "Download only, do not process")
netCmd.Flags().BoolVarP(&loadOnly, "load", "l", false, "Load only, do not dump")
netCmd.Flags().VarP(excludedFields, "exclude-fields", "e", "List of fields to exclude from the dump: ["+excludedFields.AllowedString()+"]")
}

Expand Down Expand Up @@ -97,9 +97,9 @@ var netCmd = &cobra.Command{
},
}},
}
err = makeFileDump(addr, tarName, genesisInitState)
err = loadAndDump(addr, tarName, genesisInitState)
if err != nil {
reportInfof("failed to make a dump from tar file for '%s' : %v", addr, err)
reportInfof("failed to load/dump from tar file for '%s' : %v", addr, err)
continue
}
// clear possible errors from previous run: at this point we've succeeded
Expand Down Expand Up @@ -253,24 +253,26 @@ func downloadCatchpoint(addr string, round int) (tarName string, err error) {
}
}

func makeFileDump(addr string, tarFile string, genesisInitState ledgercore.InitState) error {
deleteLedgerFiles := func() {
func loadAndDump(addr string, tarFile string, genesisInitState ledgercore.InitState) error {
deleteLedgerFiles := func(deleteTracker bool) {
os.Remove("./ledger.block.sqlite")
os.Remove("./ledger.block.sqlite-shm")
os.Remove("./ledger.block.sqlite-wal")
os.Remove("./ledger.tracker.sqlite")
os.Remove("./ledger.tracker.sqlite-shm")
os.Remove("./ledger.tracker.sqlite-wal")
if deleteTracker {
os.Remove("./ledger.tracker.sqlite")
os.Remove("./ledger.tracker.sqlite-shm")
os.Remove("./ledger.tracker.sqlite-wal")
}
}
// delete current ledger files.
deleteLedgerFiles()
deleteLedgerFiles(true)
cfg := config.GetDefaultLocal()
l, err := ledger.OpenLedger(logging.Base(), "./ledger", false, genesisInitState, cfg)
if err != nil {
reportErrorf("Unable to open ledger : %v", err)
}

defer deleteLedgerFiles()
defer deleteLedgerFiles(!loadOnly)
defer l.Close()

catchupAccessor := ledger.MakeCatchpointCatchupAccessor(l, logging.Base())
Expand All @@ -297,15 +299,17 @@ func makeFileDump(addr string, tarFile string, genesisInitState ledgercore.InitS
reportErrorf("Unable to load catchpoint file into in-memory database : %v", err)
}

dirName := "./" + strings.Split(networkName, ".")[0] + "/" + strings.Split(addr, ".")[0]
outFile, err := os.OpenFile(dirName+"/"+strconv.FormatUint(uint64(round), 10)+".dump", os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0755)
if err != nil {
return err
}
err = printAccountsDatabase("./ledger.tracker.sqlite", fileHeader, outFile, excludedFields.GetSlice())
if err != nil {
return err
if !loadOnly {
dirName := "./" + strings.Split(networkName, ".")[0] + "/" + strings.Split(addr, ".")[0]
outFile, err := os.OpenFile(dirName+"/"+strconv.FormatUint(uint64(round), 10)+".dump", os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0755)
if err != nil {
return err
}
defer outFile.Close()
err = printAccountsDatabase("./ledger.tracker.sqlite", fileHeader, outFile, excludedFields.GetSlice())
if err != nil {
return err
}
}
outFile.Close()
return nil
}
Loading

0 comments on commit e1ae888

Please sign in to comment.