Skip to content

Commit

Permalink
first version
Browse files Browse the repository at this point in the history
  • Loading branch information
ccding committed Jul 30, 2015
1 parent e6b34a9 commit c8eaddc
Show file tree
Hide file tree
Showing 6 changed files with 900 additions and 0 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.PHONY: all
all:
sh gobuild.sh

.PHONY: clean
clean:
rm mst cli srv
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
# bayou
a Bayou implementation with a master for test

[http://dl.acm.org/citation.cfm?id=224070](http://dl.acm.org/citation.cfm?id=224070)

Terry, D. B., Theimer, M. M., Petersen, K., Demers, A. J., Spreitzer, M. J.,
and Hauser, C. H. Managing update conflicts in Bayou, a weakly connected
replicated storage system. SOSP 1995.
153 changes: 153 additions & 0 deletions client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package main

import (
"log"
"net"
"net/http"
"net/rpc"
"os"
"strconv"
)

type opType int

const (
PUT opType = iota + 1
DELETE
JOIN
RETIRE
)

type Data struct {
Op opType
Id int
Key string
Value string
Stable bool
CSN int
AccTime int
Replica string
Rid int
}

type Sret struct {
R string
VV []int
}

type Server struct {
clientId int
serverId int
maxNum int
log map[string][]int
}

func newServer(n int, m int) *Server {
s := new(Server)
s.serverId = m
s.clientId = n
s.maxNum = 100
s.log = make(map[string][]int)
return s
}

func (t *Server) Break(s *int, r *int) error {
if *s == t.serverId {
t.serverId = -1
}
return nil
}

func (t *Server) Restore(s *int, r *int) error {
t.serverId = *s
return nil
}

func (t *Server) Put(s *Data, r *string) error {
n := t.serverId
client, err := rpc.DialHTTP("tcp", "localhost:"+strconv.Itoa(8800+n))
if err != nil {
// error will never happen based on the assumption
}
defer client.Close()
ret := new(Sret)
ret.VV = make([]int, t.maxNum)
err = client.Call("Server.Sput", s, &ret)
if _, ok := t.log[s.Key]; !ok {
t.log[s.Key] = make([]int, t.maxNum)
}
for i := 0; i < len(t.log[s.Key]); i++ {
if t.log[s.Key][i] < ret.VV[i] {
t.log[s.Key][i] = ret.VV[i]
}
}
return err
}

func (t *Server) Get(s *string, r *string) error {
n := t.serverId
client, err := rpc.DialHTTP("tcp", "localhost:"+strconv.Itoa(8800+n))
if err != nil {
// error will never happen based on the assumption
}
defer client.Close()
ret := new(Sret)
ret.VV = make([]int, t.maxNum)
err = client.Call("Server.Sget", s, &ret)
if ret.R == "" {
ret.R = "ERR_KEY"
}
if _, ok := t.log[*s]; !ok {
t.log[*s] = make([]int, t.maxNum)
}
for i := 0; i < len(ret.VV); i++ {
if ret.VV[i] < t.log[*s][i] {
ret.R = "ERR_DEP"
break
}
}
*r = ret.R
for i := 0; i < len(t.log[*s]); i++ {
if t.log[*s][i] < ret.VV[i] {
t.log[*s][i] = ret.VV[i]
}
}
return err
}

func (t *Server) Delete(s *string, r *string) error {
n := t.serverId
client, err := rpc.DialHTTP("tcp", "localhost:"+strconv.Itoa(8800+n))
if err != nil {
// error will never happen based on the assumption
}
defer client.Close()
ret := new(Sret)
ret.VV = make([]int, t.maxNum)
err = client.Call("Server.Sdelete", s, &ret)
if _, ok := t.log[*s]; !ok {
t.log[*s] = make([]int, t.maxNum)
}
for i := 0; i < len(t.log[*s]); i++ {
if t.log[*s][i] < ret.VV[i] {
t.log[*s][i] = ret.VV[i]
}
}
return err
}

func main() {
n, _ := strconv.Atoi(os.Args[1])
m, _ := strconv.Atoi(os.Args[2])
port := strconv.Itoa(n + 8800)

server := newServer(n, m)
listener, err := net.Listen("tcp", ":"+port)
if err != nil {
log.Fatalln("listen error:", err)
}

rpc.Register(server)
rpc.HandleHTTP()
http.Serve(listener, nil)
}
14 changes: 14 additions & 0 deletions gobuild.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#! /bin/bash

cd master
go build
mv master ../mst
cd ..
cd client
go build
mv client ../cli
cd ..
cd server
go build
mv server ../srv
cd ..
190 changes: 190 additions & 0 deletions master/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package main

import (
"bufio"
"fmt"
"log"
"net/rpc"
"os"
"strconv"
"strings"
"time"
)

type Data struct {
Key string
Value string
}

func main() {

srvIds := make(map[int]bool)
processes := make([]*os.Process, 0)

// read from command line
bio := bufio.NewReader(os.Stdin)
for {
// print the command line starting char
// fmt.Printf("> ")

line, _, err := bio.ReadLine()
if err != nil {
for _, p := range processes {
p.Kill()
}
break
}

// handle the command line parameters
cmd := strings.Split(string(line), " ")
switch cmd[0] {
case "joinServer":
n, _ := strconv.Atoi(cmd[1])
srvIds[n] = true
var procAttr os.ProcAttr
procAttr.Files = []*os.File{nil, os.Stdout,
os.Stderr}
p, err := os.StartProcess("./srv",
[]string{"server", cmd[1]}, &procAttr)
if err != nil {
log.Fatalln("start server:", err)
}
processes = append(processes, p)
case "retireServer":
n, _ := strconv.Atoi(cmd[1])
delete(srvIds, n)
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+n))
err = client.Call("Server.Retire", &n, nil)
if err != nil {
log.Fatalln(err)
}
client.Close()
case "joinClient":
var procAttr os.ProcAttr
procAttr.Files = []*os.File{nil, os.Stdout,
os.Stderr}
p, err := os.StartProcess("./cli",
[]string{"client", cmd[1], cmd[2]}, &procAttr)
if err != nil {
log.Fatalln("start client:", err)
}
processes = append(processes, p)
case "breakConnection":
n, _ := strconv.Atoi(cmd[1])
m, _ := strconv.Atoi(cmd[2])
{
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+n))
err = client.Call("Server.Break", &m, nil)
if err != nil {
log.Fatalln(err)
}
client.Close()
}
{
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+m))
err = client.Call("Server.Break", &n, nil)
if err != nil {
log.Fatalln(err)
}
client.Close()
}
case "restoreConnection":
n, _ := strconv.Atoi(cmd[1])
m, _ := strconv.Atoi(cmd[2])
{
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+n))
err = client.Call("Server.Restore", &m, nil)
if err != nil {
log.Fatalln(err)
}
client.Close()
}
{
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+m))
err = client.Call("Server.Restore", &n, nil)
if err != nil {
log.Fatalln(err)
}
client.Close()
}
case "pause":
for n, _ := range srvIds {
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+n))
err = client.Call("Server.Pause", &n, nil)
if err != nil {
log.Fatalln(err)
}
client.Close()
}
case "start":
for n, _ := range srvIds {
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+n))
err = client.Call("Server.Start", &n, nil)
if err != nil {
log.Fatalln(err)
}
client.Close()
}
case "stabilize":
for n, _ := range srvIds {
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+n))
err = client.Call("Server.Stabilize", &n, nil)
if err != nil {
log.Fatalln(err)
}
client.Close()
}
case "printLog":
n, _ := strconv.Atoi(cmd[1])
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+n))
var r string
err = client.Call("Server.PrintLog", &n, &r)
if err != nil {
log.Fatalln(err)
}
fmt.Printf("%s", r)
client.Close()
case "put":
n, _ := strconv.Atoi(cmd[1])
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+n))
data := &Data{Key: cmd[2], Value: cmd[3]}
err = client.Call("Server.Put", data, nil)
if err != nil {
log.Fatalln(err)
}
client.Close()
case "get":
n, _ := strconv.Atoi(cmd[1])
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+n))
var r string
err = client.Call("Server.Get", &cmd[2], &r)
if err != nil {
log.Fatalln(err)
}
fmt.Printf("%s:%s\n", cmd[2], r)
client.Close()
case "delete":
n, _ := strconv.Atoi(cmd[1])
client, err := rpc.DialHTTP("tcp",
"localhost:"+strconv.Itoa(8800+n))
err = client.Call("Server.Delete", &cmd[2], nil)
if err != nil {
log.Fatalln(err)
}
client.Close()
default:
}
time.Sleep(time.Second)
}
}
Loading

0 comments on commit c8eaddc

Please sign in to comment.