Skip to content

Commit

Permalink
Merge branch 'master' into parser-options
Browse files Browse the repository at this point in the history
  • Loading branch information
lukescott committed Sep 14, 2016
2 parents 6c23101 + 783cfcb commit 470ac64
Show file tree
Hide file tree
Showing 6 changed files with 413 additions and 94 deletions.
61 changes: 52 additions & 9 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package cron

import (
"log"
"runtime"
"sort"
"time"
)
Expand All @@ -16,6 +18,8 @@ type Cron struct {
add chan *Entry
snapshot chan []*Entry
running bool
ErrorLog *log.Logger
location *time.Location
}

// Job is an interface for submitted cron jobs.
Expand Down Expand Up @@ -66,14 +70,21 @@ func (s byTime) Less(i, j int) bool {
return s[i].Next.Before(s[j].Next)
}

// New returns a new Cron job runner.
// New returns a new Cron job runner, in the Local time zone.
func New() *Cron {
return NewWithLocation(time.Now().Location())
}

// NewWithLocation returns a new Cron job runner.
func NewWithLocation(location *time.Location) *Cron {
return &Cron{
entries: nil,
add: make(chan *Entry),
stop: make(chan struct{}),
snapshot: make(chan []*Entry),
running: false,
ErrorLog: nil,
location: location,
}
}

Expand All @@ -87,7 +98,7 @@ func (c *Cron) AddFunc(spec string, cmd func()) error {
return c.AddJob(spec, FuncJob(cmd))
}

// AddFunc adds a Job to the Cron to be run on the given schedule.
// AddJob adds a Job to the Cron to be run on the given schedule.
func (c *Cron) AddJob(spec string, cmd Job) error {
schedule, err := Parse(spec)
if err != nil {
Expand Down Expand Up @@ -121,17 +132,37 @@ func (c *Cron) Entries() []*Entry {
return c.entrySnapshot()
}

// Start the cron scheduler in its own go-routine.
// Location gets the time zone location
func (c *Cron) Location() *time.Location {
return c.location
}

// Start the cron scheduler in its own go-routine, or no-op if already started.
func (c *Cron) Start() {
if c.running {
return
}
c.running = true
go c.run()
}

func (c *Cron) runWithRecovery(j Job) {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
c.logf("cron: panic running job: %v\n%s", r, buf)
}
}()
j.Run()
}

// Run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
// Figure out the next activation times for each entry.
now := time.Now().Local()
now := time.Now().In(c.location)
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
}
Expand All @@ -149,32 +180,44 @@ func (c *Cron) run() {
effective = c.entries[0].Next
}

timer := time.NewTimer(effective.Sub(now))
select {
case now = <-time.After(effective.Sub(now)):
case now = <-timer.C:
// Run every entry whose next time was this effective time.
for _, e := range c.entries {
if e.Next != effective {
break
}
go e.Job.Run()
go c.runWithRecovery(e.Job)
e.Prev = e.Next
e.Next = e.Schedule.Next(effective)
e.Next = e.Schedule.Next(now)
}
continue

case newEntry := <-c.add:
c.entries = append(c.entries, newEntry)
newEntry.Next = newEntry.Schedule.Next(now)
newEntry.Next = newEntry.Schedule.Next(time.Now().In(c.location))

case <-c.snapshot:
c.snapshot <- c.entrySnapshot()

case <-c.stop:
timer.Stop()
return
}

// 'now' should be updated after newEntry and snapshot cases.
now = time.Now().Local()
now = time.Now().In(c.location)
timer.Stop()
}
}

// Logs an error to stderr or to the configured error log
func (c *Cron) logf(format string, args ...interface{}) {
if c.ErrorLog != nil {
c.ErrorLog.Printf(format, args...)
} else {
log.Printf(format, args...)
}
}

Expand Down
75 changes: 75 additions & 0 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,38 @@ import (
// compensate for a few milliseconds of runtime.
const ONE_SECOND = 1*time.Second + 10*time.Millisecond

func TestFuncPanicRecovery(t *testing.T) {
cron := New()
cron.Start()
defer cron.Stop()
cron.AddFunc("* * * * * ?", func() { panic("YOLO") })

select {
case <-time.After(ONE_SECOND):
return
}
}

type DummyJob struct{}

func (d DummyJob) Run() {
panic("YOLO")
}

func TestJobPanicRecovery(t *testing.T) {
var job DummyJob

cron := New()
cron.Start()
defer cron.Stop()
cron.AddJob("* * * * * ?", job)

select {
case <-time.After(ONE_SECOND):
return
}
}

// Start and stop cron with no entries.
func TestNoEntries(t *testing.T) {
cron := New()
Expand Down Expand Up @@ -77,6 +109,22 @@ func TestAddWhileRunning(t *testing.T) {
}
}

// Test for #34. Adding a job after calling start results in multiple job invocations
func TestAddWhileRunningWithDelay(t *testing.T) {
cron := New()
cron.Start()
defer cron.Stop()
time.Sleep(5 * time.Second)
var calls = 0
cron.AddFunc("* * * * * *", func() { calls += 1 })

<-time.After(ONE_SECOND)
if calls != 1 {
fmt.Printf("called %d times, expected 1\n", calls)
t.Fail()
}
}

// Test timing with Entries.
func TestSnapshotEntries(t *testing.T) {
wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -189,6 +237,33 @@ func TestLocalTimezone(t *testing.T) {
}
}

// Test that the cron is run in the given time zone (as opposed to local).
func TestNonLocalTimezone(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)

loc, err := time.LoadLocation("Atlantic/Cape_Verde")
if err != nil {
fmt.Printf("Failed to load time zone Atlantic/Cape_Verde: %+v", err)
t.Fail()
}

now := time.Now().In(loc)
spec := fmt.Sprintf("%d %d %d %d %d ?",
now.Second()+1, now.Minute(), now.Hour(), now.Day(), now.Month())

cron := NewWithLocation(loc)
cron.AddFunc(spec, func() { wg.Done() })
cron.Start()
defer cron.Stop()

select {
case <-time.After(ONE_SECOND):
t.FailNow()
case <-wait(wg):
}
}

// Test that calling stop before start silently returns without
// blocking the stop channel.
func TestStopWithoutStart(t *testing.T) {
Expand Down
Loading

0 comments on commit 470ac64

Please sign in to comment.