Skip to content

Commit

Permalink
Implement consensus check during dynamic scaling. (lsds#146)
Browse files Browse the repository at this point in the history
* wait stdio

* update logging

* use uint32 for IPv4

* rename Host -> IPv4

* rename Hostname -> IPv4

* more rename

* consensus

* update docker compose

* fix panic

* add more tests

* temporary fix

* call unsafe.Pointer directly (lsds#149)
  • Loading branch information
lgarithm authored and luomai committed Oct 12, 2019
1 parent 13ce522 commit 52fbfd9
Show file tree
Hide file tree
Showing 26 changed files with 240 additions and 110 deletions.
7 changes: 4 additions & 3 deletions benchmarks/adaptive/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ RUN wget -q https://dl.google.com/go/go1.11.linux-amd64.tar.gz \
&& tar -C /usr/local -xf go1.11.linux-amd64.tar.gz \
&& rm go1.11.linux-amd64.tar.gz

RUN pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple tensorflow==1.12
RUN pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple numpy==1.16 tensorflow==1.12

WORKDIR /src/kungfu
ENV PATH ${PATH}:/usr/local/go/bin:/src/kungfu/bin
ENV GOBIN ${HOME}/go/bin
ENV PATH ${PATH}:/usr/local/go/bin:${HOME}/go/bin
ENV TF_CPP_MIN_LOG_LEVEL 2

ADD . .

RUN ./configure --build-tools --build-tensorflow-ops && make
RUN pip3 install --no-index -U -v .
RUN go install -v ./srcs/go/cmd/kungfu-run
1 change: 0 additions & 1 deletion benchmarks/adaptive/gen-compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def gen_compose(np, n_nodes, node_cap, tag, user_command):
"-np",
str(np),
"-w",
"-k",
] + user_command
compose = {
"version": "3",
Expand Down
11 changes: 8 additions & 3 deletions scripts/tests/run-go-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ set -e

cd $(dirname $0)/../..

CMAKE_SOURCE_DIR=$(pwd)

reset_go_mod() {
echo 'module github.com/lsds/KungFu' >go.mod
if [ -f go.sum ]; then
Expand All @@ -15,7 +13,7 @@ reset_go_mod() {
rebuild() {
env \
GOBIN=$(pwd)/bin \
go install -v ./tests/go/...
go install -v ./...
}

run_unit_tests() {
Expand All @@ -28,6 +26,13 @@ run_integration_tests() {
KUNGFU_CONFIG_ENABLE_MONITORING=true \
KUNGFU_CONFIG_MONITORING_PERIOD=$period \
./bin/test-monitor -p $period -d 1s

local np=4
local H=127.0.0.1:$np
./bin/kungfu-run \
-H $H \
-np $np \
./bin/test-public-apis
}

reset_go_mod
Expand Down
1 change: 0 additions & 1 deletion srcs/go/cmd/kungfu-rrun/rrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func main() {
utils.ExitErr(fmt.Errorf("failed to parse -port-range: %v", err))
}
jc := sch.JobConfig{
Parent: plan.PeerID{Host: "0.0.0.0"},
HostList: hl,
PortRange: *pr,
Prog: restArgs[0],
Expand Down
43 changes: 25 additions & 18 deletions srcs/go/cmd/kungfu-run/kungfu-run.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,11 @@ func main() {
}
t0 := time.Now()
defer func(prog string) { log.Infof("%s took %s", prog, time.Since(t0)) }(progName())
selfIP := func() string {
switch {
case len(*selfHost) > 0:
return *selfHost
case len(*nicName) > 0:
return inferIP(*nicName)
}
return "127.0.0.1"
}()
log.Infof("Using selfHost=%s", selfIP)
selfIP, err := getSelfIPv4(*selfHost, *nicName)
if err != nil {
utils.ExitErr(err)
}
log.Infof("Using selfHost=%s", plan.FormatIPv4(selfIP))
restArgs := flag.Args()
if len(restArgs) < 1 {
utils.ExitErr(errMissingProgramName)
Expand All @@ -92,11 +87,11 @@ func main() {
if err != nil {
utils.ExitErr(fmt.Errorf("failed to parse -H: %v", err))
}
parent := plan.PeerID{Host: selfIP, Port: uint16(*port)}
parent := plan.PeerID{IPv4: selfIP, Port: uint16(*port)}
parents := func() plan.PeerList {
var ps plan.PeerList
for _, h := range hl {
ps = append(ps, plan.PeerID{Host: h.Hostname, Port: uint16(*port)})
ps = append(ps, plan.PeerID{IPv4: h.IPv4, Port: uint16(*port)})
}
return ps
}()
Expand Down Expand Up @@ -133,7 +128,7 @@ func main() {
}
}

func simpleRun(ctx context.Context, selfIP string, ps []sch.Proc, jc sch.JobConfig) {
func simpleRun(ctx context.Context, selfIP uint32, ps []sch.Proc, jc sch.JobConfig) {
myPs := sch.ForHost(selfIP, ps)
if len(myPs) <= 0 {
log.Infof("No task to run on this node")
Expand All @@ -147,10 +142,22 @@ func simpleRun(ctx context.Context, selfIP string, ps []sch.Proc, jc sch.JobConf
}
}

func inferIP(nicName string) string {
// getSelfIPv4 picks the IPv4 address to use for this host, returned as a
// uint32. An explicit hostname takes precedence and is parsed with
// plan.ParseIPv4; otherwise, if a NIC name is given, the address is looked up
// through inferIPv4; with neither supplied it falls back to 127.0.0.1.
// Errors from parsing or interface lookup are passed through unchanged.
func getSelfIPv4(hostname string, nic string) (uint32, error) {
	switch {
	case hostname != "":
		return plan.ParseIPv4(hostname)
	case nic != "":
		return inferIPv4(nic)
	default:
		// Nothing specified: use the loopback address.
		return plan.MustParseIPv4(`127.0.0.1`), nil
	}
}

// errNoIPv4Found is the error inferIPv4 returns when its scan of the network
// interfaces finishes without yielding an IPv4 address.
var errNoIPv4Found = errors.New("no ipv4 found")

func inferIPv4(nicName string) (uint32, error) {
ifaces, err := net.Interfaces()
if err != nil {
return "127.0.0.1"
return 0, err
}
for _, i := range ifaces {
if i.Name != nicName {
Expand All @@ -168,10 +175,10 @@ func inferIP(nicName string) string {
case *net.IPAddr:
ip = v.IP
}
if ip.To4() != nil {
return ip.String()
if ip := ip.To4(); ip != nil {
return plan.PackIPv4(ip), nil
}
}
}
return "127.0.0.1"
return 0, errNoIPv4Found
}
18 changes: 9 additions & 9 deletions srcs/go/cmd/kungfu-run/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,25 @@ func watchRun(ctx context.Context, parent plan.PeerID, parents plan.PeerList, ch

reconcileCluster := func(s run.Stage) {
a, b := current.Diff(s.Cluster)
del := a.On(parent.Host)
add := b.On(parent.Host)
log.Infof("arrived at %q, np=%d, will remove %d %s (%d locally), will add %d %s (%d locally)",
del := a.On(parent.IPv4)
add := b.On(parent.IPv4)
log.Infof("arrived at %q, np=%d, will remove %s (%d locally), will add %s (%d locally)",
s.Checkpoint, len(s.Cluster),
len(a), utils.Pluralize(len(a), "peer", "peers"), len(del),
len(b), utils.Pluralize(len(b), "peer", "peers"), len(add))
utils.Pluralize(len(a), "peer", "peers"), len(del),
utils.Pluralize(len(b), "peer", "peers"), len(add))
log.Debugf("waiting %d peers to stop", len(del))
for _, id := range del {
gs[id].Wait()
delete(gs, id)
}
log.Debugf("%d peers removed", len(del))
log.Debugf("%s removed", utils.Pluralize(len(del), "peer", "peers"))
for i, id := range add {
gs[id] = new(sync.WaitGroup)
gs[id].Add(1)
all.Add(1)
go func(g *sync.WaitGroup, id plan.PeerID, s run.Stage) {
localRank, _ := s.Cluster.LocalRank(id)
name := fmt.Sprintf("%s.%d", id.Host, id.Port)
name := fmt.Sprintf("%s.%d", plan.FormatIPv4(id.IPv4), id.Port)
envs := sch.Envs{
kb.InitSessEnvKey: s.Checkpoint,
kb.CheckpointEnvKey: s.Checkpoint,
Expand All @@ -61,13 +61,13 @@ func watchRun(ctx context.Context, parent plan.PeerID, parents plan.PeerList, ch
atomic.AddInt32(&running, 1)
runProc(ctx, cancel, proc, s.Checkpoint)
n := atomic.AddInt32(&running, -1)
log.Debugf("%d peer is still running on this host", n)
log.Debugf("%s is still running on this host", utils.Pluralize(int(n), "peer", "peers"))
g.Done()
all.Done()
}(gs[id], id, s)
log.Debugf("peer %d/%d created", i, len(add))
}
log.Debugf("%d peers created", len(add))
log.Debugf("%s created", utils.Pluralize(len(add), "peer", "peers"))
current = s.Cluster
}
reconcileCluster(<-ch)
Expand Down
2 changes: 1 addition & 1 deletion srcs/go/cmd/run-experiments/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func parseResult(line string, r *Result) {
func humanizeHostSpecs(hosts []plan.HostSpec) string {
var ss []string
for _, h := range hosts {
ss = append(ss, fmt.Sprintf("<ip=%s, slots=%d, pub_ip=%s>", h.Hostname, h.Slots, h.PublicAddr))
ss = append(ss, fmt.Sprintf("<ip=%s, slots=%d, pub_ip=%s>", plan.FormatIPv4(h.IPv4), h.Slots, h.PublicAddr))
}
return strings.Join(ss, ", ")
}
Expand Down
51 changes: 49 additions & 2 deletions srcs/go/kungfu/kungfu.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Kungfu struct {
func getParentIDs(hl plan.HostList, parent plan.PeerID) plan.PeerList {
var ps plan.PeerList
for _, h := range hl {
ps = append(ps, plan.PeerID{Host: h.Hostname, Port: parent.Port})
ps = append(ps, plan.PeerID{IPv4: h.IPv4, Port: parent.Port})
}
return ps
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func (kf *Kungfu) Start() int {
monitoringPort := kf.self.Port + 10000
monitor.StartServer(int(monitoringPort))
monitorAddr := plan.NetAddr{
Host: kf.self.Host, // FIXME: use pubAddr
IPv4: kf.self.IPv4, // FIXME: use pubAddr
Port: monitoringPort,
}
log.Infof("Kungfu peer %s started, monitoring endpoint http://%s/metrics", kf.self, monitorAddr)
Expand Down Expand Up @@ -159,11 +159,46 @@ func par(ps plan.PeerList, f func(plan.PeerID) error) error {
return mergeErrors(errs, "par")
}

// consensus checks bs against the current session in two rounds: first the
// length of bs (as a single I32), then the bytes themselves (as U8). In each
// round the local value is all-reduced twice, once with kb.MIN and once with
// kb.MAX, and the round passes only if the local value is byte-for-byte equal
// to both reduced results. It returns true only when both rounds pass.
//
// Both AllReduce calls of a round are always issued before the comparison.
// NOTE(review): if AllReduce reports failures through a return value, that
// value is not inspected here — confirm against the session API.
func (kf *Kungfu) consensus(bs []byte) bool {
	sess := kf.CurrentSession()
	size := len(bs)

	// Round 1: agree on the length, so the second round uses equally sized
	// buffers.
	sizeLocal := kb.NewVector(1, kb.I32)
	sizeMin := kb.NewVector(1, kb.I32)
	sizeMax := kb.NewVector(1, kb.I32)
	sizeLocal.AsI32()[0] = int32(size)
	sess.AllReduce(Workspace{SendBuf: sizeLocal, RecvBuf: sizeMin, OP: kb.MIN, Name: ":consensus:len:min"})
	sess.AllReduce(Workspace{SendBuf: sizeLocal, RecvBuf: sizeMax, OP: kb.MAX, Name: ":consensus:len:max"})
	if !(bytesEq(sizeLocal.Data, sizeMin.Data) && bytesEq(sizeLocal.Data, sizeMax.Data)) {
		return false
	}

	// Round 2: agree on the content. bs is wrapped directly, not copied.
	dataLocal := &kb.Vector{Data: bs, Count: size, Type: kb.U8}
	dataMin := kb.NewVector(size, kb.U8)
	dataMax := kb.NewVector(size, kb.U8)
	sess.AllReduce(Workspace{SendBuf: dataLocal, RecvBuf: dataMin, OP: kb.MIN, Name: ":consensus:min"})
	sess.AllReduce(Workspace{SendBuf: dataLocal, RecvBuf: dataMax, OP: kb.MAX, Name: ":consensus:max"})
	return bytesEq(dataLocal.Data, dataMin.Data) && bytesEq(dataLocal.Data, dataMax.Data)
}

func (kf *Kungfu) propose(ckpt string, peers plan.PeerList) bool {
if peers.Eq(kf.currentPeers) {
log.Debugf("ingore unchanged proposal")
return true
}
if digest := peers.Bytes(); !kf.consensus(digest) {
log.Errorf("diverge proposal detected!")
return true
}
{
stage := run.Stage{Checkpoint: ckpt, Cluster: peers}
if err := par(kf.parents, func(parent plan.PeerID) error {
Expand Down Expand Up @@ -194,3 +229,15 @@ func (kf *Kungfu) ResizeCluster(ckpt string, newSize int) (bool, error) {
}
return kf.Update(), nil
}

// bytesEq reports whether x and y have the same length and identical
// contents. A nil slice and an empty slice compare equal.
func bytesEq(x, y []byte) bool {
	// Comparing the string views checks length and then every byte; the
	// compiler does not materialise the converted strings for a comparison.
	return string(x) == string(y)
}
2 changes: 1 addition & 1 deletion srcs/go/kungfu/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,5 +357,5 @@ func mergeErrors(errs []error, hint string) error {
if failed == 0 {
return nil
}
return fmt.Errorf("%s failed with %d %s: %s", hint, failed, utils.Pluralize(failed, "error", "errors"), msg)
return fmt.Errorf("%s failed with %s: %s", hint, utils.Pluralize(failed, "error", "errors"), msg)
}
19 changes: 14 additions & 5 deletions srcs/go/kungfubase/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,24 @@ const (
// Transform performs y[i] += x[i] for vectors y and x
func Transform(y, x *Vector, op OP) {
// Assuming Count and Type are consistent
C.std_transform_2(ptr(x.Data), ptr(y.Data), ptr(y.Data), C.int(y.Count), C.KungFu_Datatype(y.Type), C.KungFu_Op(op))
Transform2(y, x, y, op)
}

// Transform2 performs z[i] = x[i] + y[i] for vectors z and x, y.
func Transform2(z, x, y *Vector, op OP) {
// Assuming Count and Type are consistent
C.std_transform_2(ptr(x.Data), ptr(y.Data), ptr(z.Data), C.int(z.Count), C.KungFu_Datatype(z.Type), C.KungFu_Op(op))
C.std_transform_2(
// ptr(x.Data), // panic when x.Data is returned from bytes.Buffer
// ptr(y.Data),
// ptr(z.Data),
// https://github.com/lsds/KungFu/issues/149
unsafe.Pointer(&x.Data[0]),
unsafe.Pointer(&y.Data[0]),
unsafe.Pointer(&z.Data[0]),
C.int(z.Count), C.KungFu_Datatype(z.Type), C.KungFu_Op(op))
}

func ptr(bs []byte) unsafe.Pointer {
return unsafe.Pointer(&bs[0])
}
// panic: runtime error: cgo argument has Go pointer to Go pointer
// func ptr(bs []byte) unsafe.Pointer {
// return unsafe.Pointer(&bs[0])
// }
2 changes: 1 addition & 1 deletion srcs/go/monitor/counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func newRateAccumulatorGroup(prefix string) *rateAccumulatorGroup {
}

func (g *rateAccumulatorGroup) getOrCreate(a plan.NetAddr) *rateAccumulator {
labels := fmt.Sprintf(`{peer="%s:%d"}`, a.Host, a.Port)
labels := fmt.Sprintf(`{peer="%s"}`, a)
g.Lock()
defer g.Unlock()
if ra, ok := g.rateAccumulators[labels]; !ok {
Expand Down
Loading

0 comments on commit 52fbfd9

Please sign in to comment.