Skip to content

Commit

Permalink
executor api for async operators
Browse files Browse the repository at this point in the history
  • Loading branch information
jacokoo committed Jan 29, 2019
1 parent e353472 commit 0465559
Show file tree
Hide file tree
Showing 6 changed files with 492 additions and 0 deletions.
60 changes: 60 additions & 0 deletions executor/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package executor

import (
"sync"
)

// Disposer dispose something
type Disposer interface {
Dispose()
}

type funcc struct {
fn interface{}
}

type eventSource struct {
listeners []*funcc
lock *sync.Mutex
}

type listenerDisposer struct {
es *eventSource
fn *funcc
}

func (ld *listenerDisposer) Dispose() {
ld.es.off(ld.fn)
}

func (e *eventSource) on(fn interface{}) Disposer {
e.lock.Lock()
defer e.lock.Unlock()
fc := &funcc{fn}
e.listeners = append(e.listeners, fc)
return &listenerDisposer{e, fc}
}

func (e *eventSource) off(fn *funcc) {
e.lock.Lock()
defer e.lock.Unlock()

ls := make([]*funcc, 0)
for _, v := range e.listeners {
if v != fn {
ls = append(ls, v)
}
}
e.listeners = ls
}

func (e *eventSource) fire(fn func(fn interface{})) {
ls := e.listeners[:]
for _, v := range ls {
fn(v.fn)
}
}

func newEventSource() *eventSource {
return &eventSource{nil, new(sync.Mutex)}
}
122 changes: 122 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package executor

import (
"errors"
"sync"
)

// Executor task executor
type Executor interface {
Submit(string, TaskFunc) (Future, error)
Running() []RunningTask
Close() error
}

type item struct {
task *task
state *taskState
}

func (i *item) Name() string {
return i.task.name
}

func (i *item) Future() Future {
return i.state.fu
}

type limittedExecutor struct {
max int
errorWhenFull bool
ch chan *item
quit chan bool
running []*item
ended bool
lock *sync.RWMutex
}

func (e *limittedExecutor) worker(i int) {
for {
select {
case item := <-e.ch:
if !item.state.IsCancelled() {
e.running[i] = item
data, err := item.task.Do(item.state)
item.state.fu.set(data, err)
e.running[i] = nil
}
case <-e.quit:
return
}
}
}

func (e *limittedExecutor) init() {
for i := 0; i < e.max; i++ {
go e.worker(i)
}
}

// Fixed create a limited executor
func Fixed(max uint, errorWhenFull bool) Executor {
e := &limittedExecutor{
int(max), errorWhenFull,
make(chan *item), make(chan bool), make([]*item, int(max)),
false, new(sync.RWMutex),
}
e.init()
return e
}

func (e *limittedExecutor) Submit(name string, fn TaskFunc) (Future, error) {
e.lock.RLock()
if e.ended {
return nil, errors.New("Executor is already closed")
}
e.lock.RUnlock()

ts := newTaskState()
item := &item{&task{name, fn}, ts}
if !e.errorWhenFull {
e.ch <- item
return ts.fu, nil
}

select {
case e.ch <- item:
return ts.fu, nil
default:
return nil, errors.New("task is full")
}
}

func (e *limittedExecutor) Running() []RunningTask {
re := make([]RunningTask, 0)
for _, v := range e.running {
if v != nil {
re = append(re, v)
}
}
return re
}

func (e *limittedExecutor) Close() error {
e.lock.Lock()
defer e.lock.Unlock()

if e.ended {
return errors.New("Executor is already closed")
}

e.ended = true
for i := 0; i < e.max; i++ {
e.quit <- true
}

for _, v := range e.running {
if v != nil {
v.Future().Cancel()
}
}
return nil
}
23 changes: 23 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package executor

import (
"testing"
"time"
)

func TestExecutor(t *testing.T) {
exec := Fixed(2, false)
fu, _ := exec.Submit("hello", func(ts TaskState) (interface{}, error) {
time.Sleep(time.Second * 2)
return 1, nil
})
fu2, _ := exec.Submit("world", func(ts TaskState) (interface{}, error) {
return 2, nil
})

re, _ := fu.Get()
re2, _ := fu2.Get()
if re != 1 || re2 != 2 {
t.Fail()
}
}
157 changes: 157 additions & 0 deletions executor/future.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package executor

import (
"errors"
"sync"
)

var (
_ = Future(new(future))

// ErrorCancelled error to use when cancel task
ErrorCancelled = errors.New("Task is already cancelled")
)

type futureState uint16

// Future state
const (
futurePadding futureState = iota
futureBlocked
futureCancelled
futureCompleted
)

// Future get value after complete
type Future interface {
Get() (interface{}, error)
Cancel() error
OnComplete(func(interface{}, error)) Disposer
OnProgress(func(interface{})) Disposer
}

type future struct {
completeSource *eventSource
progressSource *eventSource

result interface{}
error error
blocker chan bool
blockCount int
state futureState
stateLock *sync.RWMutex
}

func newFuture() *future {
return &future{
completeSource: newEventSource(),
progressSource: newEventSource(),
blocker: make(chan bool),
stateLock: new(sync.RWMutex),
}
}

func (f *future) Get() (interface{}, error) {
f.stateLock.Lock()

if f.state == futureCancelled {
f.stateLock.Unlock()
return nil, ErrorCancelled
}

if f.state == futureCompleted {
f.stateLock.Unlock()
return f.result, f.error
}

f.blockCount++
f.state = futureBlocked
f.stateLock.Unlock()

<-f.blocker

f.stateLock.RLock()
defer f.stateLock.RUnlock()
if f.state == futureCancelled {
return nil, ErrorCancelled
}

if f.state == futureCompleted {
return f.result, f.error
}

return nil, errors.New("Unknown state")
}

func (f *future) set(result interface{}, err error) {
f.stateLock.Lock()
defer f.stateLock.Unlock()

if f.state != futurePadding && f.state != futureBlocked {
return
}
f.result = result
f.error = err

old := f.state
f.state = futureCompleted

if old == futureBlocked {
for i := 0; i < f.blockCount; i++ {
f.blocker <- true
}
}

f.fireComplete()
}

func (f *future) Cancel() error {
f.stateLock.Lock()
defer f.stateLock.Unlock()

if f.state == futureCompleted {
return errors.New("Task is already completed")
}

if f.state == futureCancelled {
return nil
}

f.error = ErrorCancelled
old := f.state
f.state = futureCancelled

if old == futureBlocked {
for i := 0; i < f.blockCount; i++ {
f.blocker <- true
}
}

f.fireComplete()

return nil
}

// OnComplete set complete callback
func (f *future) OnComplete(fn func(interface{}, error)) Disposer {
return f.completeSource.on(fn)
}

// OnProgress set progress callback
func (f *future) OnProgress(fn func(interface{})) Disposer {
return f.progressSource.on(fn)
}

func (f *future) fireComplete() {
go f.completeSource.fire(func(fn interface{}) {
fnn, _ := fn.(func(interface{}, error))
fnn(f.result, f.error)
})
}

func (f *future) fireProgress(p interface{}) {
go f.progressSource.fire(func(fn interface{}) {
fnn, _ := fn.(func(interface{}))
fnn(p)
})
}
Loading

0 comments on commit 0465559

Please sign in to comment.