Skip to content

Commit

Permalink
1. separate timer in timer queue processor into dedicated file, add UT (
Browse files Browse the repository at this point in the history
cadence-workflow#602)

* separate timer in timer queue processor into dedicated file, add UT
* increase history cache size from 256 to 512, note, this value was previously 1024
  • Loading branch information
wxing1292 authored Mar 16, 2018
1 parent edb556c commit 1d939e6
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 126 deletions.
2 changes: 1 addition & 1 deletion service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
return &Config{
NumberOfShards: numberOfShards,
HistoryCacheInitialSize: 128,
HistoryCacheMaxSize: 256,
HistoryCacheMaxSize: 512,
HistoryCacheTTL: time.Hour,
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: time.Minute,
Expand Down
122 changes: 122 additions & 0 deletions service/history/timerGate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"sync"
"time"
)

type (
// TimerGate interface
TimerGate interface {
// FireChan return the channel which will be fired when time is up
FireChan() <-chan struct{}
// FireAfter check will the timer get fired after a certain time
FireAfter(now time.Time) bool
// Update update the timer gate, return true if update is a success
// success means timer is idle or timer is set with a sooner time to fire
Update(nextTime time.Time) bool
// Close shutdown the timer
Close()
}

// TimerGateImpl is an timer implementation,
// which basically is an wrrapper of golang's timer and
// additional feature
TimerGateImpl struct {
// the channel which will be used to proxy the fired timer
fireChan chan struct{}
closeChan chan struct{}

// lock for timer and next wake up time
sync.Mutex
// the actual timer which will fire
timer *time.Timer
// variable indicating when the above timer will fire
nextWakeupTime time.Time
}
)

// NewTimerGate create a new timer gate instance
func NewTimerGate() TimerGate {
timer := &TimerGateImpl{
timer: time.NewTimer(0),
nextWakeupTime: time.Time{},
fireChan: make(chan struct{}),
closeChan: make(chan struct{}),
}

go func() {
defer close(timer.fireChan)
defer timer.timer.Stop()
loop:
for {
select {
case <-timer.timer.C:
// re-transmit on gateC
timer.fireChan <- struct{}{}

case <-timer.closeChan:
// closed; cleanup and quit
break loop
}
}
}()

return timer
}

// FireChan return the channel which will be fired when time is up
func (timerGate *TimerGateImpl) FireChan() <-chan struct{} {
return timerGate.fireChan
}

// FireAfter check will the timer get fired after a certain time
func (timerGate *TimerGateImpl) FireAfter(now time.Time) bool {
return timerGate.nextWakeupTime.After(now)
}

// Update update the timer gate, return true if update is a success
// success means timer is idle or timer is set with a sooner time to fire
func (timerGate *TimerGateImpl) Update(nextTime time.Time) bool {
now := time.Now()

timerGate.Lock()
defer timerGate.Unlock()
if !timerGate.FireAfter(now) || timerGate.nextWakeupTime.After(nextTime) {
// if timer will not fire or next wake time is after the "next"
// then we need to update the timer to fire
timerGate.nextWakeupTime = nextTime
// reset timer to fire when the next message should be made 'visible'
timerGate.timer.Reset(nextTime.Sub(now))

// Notifies caller that next notification is reset to fire at passed in 'next' visibility time
return true
}

return false
}

// Close shutdown the timer
func (timerGate *TimerGateImpl) Close() {
close(timerGate.closeChan)
}
114 changes: 114 additions & 0 deletions service/history/timerGate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"os"
"testing"
"time"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
)

type (
timerSuite struct {
suite.Suite
timerGate TimerGate
}
)

func TestTimerSuite(t *testing.T) {
s := new(timerSuite)
suite.Run(t, s)
}

func (s *timerSuite) SetupSuite() {
if testing.Verbose() {
log.SetOutput(os.Stdout)
}

}

func (s *timerSuite) TearDownSuite() {

}

func (s *timerSuite) SetupTest() {
s.timerGate = NewTimerGate()
}

func (s *timerSuite) TearDownTest() {
s.timerGate.Close()
}

func (s *timerSuite) TestTimerFire() {
now := time.Now()
timerDelay := now.Add(1 * time.Second)
deadlineDelay := now.Add(2 * time.Second)
s.timerGate.Update(timerDelay)

select {
case <-s.timerGate.FireChan():
case <-time.NewTimer(deadlineDelay.Sub(now)).C:
s.Fail("timer should fire before test deadline")
}
}

func (s *timerSuite) TestTimerFireAfterUpdate_Updated() {
now := time.Now()
timerDelay := now.Add(5 * time.Second)
updatedTimerDelay := now.Add(1 * time.Second)
deadlineDelay := now.Add(3 * time.Second)
s.timerGate.Update(timerDelay)
s.True(s.timerGate.Update(updatedTimerDelay))

select {
case <-s.timerGate.FireChan():
case <-time.NewTimer(deadlineDelay.Sub(now)).C:
s.Fail("timer should fire before test deadline")
}
}

func (s *timerSuite) TestTimerFireAfterUpdate_NotUpdated() {
now := time.Now()
timerDelay := now.Add(1 * time.Second)
updatedTimerDelay := now.Add(3 * time.Second)
deadlineDelay := now.Add(2 * time.Second)
s.timerGate.Update(timerDelay)
s.False(s.timerGate.Update(updatedTimerDelay))

select {
case <-s.timerGate.FireChan():
case <-time.NewTimer(deadlineDelay.Sub(now)).C:
s.Fail("timer should fire before test deadline")
}
}

func (s *timerSuite) TestTimerWillFire() {
now := time.Now()
timerDelay := now.Add(2 * time.Second)
timeBeforeTimer := now.Add(1 * time.Second)
timeAfterTimer := now.Add(3 * time.Second)
s.timerGate.Update(timerDelay)
s.True(s.timerGate.FireAfter(timeBeforeTimer))
s.False(s.timerGate.FireAfter(timeAfterTimer))
}
Loading

0 comments on commit 1d939e6

Please sign in to comment.