Skip to content

Commit

Permalink
add enq retries
Browse files Browse the repository at this point in the history
fix locking on enq mtc mnt
update square throttler
  • Loading branch information
1pkg committed Oct 11, 2020
1 parent ecbb3a0 commit 1cb58f3
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 80 deletions.
115 changes: 64 additions & 51 deletions enqueuers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,55 @@ type Enqueuer interface {
}

type enqrabbit struct {
lock sync.Mutex
memconnect Runnable
newrpub func([]byte) Runnable
connection *amqp.Connection
channel *amqp.Channel
queue string
exchange string
}

func NewEnqueuerRabbit(url string, queue string, cahce time.Duration) Enqueuer {
func NewEnqueuerRabbit(url string, queue string, retries uint64) Enqueuer {
exchange := fmt.Sprintf("gohalt_exchange_%s", uuid.NewV4())
enq := enqrabbit{queue: queue, exchange: exchange}
var lock sync.Mutex
enq.memconnect = cached(cahce, func(ctx context.Context) error {
lock.Lock()
defer lock.Unlock()
if err := enq.close(ctx); err != nil {
return err
}
return enq.connect(ctx, url)
enq := &enqrabbit{}
enq.memconnect = cached(0, func(ctx context.Context) error {
return enq.connect(ctx, url, queue, exchange)
})
enq.newrpub = func(message []byte) Runnable {
return retried(retries, func(ctx context.Context) error {
if err := enq.channel.Publish(
exchange,
queue,
false,
false,
amqp.Publishing{
DeliveryMode: 2,
AppId: "gohalt_enqueue",
MessageId: fmt.Sprintf("gohalt_enqueue_%s", uuid.NewV4()),
Timestamp: time.Now().UTC(),
Body: message,
},
); err != nil {
// on error refresh connection just in case
_ = enq.close(ctx)
_ = enq.connect(ctx, url, queue, exchange)
return err
}
return nil
})
}
return enq
}

func (enq enqrabbit) Enqueue(ctx context.Context, message []byte) error {
func (enq *enqrabbit) Enqueue(ctx context.Context, message []byte) error {
enq.lock.Lock()
defer enq.lock.Unlock()
if err := enq.memconnect(ctx); err != nil {
return err
}
return enq.channel.Publish(
enq.exchange,
enq.queue,
false,
false,
amqp.Publishing{
DeliveryMode: 2,
AppId: "gohalt_enqueue",
MessageId: fmt.Sprintf("gohalt_enqueue_%s", uuid.NewV4()),
Timestamp: time.Now().UTC(),
Body: message,
},
)
return enq.newrpub(message)(ctx)
}

func (enq enqrabbit) close(context.Context) error {
func (enq *enqrabbit) close(context.Context) error {
if enq.channel == nil || enq.connection == nil {
return nil
}
Expand All @@ -67,7 +74,7 @@ func (enq enqrabbit) close(context.Context) error {
return enq.connection.Close()
}

func (enq enqrabbit) connect(_ context.Context, url string) error {
func (enq *enqrabbit) connect(_ context.Context, url string, queue string, exchange string) error {
connection, err := amqp.Dial(url)
if err != nil {
return err
Expand All @@ -76,13 +83,13 @@ func (enq enqrabbit) connect(_ context.Context, url string) error {
if err != nil {
return err
}
if err := channel.ExchangeDeclare(enq.exchange, "direct", true, true, false, false, nil); err != nil {
if err := channel.ExchangeDeclare(exchange, "direct", true, true, false, false, nil); err != nil {
return err
}
if _, err := channel.QueueDeclare(enq.queue, true, false, false, false, nil); err != nil {
if _, err := channel.QueueDeclare(queue, true, false, false, false, nil); err != nil {
return err
}
if err := channel.QueueBind(enq.queue, enq.queue, enq.exchange, false, nil); err != nil {
if err := channel.QueueBind(queue, queue, exchange, false, nil); err != nil {
return err
}
enq.connection = connection
Expand All @@ -91,46 +98,52 @@ func (enq enqrabbit) connect(_ context.Context, url string) error {
}

type enqkafka struct {
lock sync.Mutex
memconnect Runnable
newrpub func([]byte) Runnable
connection *kafka.Conn
}

func NewEnqueuerKafka(net string, url string, topic string, cache time.Duration) Enqueuer {
enq := enqkafka{}
var lock sync.Mutex
enq.memconnect = cached(cache, func(ctx context.Context) error {
lock.Lock()
defer lock.Unlock()
if err := enq.close(ctx); err != nil {
return err
}
func NewEnqueuerKafka(net string, url string, topic string, retries uint64) Enqueuer {
enq := &enqkafka{}
enq.memconnect = cached(0, func(ctx context.Context) error {
return enq.connect(ctx, net, url, topic)
})
enq.newrpub = func(message []byte) Runnable {
return retried(retries, func(ctx context.Context) error {
if _, err := enq.connection.WriteMessages(kafka.Message{
Time: time.Now().UTC(),
Key: []byte(fmt.Sprintf("gohalt_enqueue_%s", uuid.NewV4())),
Value: message,
}); err != nil {
// on error refresh connection just in case
_ = enq.close(ctx)
_ = enq.connect(ctx, net, url, topic)
return err
}
return nil
})
}
return enq
}

func (enq enqkafka) Enqueue(ctx context.Context, message []byte) error {
func (enq *enqkafka) Enqueue(ctx context.Context, message []byte) error {
enq.lock.Lock()
defer enq.lock.Unlock()
if err := enq.memconnect(ctx); err != nil {
return err
}
if _, err := enq.connection.WriteMessages(kafka.Message{
Time: time.Now().UTC(),
Key: []byte(fmt.Sprintf("gohalt_enqueue_%s", uuid.NewV4())),
Value: message,
}); err != nil {
return err
}
return nil
return enq.newrpub(message)(ctx)
}

func (enq enqkafka) close(context.Context) error {
func (enq *enqkafka) close(context.Context) error {
if enq.connection == nil {
return nil
}
return enq.connection.Close()
}

func (enq enqkafka) connect(ctx context.Context, net string, url string, topic string) error {
func (enq *enqkafka) connect(ctx context.Context, net string, url string, topic string) error {
connection, err := kafka.DialLeader(ctx, net, url, topic, 0)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@ type Metric interface {
}

type mtcprometheus struct {
lock sync.Mutex
mempull Runnable
value bool
}

func NewMetricPrometheus(url string, query string, cache time.Duration, mstep time.Duration) Metric {
mtc := &mtcprometheus{}
var lock sync.Mutex
var api prometheus.API
mtc.mempull = cached(cache, func(ctx context.Context) (err error) {
lock.Lock()
defer lock.Unlock()
if api != nil {
return mtc.pull(ctx, api, cache, mstep, query)
}
Expand All @@ -40,13 +38,15 @@ func NewMetricPrometheus(url string, query string, cache time.Duration, mstep ti
}

func (mtc *mtcprometheus) Query(ctx context.Context) (bool, error) {
mtc.lock.Lock()
defer mtc.lock.Unlock()
if err := mtc.mempull(ctx); err != nil {
return mtc.value, err
}
return mtc.value, nil
}

func (mtc mtcprometheus) connect(_ context.Context, url string) (prometheus.API, error) {
func (mtc *mtcprometheus) connect(_ context.Context, url string) (prometheus.API, error) {
client, err := client.NewClient(
client.Config{
Address: url,
Expand Down
14 changes: 7 additions & 7 deletions monitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,29 @@ type Stats struct {
}

type mntsys struct {
lock sync.Mutex
memsync Runnable
stats Stats
}

func NewMonitorSystem(cache time.Duration) Monitor {
func NewMonitorSystem(cache time.Duration, tp time.Duration) Monitor {
mnt := &mntsys{}
var lock sync.Mutex
mnt.memsync = cached(cache, func(ctx context.Context) error {
lock.Lock()
defer lock.Unlock()
return mnt.sync(ctx)
return mnt.sync(ctx, tp)
})
return mnt
}

func (mnt *mntsys) Stats(ctx context.Context) (Stats, error) {
mnt.lock.Lock()
defer mnt.lock.Unlock()
if err := mnt.memsync(ctx); err != nil {
return mnt.stats, err
}
return mnt.stats, nil
}

func (mnt *mntsys) sync(context.Context) error {
func (mnt *mntsys) sync(_ context.Context, tp time.Duration) error {
var memstats runtime.MemStats
runtime.ReadMemStats(&memstats)
mnt.stats.MEMAlloc = memstats.Alloc
Expand All @@ -52,7 +52,7 @@ func (mnt *mntsys) sync(context.Context) error {
mnt.stats.CPUPause += p
}
mnt.stats.CPUPause /= 256
if percents, err := cpu.Percent(time.Millisecond, true); err != nil {
if percents, err := cpu.Percent(tp, true); err != nil {
for _, p := range percents {
mnt.stats.CPUUsage += p
}
Expand Down
41 changes: 34 additions & 7 deletions runnables.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package gohalt
import (
"context"
"sync"
"sync/atomic"
"time"
)

var DefaultRetriedDuration = 100 * time.Millisecond

type Runnable func(context.Context) error

func use(err error) Runnable {
Expand Down Expand Up @@ -42,29 +43,55 @@ func delayed(after time.Duration, run Runnable) Runnable {
func locked(run Runnable) Runnable {
var lock uint64
return func(ctx context.Context) error {
defer atomic.AddUint64(&lock, ^uint64(0))
if atomic.AddUint64(&lock, 1) > 1 {
defer atomicBDecr(&lock)
if atomicBIncr(&lock) > 1 {
return nil
}
return run(ctx)
}
}

func cached(cache time.Duration, run Runnable) Runnable {
var ts time.Time
var lock uint64
return func(ctx context.Context) error {
now := time.Now().UTC()
if now.Sub(ts) > cache {
ts := atomicGet(&lock)
now := uint64(time.Now().UTC().Unix())
// on first call run no matters what
if ts == 0 {
if err := run(ctx); err != nil {
return err
}
atomicSet(&lock, now)
return nil
}
// then use cached timestamp
if cache > 0 && time.Duration(now-ts) > cache {
if err := run(ctx); err != nil {
return err
}
ts = now
atomicSet(&lock, now)
return nil
}
return nil
}
}

func retried(retries uint64, run Runnable) Runnable {
thr := NewThrottlerSquare(DefaultRetriedDuration, 0, false)
return func(ctx context.Context) (err error) {
// no need neither to check error
// nor to call release counterpart
for i := uint64(0); i < retries; i++ {
_ = thr.Acquire(ctx)
err = run(ctx)
if err == nil {
return
}
}
return
}
}

func once(run Runnable) Runnable {
var once sync.Once
return func(ctx context.Context) (err error) {
Expand Down
3 changes: 2 additions & 1 deletion throttlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewThrottlerSquare(duration time.Duration, limit time.Duration, reset bool)
func (thr *tsquare) Acquire(context.Context) error {
current := atomicBIncr(&thr.current)
duration := thr.duration * time.Duration(current*current)
if duration > thr.limit {
if thr.limit > 0 && duration > thr.limit {
duration = thr.limit
if thr.reset {
atomicSet(&thr.current, 0)
Expand All @@ -87,6 +87,7 @@ func (thr *tsquare) Acquire(context.Context) error {
}

func (thr *tsquare) Release(context.Context) error {
atomicBDecr(&thr.current)
return nil
}

Expand Down
Loading

0 comments on commit 1cb58f3

Please sign in to comment.