Skip to content

Commit

Permalink
chore: add stats to dedup module (rudderlabs#5190)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 authored Oct 14, 2024
1 parent 519f3c6 commit f305282
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 29 deletions.
15 changes: 5 additions & 10 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,21 @@ import (
"testing"
"time"

"github.com/google/uuid"
jsoniter "github.com/json-iterator/go"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-server/enterprise/reporting"

jsoniter "github.com/json-iterator/go"

"github.com/tidwall/sjson"

"go.uber.org/mock/gomock"

"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/admin"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/reporting"
"github.com/rudderlabs/rudder-server/jobsdb"
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
mocksJobsDB "github.com/rudderlabs/rudder-server/mocks/jobsdb"
Expand Down
7 changes: 6 additions & 1 deletion services/dedup/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package badger

import (
"context"
"errors"
"fmt"
"strconv"
"sync"
Expand Down Expand Up @@ -77,6 +78,8 @@ func NewBadgerDB(conf *config.Config, stats stats.Stats, path string) *Dedup {
}

func (d *BadgerDB) Get(key string) (int64, bool, error) {
defer d.stats.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "badger"}).RecordDuration()()

var payloadSize int64
var found bool
err := d.badgerDB.View(func(txn *badger.Txn) error {
Expand All @@ -90,13 +93,15 @@ func (d *BadgerDB) Get(key string) (int64, bool, error) {
}
return nil
})
if err != nil && err != badger.ErrKeyNotFound {
if err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
return 0, false, err
}
return payloadSize, found, nil
}

func (d *BadgerDB) Set(kvs []types.KeyValue) error {
defer d.stats.NewTaggedStat("dedup_commit_duration_seconds", stats.TimerType, stats.Tags{"mode": "badger"}).RecordDuration()()

txn := d.badgerDB.NewTransaction(true)
for _, message := range kvs {
value := strconv.FormatInt(message.Value, 10)
Expand Down
57 changes: 57 additions & 0 deletions services/dedup/dedup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,60 @@ func Benchmark_Dedup(b *testing.B) {

b.Log("db size:", string(out))
}

// Sample results (sub-benchmark name-GOMAXPROCS, iterations, ns/op):
// Benchmark_DedupModes/MirrorBadger-12 1072 1101878 ns/op
// Benchmark_DedupModes/MirrorScylla-12 566 1986533 ns/op
// Benchmark_DedupModes/Scylla-12 990 1525086 ns/op
// Benchmark_DedupModes/Badger-12 108246 9981 ns/op

// Benchmark_DedupModes benchmarks one Get followed by one Commit of a fresh
// UUID key for every Dedup.Mode value listed in testCases.
//
// A single Scylla resource is set up once and shared by all sub-benchmarks.
// Each mode gets its own temp directory (exported as RUDDER_TMPDIR), its own
// config and its own dedup instance, which is closed after the sub-benchmark
// finishes.
func Benchmark_DedupModes(b *testing.B) {
	testCases := []struct {
		name string
	}{
		{name: "Badger"},
		{name: "Scylla"},
		{name: "MirrorScylla"},
		{name: "MirrorBadger"},
		// NOTE(review): "Random" is not one of the modes listed in the sample
		// results; presumably it exercises dedup.New's handling of an unknown
		// mode — confirm against dedup.New.
		{name: "Random"},
	}

	pool, err := dockertest.NewPool("")
	require.NoError(b, err)

	keySpace := strings.ToUpper(rand.String(5))
	table := rand.String(5)
	resource, err := scylla.Setup(pool, b, scylla.WithKeyspace(keySpace))
	require.NoError(b, err)

	for _, tc := range testCases {
		// Re-initialise package-level config/logger state before building
		// this mode's configuration.
		config.Reset()
		logger.Reset()
		misc.Init()

		dbPath := b.TempDir()
		conf := config.New()
		conf.Set("Scylla.Hosts", resource.URL)
		conf.Set("Scylla.Keyspace", keySpace)
		conf.Set("Scylla.TableName", table)
		b.Setenv("RUDDER_TMPDIR", dbPath)
		conf.Set("Dedup.Mode", tc.name)

		d, err := dedup.New(conf, stats.Default)
		require.NoError(b, err)

		b.Run(tc.name, func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				key := uuid.New().String()
				// Keep err local to the closure instead of writing to the
				// variable captured from the enclosing loop body.
				_, _, err := d.Get(types.KeyValue{Key: key, Value: int64(i + 1), WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"})
				require.NoError(b, err)
				require.NoError(b, d.Commit([]string{key}))
			}
		})
		d.Close()
	}
}
4 changes: 4 additions & 0 deletions services/dedup/mirrorBadger/mirrorBadger.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (mb *MirrorBadger) Close() {
}

func (mb *MirrorBadger) Get(kv types.KeyValue) (bool, int64, error) {
defer mb.stat.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "mirror_badger"}).RecordDuration()()

_, _, err := mb.scylla.Get(kv)
if err != nil {
mb.stat.NewTaggedStat("dedup_mirror_badger_get_error", stats.CountType, stats.Tags{}).Increment()
Expand All @@ -41,6 +43,8 @@ func (mb *MirrorBadger) Get(kv types.KeyValue) (bool, int64, error) {
}

// Commit commits keys to Scylla and then to Badger, recording the total time
// under dedup_commit_duration_seconds with mode=mirror_badger.
//
// Only the Badger commit's error is returned; the Scylla commit's error is
// discarded.
func (mb *MirrorBadger) Commit(keys []string) (err error) {
	timer := mb.stat.NewTaggedStat("dedup_commit_duration_seconds", stats.TimerType, stats.Tags{"mode": "mirror_badger"})
	stop := timer.RecordDuration()
	defer stop()

	// The mirrored Scylla write never fails the call: its error is dropped here.
	_ = mb.scylla.Commit(keys)

	err = mb.badger.Commit(keys)
	return err
}
4 changes: 4 additions & 0 deletions services/dedup/mirrorScylla/mirrorScylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (ms *MirrorScylla) Close() {
}

func (ms *MirrorScylla) Get(kv types.KeyValue) (bool, int64, error) {
defer ms.stat.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "mirror_scylla"}).RecordDuration()()

_, _, err := ms.badger.Get(kv)
if err != nil {
ms.stat.NewTaggedStat("dedup_mirror_scylla_get_error", stats.CountType, stats.Tags{}).Increment()
Expand All @@ -41,6 +43,8 @@ func (ms *MirrorScylla) Get(kv types.KeyValue) (bool, int64, error) {
}

// Commit commits keys to Badger and then to Scylla, recording the total time
// under dedup_commit_duration_seconds with mode=mirror_scylla.
//
// Only the Scylla commit's error is returned; the Badger commit's error is
// discarded.
func (ms *MirrorScylla) Commit(keys []string) (err error) {
	timer := ms.stat.NewTaggedStat("dedup_commit_duration_seconds", stats.TimerType, stats.Tags{"mode": "mirror_scylla"})
	stop := timer.RecordDuration()
	defer stop()

	// The mirrored Badger write never fails the call: its error is dropped here.
	_ = ms.badger.Commit(keys)

	err = ms.scylla.Commit(keys)
	return err
}
6 changes: 5 additions & 1 deletion services/dedup/scylla/scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func (d *ScyllaDB) Close() {
}

func (d *ScyllaDB) Get(kv types.KeyValue) (bool, int64, error) {
// Create the table if it doesn't exist
defer d.stat.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "scylla"}).RecordDuration()()

var err error
d.cacheMu.Lock()
defer d.cacheMu.Unlock()
Expand All @@ -59,6 +60,8 @@ func (d *ScyllaDB) Get(kv types.KeyValue) (bool, int64, error) {
}

func (d *ScyllaDB) Commit(keys []string) error {
defer d.stat.NewTaggedStat("dedup_commit_duration_seconds", stats.TimerType, stats.Tags{"mode": "scylla"}).RecordDuration()()

d.cacheMu.Lock()
kvs := make([]types.KeyValue, len(keys))
for i, key := range keys {
Expand Down Expand Up @@ -94,6 +97,7 @@ func (d *ScyllaDB) Commit(keys []string) error {
func New(conf *config.Config, stats stats.Stats) (*ScyllaDB, error) {
cluster := gocql.NewCluster(conf.GetReloadableStringSliceVar([]string{"localhost:9042"}, "Scylla.Hosts").Load()...)
cluster.Consistency = gocql.Quorum
cluster.NumConns = conf.GetInt("Scylla.NumConns", 2)
cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{
NumRetries: conf.GetInt("Scylla.NumRetries", 3),
Min: conf.GetDuration("Scylla.MinRetry", 100, time.Millisecond),
Expand Down
14 changes: 7 additions & 7 deletions warehouse/archive/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type uploadRecord struct {
endStagingFileId int64
startLoadFileID int64
endLoadFileID int64
uploadMetdata json.RawMessage
uploadMetadata json.RawMessage
workspaceID string
}

Expand Down Expand Up @@ -101,7 +101,7 @@ func New(
}

func (a *Archiver) backupRecords(ctx context.Context, args backupRecordsArgs) (backupLocation string, err error) {
a.log.Infof("[Archiver]: Starting backupRecords for uploadId: %s, sourceId: %s, destinationId: %s, tableName: %s,",
a.log.Infof("[Archiver]: Starting backupRecords for uploadId: %d, sourceId: %s, destinationId: %s, tableName: %s,",
args.uploadID, args.sourceID, args.destID, args.tableName,
)

Expand Down Expand Up @@ -164,7 +164,7 @@ func (a *Archiver) backupRecords(ctx context.Context, args backupRecordsArgs) (b
}

backupLocation, err = tableJSONArchiver.Do()
a.log.Infof(`[Archiver]: Completed backupRecords for uploadId: %s, sourceId: %s, destinationId: %s, tableName: %s,`,
a.log.Infof(`[Archiver]: Completed backupRecords for uploadId: %d, sourceId: %s, destinationId: %s, tableName: %s,`,
args.uploadID, args.sourceID, args.destID, args.tableName,
)

Expand Down Expand Up @@ -311,7 +311,7 @@ func (a *Archiver) archiveUploads(ctx context.Context, maxArchiveLimit int) erro
&u.endStagingFileId,
&u.startLoadFileID,
&u.endLoadFileID,
&u.uploadMetdata,
&u.uploadMetadata,
&u.workspaceID,
)
if err != nil {
Expand Down Expand Up @@ -372,7 +372,7 @@ func (a *Archiver) archiveUploads(ctx context.Context, maxArchiveLimit int) erro
continue
}

hasUsedRudderStorage := a.usedRudderStorage(u.uploadMetdata)
hasUsedRudderStorage := a.usedRudderStorage(u.uploadMetadata)

// delete load file records
if err := a.deleteLoadFileRecords(ctx, txn, stagingFileIDs, hasUsedRudderStorage); err != nil {
Expand All @@ -383,14 +383,14 @@ func (a *Archiver) archiveUploads(ctx context.Context, maxArchiveLimit int) erro
}

// update upload metadata
u.uploadMetdata, _ = sjson.SetBytes(u.uploadMetdata, "archivedStagingAndLoadFiles", true)
u.uploadMetadata, _ = sjson.SetBytes(u.uploadMetadata, "archivedStagingAndLoadFiles", true)
stmt := fmt.Sprintf(`
UPDATE %s
SET metadata = $1
WHERE id = $2;`,
pq.QuoteIdentifier(warehouseutils.WarehouseUploadsTable),
)
_, err = txn.ExecContext(ctx, stmt, u.uploadMetdata, u.uploadID)
_, err = txn.ExecContext(ctx, stmt, u.uploadMetadata, u.uploadID)
if err != nil {
a.log.Errorf(`[Archiver]: Error running txn while archiving upload files. Query: %s Error: %v`, stmt, err)
_ = txn.Rollback()
Expand Down
16 changes: 6 additions & 10 deletions warehouse/integrations/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,17 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/warehouse/integrations/tunnelling"

"github.com/rudderlabs/rudder-go-kit/stats"

sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader"
"github.com/rudderlabs/rudder-server/warehouse/logfield"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/client"
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/integrations/tunnelling"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader"
"github.com/rudderlabs/rudder-server/warehouse/logfield"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand Down

0 comments on commit f305282

Please sign in to comment.