Skip to content

Commit

Permalink
feat: recover it when the goroutine caused some panic (OpenAtomFounda…
Browse files Browse the repository at this point in the history
…tion#2349)

Co-authored-by: liuchengyu <[email protected]>
  • Loading branch information
chengyu-l and liuchengyu authored Feb 2, 2024
1 parent a885758 commit 2e5bbff
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 12 deletions.
25 changes: 13 additions & 12 deletions codis/pkg/topom/topom.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"pika/codis/v2/pkg/utils/math2"
"pika/codis/v2/pkg/utils/redis"
"pika/codis/v2/pkg/utils/rpc"
gxruntime "pika/codis/v2/pkg/utils/runtime"
"pika/codis/v2/pkg/utils/sync2/atomic2"
)

Expand Down Expand Up @@ -197,7 +198,7 @@ func (s *Topom) Start(routines bool) error {
}

// Check the status of all masters and slaves every 5 seconds
go func() {
gxruntime.GoUnterminated(func() {
for !s.IsClosed() {
if s.IsOnline() {
w, _ := s.CheckMastersAndSlavesState(10 * time.Second)
Expand All @@ -207,11 +208,11 @@ func (s *Topom) Start(routines bool) error {
}
time.Sleep(s.Config().SentinelCheckServerStateInterval.Duration())
}
}()
}, nil, true, 0)

// Check the status of the pre-offline master every 1 second
// to determine whether to automatically switch master and slave
go func() {
gxruntime.GoUnterminated(func() {
for !s.IsClosed() {
if s.IsOnline() {
w, _ := s.CheckPreOffineMastersState(5 * time.Second)
Expand All @@ -221,9 +222,9 @@ func (s *Topom) Start(routines bool) error {
}
time.Sleep(s.Config().SentinelCheckMasterFailoverInterval.Duration())
}
}()
}, nil, true, 0)

go func() {
gxruntime.GoUnterminated(func() {
for !s.IsClosed() {
if s.IsOnline() {
w, _ := s.RefreshRedisStats(time.Second)
Expand All @@ -233,9 +234,9 @@ func (s *Topom) Start(routines bool) error {
}
time.Sleep(time.Second)
}
}()
}, nil, true, 0)

go func() {
gxruntime.GoUnterminated(func() {
for !s.IsClosed() {
if s.IsOnline() {
w, _ := s.RefreshProxyStats(time.Second)
Expand All @@ -245,9 +246,9 @@ func (s *Topom) Start(routines bool) error {
}
time.Sleep(time.Second)
}
}()
}, nil, true, 0)

go func() {
gxruntime.GoUnterminated(func() {
for !s.IsClosed() {
if s.IsOnline() {
if err := s.ProcessSlotAction(); err != nil {
Expand All @@ -257,9 +258,9 @@ func (s *Topom) Start(routines bool) error {
}
time.Sleep(time.Second)
}
}()
}, nil, true, 0)

go func() {
gxruntime.GoUnterminated(func() {
for !s.IsClosed() {
if s.IsOnline() {
if err := s.ProcessSyncAction(); err != nil {
Expand All @@ -269,7 +270,7 @@ func (s *Topom) Start(routines bool) error {
}
time.Sleep(time.Second)
}
}()
}, nil, true, 0)

return nil
}
Expand Down
84 changes: 84 additions & 0 deletions codis/pkg/utils/runtime/goroutine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package gxruntime

import (
"fmt"
"os"
"runtime/debug"
"sync"
"time"
)

// GoSafely wraps a `go func()` with recover()
func GoSafely(wg *sync.WaitGroup, ignoreRecover bool, handler func(), catchFunc func(r interface{})) {
if wg != nil {
wg.Add(1)
}
go func() {
defer func() {
if r := recover(); r != nil {
if !ignoreRecover {
fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n",
time.Now(), r, string(debug.Stack()))
}
if catchFunc != nil {
if wg != nil {
wg.Add(1)
}
go func() {
defer func() {
if p := recover(); p != nil {
if !ignoreRecover {
fmt.Fprintf(os.Stderr, "recover goroutine panic:%v\n%s\n",
p, string(debug.Stack()))
}
}

if wg != nil {
wg.Done()
}
}()
catchFunc(r)
}()
}
}
if wg != nil {
wg.Done()
}
}()
handler()
}()
}

// GoUnterminated is used for which goroutine wanna long live as its process.
// @period: sleep time duration after panic to defeat @handle panic so frequently. if it is not positive,
//
// the @handle will be invoked asap after panic.
func GoUnterminated(handle func(), wg *sync.WaitGroup, ignoreRecover bool, period time.Duration) {
GoSafely(wg,
ignoreRecover,
handle,
func(r interface{}) {
if period > 0 {
time.Sleep(period)
}
GoUnterminated(handle, wg, ignoreRecover, period)
},
)
}

0 comments on commit 2e5bbff

Please sign in to comment.