Skip to content

Commit

Permalink
optimize(component):remove useless chan
Browse files Browse the repository at this point in the history
  • Loading branch information
ITcathyh committed Dec 31, 2019
1 parent 3c7505c commit c45d1ec
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 28 deletions.
7 changes: 7 additions & 0 deletions actuator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ func TestPanic(t *testing.T) {
c := NewActuator()
testPanic(t, c)
}

func TestManyError(t *testing.T) {
timeout := time.Second
opt := &Options{TimeOut:&timeout}
c := NewActuator(opt)
testManyError(t, c)
}
23 changes: 21 additions & 2 deletions common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func testTimeout(t *testing.T, c TimedActuator) {

Equal(t, err, ErrorTimeOut)
et := time.Now().UnixNano()
t.Logf("used time:%ds", (et-st)/1000000)
t.Logf("used time:%dms", (et-st)/1000000)
time.Sleep(time.Millisecond * 500)
}

Expand All @@ -26,7 +26,26 @@ func testError(t *testing.T, c TimedActuator) {

Equal(t, err, te)
et := time.Now().UnixNano()
t.Logf("used time:%ds", (et-st)/1000000)
t.Logf("used time:%dms", (et-st)/1000000)
time.Sleep(time.Millisecond * 500)
}

func testManyError(t *testing.T, c TimedActuator) {
tasks := make([]Task, 0)
tmp, te := getErrorTask()
tasks = append(tasks, tmp...)

for i := 0; i < 100; i++ {
tmp, _ = getErrorTask()
tasks = append(tasks, tmp...)
}

st := time.Now().UnixNano()
err := c.Exec(tasks...)

Equal(t, err, te)
et := time.Now().UnixNano()
t.Logf("used time:%dms", (et-st)/1000000)
time.Sleep(time.Millisecond * 500)
}

Expand Down
44 changes: 28 additions & 16 deletions exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,41 @@ import (
)

// wait waits for the notification of execution result
func wait(c TimedActuator, ctx context.Context, resChan chan error) error {
func wait(c TimedActuator, ctx context.Context,
resChan chan error, cancel context.CancelFunc) error {
if timeout := c.GetTimeout(); timeout != nil {
return waitWithTimeout(ctx, resChan, *timeout)
return waitWithTimeout(ctx, resChan, *timeout, cancel)
}

select {
case <-ctx.Done():
return nil
case err := <-resChan:
return err
for {
select {
case <-ctx.Done():
return nil
case err := <-resChan:
if err != nil {
cancel()
return err
}
}
}
}

// waitWithTimeout waits for the notification of execution result
// when the timeout is set
func waitWithTimeout(ctx context.Context, resChan chan error, timeout time.Duration) error {
select {
case <-time.After(timeout):
return ErrorTimeOut
case <-ctx.Done():
return nil
case err := <-resChan:
return err
func waitWithTimeout(ctx context.Context, resChan chan error,
timeout time.Duration, cancel context.CancelFunc) error {
for {
select {
case <-time.After(timeout):
return ErrorTimeOut
case <-ctx.Done():
return nil
case err := <-resChan:
if err != nil {
cancel()
return err
}
}
}
}

Expand Down Expand Up @@ -68,7 +80,7 @@ func execTasks(c TimedActuator, parent context.Context,
execFunc(f)
}

return wait(c, ctx, resChan)
return wait(c, ctx, resChan, cancel)
}

// simplyRun uses a new goroutine to run the function
Expand Down
11 changes: 1 addition & 10 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,20 @@ func DurationPtr(t time.Duration) *time.Duration {
func wrapperTask(ctx context.Context, task Task,
wg *sync.WaitGroup, resChan chan error) func() {
return func() {
tmpChan := make(chan error, 1)
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("conexec panic:%v\n%s", r, string(debug.Stack()))
resChan <- err
}

wg.Done()
close(tmpChan)
}()

select {
case <-ctx.Done():
return // fast return
case tmpChan <- task():
if err := <-tmpChan; err != nil {
resChan <- err
}
case resChan <- task():
}

// if err := task(); err != nil {
// resChan <- err
// }
}
}

Expand Down

0 comments on commit c45d1ec

Please sign in to comment.