Skip to content

Commit

Permalink
Bug: aggregates should use chain_id for their where clause (ava-labs#173
Browse files Browse the repository at this point in the history
)
  • Loading branch information
tasinco authored Nov 20, 2020
1 parent a8ad846 commit 4e23513
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
15 changes: 13 additions & 2 deletions services/indexes/avax/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ func (r *Reader) Aggregate(ctx context.Context, params *params.AggregateParams)

var builder *dbr.SelectStmt

params.Version = 0

switch params.Version {
// new requests v=1 use the avm_asset_aggregation tables
case 1:
columns := []string{
"CAST(COALESCE(SUM(avm_asset_aggregation.transaction_volume),0) AS CHAR) as transaction_volume",
Expand All @@ -152,6 +153,10 @@ func (r *Reader) Aggregate(ctx context.Context, params *params.AggregateParams)
Where("avm_asset_aggregation.aggregate_ts >= ?", params.ListParams.StartTime).
Where("avm_asset_aggregation.aggregate_ts < ?", params.ListParams.EndTime)

if len(params.ChainIDs) != 0 {
builder.Where("avm_asset_aggregation.chain_id IN ?", params.ChainIDs)
}

if params.AssetID != nil {
builder.Where("avm_asset_aggregation.asset_id = ?", params.AssetID.String())
}
Expand Down Expand Up @@ -179,6 +184,10 @@ func (r *Reader) Aggregate(ctx context.Context, params *params.AggregateParams)
Where("avm_outputs.created_at >= ?", params.ListParams.StartTime).
Where("avm_outputs.created_at < ?", params.ListParams.EndTime)

if len(params.ChainIDs) != 0 {
builder.Where("avm_outputs.chain_id IN ?", params.ChainIDs)
}

if params.AssetID != nil {
builder.Where("avm_outputs.asset_id = ?", params.AssetID.String())
}
Expand Down Expand Up @@ -754,7 +763,7 @@ func (r *Reader) searchByShortID(ctx context.Context, id ids.ShortID) (*models.S
return &models.SearchResults{}, nil
}

func (r *Reader) dressAddresses(ctx context.Context, dbRunner dbr.SessionRunner, addrs []*models.AddressInfo, version int, chainIDs []string) error {
func (r *Reader) dressAddresses(ctx context.Context, dbRunner dbr.SessionRunner, addrs []*models.AddressInfo, _ int, chainIDs []string) error {
if len(addrs) == 0 {
return nil
}
Expand All @@ -775,6 +784,8 @@ func (r *Reader) dressAddresses(ctx context.Context, dbRunner dbr.SessionRunner,
models.AssetInfo
}

version := 0

switch version {
case 1:
builder := dbRunner.
Expand Down
10 changes: 6 additions & 4 deletions services/indexes/pvm/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ func (w *Writer) Consume(ctx context.Context, c services.Consumable) error {
job := w.conns.Stream().NewJob("index")
sess := w.conns.DB().NewSessionForEventReceiver(job)

// fire and forget..
// update the created_at on the state table if we have an earlier date in ctx.Time().
// which means we need to re-run aggregation calculations from this earlier date.
_, _ = models.UpdateAvmAssetAggregationLiveStateTimestamp(ctx, sess, time.Unix(c.Timestamp(), 0))
if false {
// fire and forget..
// update the created_at on the state table if we have an earlier date in ctx.Time().
// which means we need to re-run aggregation calculations from this earlier date.
_, _ = models.UpdateAvmAssetAggregationLiveStateTimestamp(ctx, sess, time.Unix(c.Timestamp(), 0))
}

// Create w tx
dbTx, err := sess.Begin()
Expand Down
4 changes: 3 additions & 1 deletion stream/indexer_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,9 @@ func (t *ProducerTasker) replaceAvmAggregateCount(avmAggregatesCount models.AvmA
func (t *ProducerTasker) Start() {
t.initMetrics()

go initRefreshAggregatesTick(t)
if false {
go initRefreshAggregatesTick(t)
}
}

func (t *ProducerTasker) initMetrics() {
Expand Down

0 comments on commit 4e23513

Please sign in to comment.