Skip to content

Commit

Permalink
feat: implement a process supervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
cad committed Aug 11, 2017
1 parent c289118 commit 17ef4b8
Showing 1 changed file with 315 additions and 0 deletions.
315 changes: 315 additions & 0 deletions supervisor/supervisor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
// Package supervisor provides a generic API to watch and manage Unix processes.
package supervisor

import (
"bytes"
"fmt"
"os"
"os/exec"
"os/user"
"strconv"
"sync"
"syscall"
"time"

"github.com/Sirupsen/logrus"
)

// State is a FSM state.
type State uint

// States
const (
UNKNOWN State = iota
RUNNING
STOPPED
STARTING
STOPPING
FAILED
EXITED
)

func (s State) String() string {
switch s {
case RUNNING:
return "RUNNING"
case STOPPED:
return "STOPPED"
case STARTING:
return "STARTING"
case STOPPING:
return "STOPPING"
case FAILED:
return "FAILED"
case EXITED:
return "EXITED"

default:
return "UNKNOWN"
}
}

type transition struct {
currState State
nextState State
}

// Transition Table
var tt = []transition{
transition{currState: STOPPED, nextState: STARTING},
transition{currState: STARTING, nextState: RUNNING},
transition{currState: STARTING, nextState: STARTING},
transition{currState: STARTING, nextState: FAILED},

transition{currState: RUNNING, nextState: STOPPING},
transition{currState: RUNNING, nextState: EXITED},
transition{currState: STOPPING, nextState: STOPPED},
transition{currState: STOPPING, nextState: STOPPING},
}

// Process represents a unix process to be supervised.
type Process struct {
lock *sync.RWMutex
state State
maxRetry uint
cmd *exec.Cmd
executable string
wdir string
args []string
done chan error
stop chan bool
out *bytes.Buffer
// stdin io.WriteCloser
// stdoutLog Logger
// stderrLog Logger
}

// NewProcess returns a new process to be supervised.
func NewProcess(executable string, dir string, args []string) (*Process, error) {
// initialize process and set the state to STOPPED without transitioning to it.
p := Process{}
if !isExist(executable) {
return &p, fmt.Errorf("executable can not be found: %s", executable)
}
p.maxRetry = 3
p.executable = executable
p.wdir = dir
p.args = args
p.lock = &sync.RWMutex{}
p.state = STOPPED
p.done = make(chan error)
p.stop = make(chan bool)
p.out = new(bytes.Buffer)

return &p, nil
}

// isExist returns wether the given executable binary is found on the filesystem or not.
func isExist(executable string) bool {
if _, err := os.Stat(executable); !os.IsNotExist(err) {
return true
}
return false
}

// Start will run the process.
func (p *Process) Start() {
p.transitionTo(STARTING)
}

// Stop will cause the process to stop.
func (p *Process) Stop() {
p.transitionTo(STOPPING)
}

// Restart will cause a running process to restart.
func (p *Process) Restart() {
if p.state == RUNNING {
p.Stop()
}
p.Start()
}

// Status returns the current state of the FSM.
func (p *Process) Status() State {
return p.state
}

// IsRunning retunrs wether the process is running or not.
func (p *Process) IsRunning() bool {
return p.state == RUNNING
}

func (p *Process) permittable(state State) bool {
for _, t := range tt {
if p.state == t.currState && t.nextState == state {
return true
}
}
return false
}

func (p *Process) setState(state State) {
p.lock.Lock()
p.state = state
p.lock.Unlock()
}

func (p *Process) transitionTo(state State) {
if p.permittable(state) {
logrus.Infof("transition: '%s' -> '%s'", p.state, state)
logrus.Debug(p.out.String())
p.setState(state)
go p.process(state)()
return
}
logrus.Errorf("transition to '%s' from '%s' is not permitted!", p.state, state)
return
}
func (p *Process) newCommand() *exec.Cmd {
cmd := exec.Command(p.executable)
cmd.Stdout = p.out
cmd.Stderr = p.out
cmd.Dir = p.wdir
cmd.Args = p.args

currUsr, err := user.Current()
if err != nil {
logrus.Errorf("can not get current running user: %v", err)
}

uid, err := strconv.Atoi(currUsr.Uid)
if err != nil {
panic(fmt.Sprintf("can not convert string to int %s: %v", currUsr.Uid, err))
}

gid, err := strconv.Atoi(currUsr.Gid)
if err != nil {
panic(fmt.Sprintf("can not convert string to int %s: %v", currUsr.Gid, err))
}
fmt.Printf("%++v", cmd)
cmd.SysProcAttr = &syscall.SysProcAttr{Credential: &syscall.Credential{Uid: uint32(uid), Gid: uint32(gid)}}
return cmd
}
func (p *Process) process(state State) func() {
switch state {
case STOPPED:
return func() {
p.done = make(chan error)
p.stop = make(chan bool)
}
case STARTING:
return func() {
// Prepare the command and start.
var err error
for i := uint(1); i <= p.maxRetry; i++ {
// Prepare the command to run.
p.lock.Lock()
p.cmd = p.newCommand()
p.lock.Unlock()

logrus.Infof("starting the process")
err = p.cmd.Start()
if err != nil {
logrus.Warnf("process can not be started: %v", err)
logrus.Infof("retrying... (%d/%d)", i, p.maxRetry)
continue
}
break
}
// Max retry reached process still not started.
if err != nil {
p.transitionTo(FAILED)
return
}

// Process started successfully.
logrus.Info("process is started")

// Process Observer
go func() {
err := p.cmd.Wait()
if err != nil {
logrus.Error(err)
close(p.done)
return
}
p.done <- err
close(p.done)
}()
logrus.Info("observer goroutine launched")
p.transitionTo(RUNNING)
}
case RUNNING:
return func() {
// Stop Observer
go func() {
select {
// process is ordered to stop.
case <-p.stop:
p.transitionTo(STOPPING)
return
// process exited on it's own
case err := <-p.done:
if p.state == RUNNING {
logrus.Infof("process exited: %v", err)
p.transitionTo(EXITED)
return
}
}
}()
}
case STOPPING:
return func() {
gracefullyStopped := false

// first try to kill the process, gracefully
err := p.cmd.Process.Signal(os.Interrupt)
if err != nil {
logrus.Errorf("interrupt signal returned error: %v", err)
}
for i := uint(1); i <= p.maxRetry; i++ {
select {
case <-time.After(3 * time.Second):
logrus.Infof("retrying... (%d/%d)", i, p.maxRetry)
err := p.cmd.Process.Signal(os.Interrupt)
if err != nil {
logrus.Errorf("interrupt signal returned error: %v", err)
}
case err = <-p.done:
if err == nil {
gracefullyStopped = true
break
}
logrus.Errorf("process stopped with error: %v", err)
break

}
}

// process didn't exit and retry count is full
// hard killing
if !gracefullyStopped {
err := p.cmd.Process.Kill()
if err != nil {
logrus.Fatal("can not kill process!")
}
<-p.done
}
logrus.Info("process stopped")
p.transitionTo(STOPPED)
}
case FAILED:
return func() {
logrus.Fatal("process operation failed state")
}
case EXITED:
return func() {
logrus.Errorln("process exited unexpectedly")
logrus.Printf("out: %s", p.out.String())
os.Exit(1)
}
default: // UNKNOWN
return nil
}
}

0 comments on commit 17ef4b8

Please sign in to comment.