Skip to content

Commit

Permalink
Allow configuring start ID and number of messages
Browse files Browse the repository at this point in the history
  • Loading branch information
xperimental committed Dec 20, 2024
1 parent 61d88cf commit 68570e0
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 20 deletions.
4 changes: 3 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ type ServerConfig struct {
}

type SourceConfig struct {
LogsPerSecond float64 `yaml:"logsPerSecond"`
StartID uint64 `yaml:"startID"`
LogsPerSecond float64 `yaml:"logsPerSecond"`
NumberOfMessages uint64 `yaml:"numberOfMessages"`
}

type SinkType string
Expand Down
2 changes: 1 addition & 1 deletion internal/sink/lokiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *lokiClientSink) receiveMessages(ctx context.Context) error {
continue
}

msgId, err := strconv.ParseInt(idMatch[3:], 10, 64)
msgId, err := strconv.ParseUint(idMatch[3:], 10, 64)
if err != nil {
s.log.Errorf("Error parsing message id: %s", err)
continue
Expand Down
21 changes: 19 additions & 2 deletions internal/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type source struct {
log logrus.FieldLogger
storage *storage.Storage
output io.Writer

messageCount uint64
messageID uint64
}

func New(cfg config.SourceConfig, log *logrus.Logger, storage *storage.Storage) component.Component {
Expand All @@ -45,23 +48,37 @@ func (s *source) Start(ctx context.Context, wg *sync.WaitGroup, errCh chan<- err
return
}

s.messageID = s.cfg.StartID

interval := time.Duration(float64(time.Second) / s.cfg.LogsPerSecond)
s.log.Debugf("Source interval: %v", interval)
ticker := time.NewTicker(interval)
defer ticker.Stop()

s.log.Debug("Started source.")
loop:
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
msg := s.storage.Create()
case ts := <-ticker.C:
msg := s.storage.Create(s.nextID(), ts)

if _, err := s.output.Write([]byte(msg)); err != nil {
errCh <- fmt.Errorf("error writing to output: %w", err)
}

s.messageCount++
if s.cfg.NumberOfMessages > 0 && s.messageCount >= s.cfg.NumberOfMessages {
s.log.Debugf("Reached number of messages limit: %v", s.cfg.NumberOfMessages)
break loop
}
}
}
}()
}

func (s *source) nextID() uint64 {
s.messageID++
return s.messageID
}
34 changes: 18 additions & 16 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,27 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

type StoreOp func(messages map[int64]message) error
type StoreOp func(messages map[uint64]message) error

type Storage struct {
log logrus.FieldLogger
clock func() time.Time
startupTime time.Time

ops chan StoreOp
messages map[int64]message
nextID atomic.Int64
messages map[uint64]message
metricCount prometheus.Counter
metricDelay prometheus.Histogram
}

type message struct {
ID int64
ID uint64
Timestamp time.Time
Seen bool
SeenTimestamp time.Time
Expand All @@ -42,8 +40,7 @@ func New(log logrus.FieldLogger, clock func() time.Time, registry prometheus.Reg
clock: clock,
startupTime: clock(),
ops: make(chan StoreOp, 1),
messages: map[int64]message{},
nextID: atomic.Int64{},
messages: map[uint64]message{},
metricCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "roundtrip_storage_messages_produced_total",
Help: "Total number of messages produced by storage",
Expand Down Expand Up @@ -90,13 +87,13 @@ func (s *Storage) Startup() time.Time {
return s.startupTime
}

func (s *Storage) Create() string {
func (s *Storage) Create(id uint64, time time.Time) string {
msg := message{
ID: s.nextID.Add(1),
Timestamp: s.clock(),
ID: id,
Timestamp: time,
}

s.ops <- func(messages map[int64]message) error {
s.ops <- func(messages map[uint64]message) error {
messages[msg.ID] = msg
return nil
}
Expand All @@ -107,17 +104,22 @@ func (s *Storage) Create() string {

func (s *Storage) Count() int {
resCh := make(chan int, 1)
s.ops <- func(messages map[int64]message) error {
s.ops <- func(messages map[uint64]message) error {
resCh <- len(messages)
return nil
}

return <-resCh
}

func (s *Storage) Seen(id int64, t time.Time) {
s.ops <- func(messages map[int64]message) error {
msg := s.messages[id]
func (s *Storage) Seen(id uint64, t time.Time) {
s.ops <- func(messages map[uint64]message) error {
msg, ok := s.messages[id]
if !ok {
s.log.Warnf("Found unknown message with ID %v", id)
return nil
}

msg.Seen = true
msg.SeenTimestamp = t
s.messages[id] = msg
Expand All @@ -131,7 +133,7 @@ func (s *Storage) Seen(id int64, t time.Time) {
}

func (s *Storage) ResetSeen() {
s.ops <- func(messages map[int64]message) error {
s.ops <- func(messages map[uint64]message) error {
for id := range s.messages {
msg := s.messages[id]
msg.Seen = false
Expand Down

0 comments on commit 68570e0

Please sign in to comment.