Skip to content

Commit

Permalink
fix client goroutine leakage
Browse files Browse the repository at this point in the history
  • Loading branch information
holdno committed Apr 29, 2024
1 parent aa257bf commit c400fd5
Show file tree
Hide file tree
Showing 22 changed files with 36 additions and 28 deletions.
18 changes: 10 additions & 8 deletions agent/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,8 @@ func tryLockTaskForExec(a *client, taskExecuteInfo *common.TaskExecutingInfo, ta
if taskExecuteInfo.CancelCtx.Err() != nil {
return
}
realtimeError := make(chan error)
err := tryLockUntilCtxIsDone(a.GetCenterSrv(), taskExecuteInfo, realtimeError)

realtimeError, err := tryLockUntilCtxIsDone(a.GetCenterSrv(), taskExecuteInfo)

if err != nil {
a.logger.Warn("failed to get task execute lock",
Expand Down Expand Up @@ -648,10 +648,10 @@ func (ts *TaskScheduler) PushEvent(event *common.TaskEvent) {
ts.TaskEventChan <- event
}

func tryLockUntilCtxIsDone(cli cronpb.CenterClient, execInfo *common.TaskExecutingInfo, disconnectChan chan error) error {
func tryLockUntilCtxIsDone(cli cronpb.CenterClient, execInfo *common.TaskExecutingInfo) (chan error, error) {
locker, err := cli.TryLock(execInfo.CancelCtx)
if err != nil {
return err
return nil, err
}

if err = locker.Send(&cronpb.TryLockRequest{
Expand All @@ -660,16 +660,18 @@ func tryLockUntilCtxIsDone(cli cronpb.CenterClient, execInfo *common.TaskExecuti
}); err != nil {
if errors.Is(err, io.EOF) {
if _, err = locker.Recv(); err != nil {
return err
return nil, err
}
}
return err
return nil, err
}

if _, err = locker.Recv(); err != nil {
return err
return nil, err
}

disconnectChan := make(chan error, 1)

go func() {
defer close(disconnectChan)
for {
Expand All @@ -694,5 +696,5 @@ func tryLockUntilCtxIsDone(cli cronpb.CenterClient, execInfo *common.TaskExecuti
}
}()

return nil
return disconnectChan, nil
}
6 changes: 6 additions & 0 deletions agent/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@ func TestSchedulerLatency(t *testing.T) {
t.Log(getSchedulerLatency(100, int32(i)) < getSchedulerLatency(50, 1))
}
}

func TestChanSend(t *testing.T) {
c1 := make(chan struct{})
c1 <- struct{}{}
t.Log("success")
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c400fd5

Please sign in to comment.