Skip to content

Commit

Permalink
feat: add ChildParallelCounter
Browse files Browse the repository at this point in the history
  • Loading branch information
Laisky committed Aug 27, 2019
1 parent 9a4a488 commit c3a2a13
Show file tree
Hide file tree
Showing 4 changed files with 462 additions and 2 deletions.
3 changes: 2 additions & 1 deletion configserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,6 @@ func TestConfigSrv(t *testing.T) {
}

func init() {
utils.SetupLogger("debug")
// utils.SetupLogger("debug")
utils.SetupLogger("info")
}
107 changes: 107 additions & 0 deletions counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,110 @@ func (c *Uint32Counter) Count() uint32 {
func (c *Uint32Counter) CountN(n uint32) uint32 {
return atomic.AddUint32(&c.n, n)
}

// ---------------------------------------------------

const defaultQuoteStep = 1000

type ParallelCounter struct {
sync.Mutex
n,
quoteStep,
rotatePoint int64
}

type ChildParallelCounter struct {
sync.Mutex
p *ParallelCounter
n, maxN int64
}

func NewParallelCounter(quoteStep, rotatePoint int64) (*ParallelCounter, error) {
Logger.Debug("NewParallelCounter", zap.Int64("quoteStep", quoteStep), zap.Int64("rotatePoint", rotatePoint))
if quoteStep <= 0 {
quoteStep = defaultQuoteStep
}
if rotatePoint <= 0 || quoteStep >= rotatePoint {
return nil, fmt.Errorf("rotate should greater than quoteStep and 0")
}

return &ParallelCounter{
n: 0,
quoteStep: quoteStep,
rotatePoint: rotatePoint,
}, nil
}

func (c *ParallelCounter) GetQuote(step int64) (from, to int64) {
if step <= 0 {
step = c.quoteStep
}
if c.rotatePoint > 0 && step > c.rotatePoint {
step = step % c.rotatePoint
}

c.Lock()
defer c.Unlock()

from = atomic.LoadInt64(&c.n)
to = atomic.AddInt64(&c.n, step) - 1
if c.rotatePoint > 0 && to > c.rotatePoint { // need rotate
from = 0
to = step
}

Logger.Debug("get quote",
zap.Int64("step", step),
zap.Int64("from", from),
zap.Int64("to", to))
return
}

func (c *ParallelCounter) GetChild() *ChildParallelCounter {
cc := &ChildParallelCounter{
p: c,
}
cc.n, cc.maxN = c.GetQuote(c.quoteStep)
return cc
}

func (c *ChildParallelCounter) Get() int64 {
return atomic.LoadInt64(&c.n)
}

func (c *ChildParallelCounter) Count() (r int64) {
r = atomic.AddInt64(&c.n, 1)
if r > atomic.LoadInt64(&c.maxN) {
c.Lock()
defer c.Unlock()
if r > c.p.rotatePoint {
r = r % c.p.rotatePoint
}
var (
cn = atomic.LoadInt64(&c.n)
cmax = atomic.LoadInt64(&c.maxN)
)
if r < cmax && r >= cn { // already get new quote
cn = atomic.AddInt64(&c.n, 1)
r = cn
}

for r > cmax { // double check
cn, cmax = c.p.GetQuote(0)
r = cn
}

atomic.StoreInt64(&c.n, cn)
atomic.StoreInt64(&c.maxN, cmax)
}

return r
}

func (c *ChildParallelCounter) CountN(n int64) (r int64) {
for i := int64(0); i < n-1; i++ {
c.Count()
}

return c.Count()
}
Loading

0 comments on commit c3a2a13

Please sign in to comment.