Skip to content

Commit

Permalink
update code
Browse files Browse the repository at this point in the history
Change-Id: I765d6ddf10200bcf3543e11f501b33f825a6844f
Signed-off-by: Stuart <[email protected]>
  • Loading branch information
Stuart authored and Stuart committed Jul 11, 2017
1 parent 060424a commit 8940170
Show file tree
Hide file tree
Showing 43 changed files with 381 additions and 260 deletions.
2 changes: 1 addition & 1 deletion chapter1/objects/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func get(w http.ResponseWriter, r *http.Request) {
f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/" +
f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/objects/" +
strings.Split(r.URL.EscapedPath(), "/")[2])
if e != nil {
log.Println(e)
Expand Down
2 changes: 1 addition & 1 deletion chapter1/objects/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func put(w http.ResponseWriter, r *http.Request) {
f, e := os.Create(os.Getenv("STORAGE_ROOT") + "/" +
f, e := os.Create(os.Getenv("STORAGE_ROOT") + "/objects/" +
strings.Split(r.URL.EscapedPath(), "/")[2])
if e != nil {
log.Println(e)
Expand Down
2 changes: 1 addition & 1 deletion chapter2/dataServer/locate/locate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func StartLocate() {
if e != nil {
panic(e)
}
if Locate(os.Getenv("STORAGE_ROOT") + "/" + n) {
if Locate(os.Getenv("STORAGE_ROOT") + "/objects/" + n) {
q.Send(msg.ReplyTo, os.Getenv("LISTEN_ADDRESS"))
}
}
Expand Down
10 changes: 7 additions & 3 deletions chapter2/lib/objectstream/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,21 @@ type GetStream struct {
reader io.Reader
}

func NewGetStream(server, object string) (*GetStream, error) {
r, e := http.Get("http://" + server + "/objects/" + object)
func newGetStream(url string) (*GetStream, error) {
r, e := http.Get(url)
if e != nil {
return nil, e
}
if r.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http code %d", r.StatusCode)
return nil, fmt.Errorf("dataServer return http code %d", r.StatusCode)
}
return &GetStream{r.Body}, nil
}

func NewGetStream(server, object string) (*GetStream, error) {
return newGetStream("http://" + server + "/objects/" + object)
}

func (r *GetStream) Read(p []byte) (n int, err error) {
return r.reader.Read(p)
}
2 changes: 1 addition & 1 deletion chapter2/lib/objectstream/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewPutStream(server, object string) *PutStream {
go func() {
r, e := client.Do(request)
if e == nil && r.StatusCode != http.StatusOK {
e = fmt.Errorf("http code %d", r.StatusCode)
e = fmt.Errorf("dataServer return http code %d", r.StatusCode)
}
c <- e
}()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"strings"
)

type TempStream struct {
type TempPutStream struct {
server string
uuid string
}

func NewTempStream(server, object string, size int64) *TempStream {
func NewTempPutStream(server, object string, size int64) (*TempPutStream, error) {
request, e := http.NewRequest("POST", "http://"+server+"/temp/"+object, nil)
if e != nil {
panic(e)
Expand All @@ -21,16 +21,16 @@ func NewTempStream(server, object string, size int64) *TempStream {
client := http.Client{}
response, e := client.Do(request)
if e != nil {
panic(e)
return nil, e
}
uuid, e := ioutil.ReadAll(response.Body)
if e != nil {
panic(e)
}
return &TempStream{server, string(uuid)}
return &TempPutStream{server, string(uuid)}, nil
}

func (w *TempStream) Write(p []byte) (n int, err error) {
func (w *TempPutStream) Write(p []byte) (n int, err error) {
request, e := http.NewRequest("PATCH", "http://"+w.server+"/temp/"+w.uuid, strings.NewReader(string(p)))
if e != nil {
return 0, e
Expand All @@ -41,17 +41,22 @@ func (w *TempStream) Write(p []byte) (n int, err error) {
return 0, e
}
if r.StatusCode != http.StatusOK {
return 0, fmt.Errorf("http code %d", r.StatusCode)
return 0, fmt.Errorf("dataServer return http code %d", r.StatusCode)
}
return len(p), nil
}

func (w *TempStream) Close(good bool) {
func (w *TempPutStream) Close(good bool) (int, error) {
method := "DELETE"
if good {
method = "PUT"
}
request, _ := http.NewRequest(method, "http://"+w.server+"/temp/"+w.uuid, nil)
client := http.Client{}
client.Do(request)
r, e := client.Do(request)
return r.StatusCode, e
}

func (w *TempPutStream) NewTempGetStream() (*GetStream, error) {
return newGetStream("http://" + w.server + "/temp/" + w.uuid)
}
8 changes: 4 additions & 4 deletions chapter3/apiServer/objects/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package objects

import (
"../../lib/es"
"../../lib/httpstream"
"../../lib/objectstream"
"../locate"
"io"
"log"
Expand Down Expand Up @@ -31,12 +31,12 @@ func get(w http.ResponseWriter, r *http.Request) {
return
}
object := url.PathEscape(hash)
s := locate.Locate(object)
if s == "" {
info := locate.Locate(object)
if len(info) == 0 {
w.WriteHeader(http.StatusNotFound)
return
}
stream, e := httpstream.NewGetStream("http://" + s + "/objects/" + object)
stream, e := objectstream.NewGetStream(info[0].Addr, object)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
Expand Down
10 changes: 10 additions & 0 deletions chapter3/apiServer/objects/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,13 @@ func put(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}
}

func storeObject(r *http.Request, hash string) error {
s := heartbeat.ChooseRandomDataServer()
if s == "" {
return fmt.Errorf("cannot find any dataServer")
}
stream := httpstream.NewPutStream("http://" + s + "/objects/" + hash)
io.Copy(stream, r.Body)
return stream.Close()
}
30 changes: 10 additions & 20 deletions chapter3/apiServer/objects/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@ package objects

import (
"../../lib/es"
"../../lib/httpstream"
"../heartbeat"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
)
Expand All @@ -20,16 +15,21 @@ func getHashFromHeader(r *http.Request) string {
if digest[:8] != "SHA-256=" {
return ""
}
return url.PathEscape(digest[8:])
return digest[8:]
}

func addVersion(r *http.Request, hash string) error {
name := strings.Split(r.URL.EscapedPath(), "/")[2]

func getSizeFromHeader(r *http.Request) int64 {
size, e := strconv.ParseInt(r.Header.Get("content-length"), 0, 64)
if e != nil {
return e
panic(e)
}
return size
}

func addVersion(r *http.Request) error {
name := strings.Split(r.URL.EscapedPath(), "/")[2]
hash := getHashFromHeader(r)
size := getSizeFromHeader(r)

version, _, e := es.SearchLatestVersion(name)
if e != nil {
Expand All @@ -39,13 +39,3 @@ func addVersion(r *http.Request, hash string) error {

return es.PutVersion(name, version, size, hash)
}

func storeObject(r *http.Request, hash string) error {
s := heartbeat.ChooseRandomDataServer()
if s == "" {
return fmt.Errorf("cannot find any dataServer")
}
stream := httpstream.NewPutStream("http://" + s + "/objects/" + hash)
io.Copy(stream, r.Body)
return stream.Close()
}
1 change: 0 additions & 1 deletion chapter3/lib/httpstream

This file was deleted.

1 change: 1 addition & 0 deletions chapter3/lib/objectstream
1 change: 0 additions & 1 deletion chapter4/apiServer/locate

This file was deleted.

51 changes: 51 additions & 0 deletions chapter4/apiServer/locate/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package locate

import (
"../../lib/rabbitmq"
"encoding/json"
"net/http"
"os"
"strings"
"time"
)

type locateMessage struct {
Addr string
Id int
}

func Locate(name string) (locateInfo []locateMessage) {
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
q.Publish("dataServers", name)
c := q.Consume()
go func() {
time.Sleep(time.Second)
q.Close()
}()
msg := <-c
if len(msg.Body) == 0 {
return
}
var info locateMessage
json.Unmarshal(msg.Body, &info)
locateInfo = append(locateInfo, info)
return
}

func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
info := Locate(strings.Split(r.URL.EscapedPath(), "/")[2])
if len(info) == 0 {
w.WriteHeader(http.StatusNotFound)
return
}
for i := range info {
b, _ := json.Marshal(info[i])
w.Write(b)
w.Write([]byte("\n"))
}
}
49 changes: 43 additions & 6 deletions chapter4/apiServer/objects/put.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,69 @@
package objects

import (
"../../lib/objectstream"
"../heartbeat"
"../locate"
"crypto/sha256"
"encoding/base64"
"fmt"
"io"
"log"
"net/http"
"net/url"
)

func put(w http.ResponseWriter, r *http.Request) {
hash := getHashFromHeader(r)
if hash == "" {
log.Println("missing object hash in digest header")
w.WriteHeader(http.StatusBadRequest)
return
}

s := locate.Locate(hash)
if s == "" {
e := storeObject(r, hash)
info := locate.Locate(url.PathEscape(hash))
if len(info) == 0 {
c, e := storeObject(r)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
w.WriteHeader(c)
return
}
if c != http.StatusOK {
w.WriteHeader(c)
return
}

}

e := addVersion(r, hash)
e := addVersion(r)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
}
}

func storeObject(r *http.Request) (int, error) {
s := heartbeat.ChooseRandomDataServer()
if s == "" {
return http.StatusServiceUnavailable, fmt.Errorf("cannot find any dataServer")
}
hash := getHashFromHeader(r)
size := getSizeFromHeader(r)
h := sha256.New()
reader := io.TeeReader(r.Body, h)
stream, e := objectstream.NewTempStream(s, url.PathEscape(hash), size)
if e != nil {
return http.StatusInternalServerError, e
}
_, e = io.Copy(stream, reader)
if e != nil {
stream.Close(false)
return http.StatusInternalServerError, e
}
digest := base64.StdEncoding.EncodeToString(h.Sum(nil))
if digest != hash {
stream.Close(false)
return http.StatusBadRequest, fmt.Errorf("object hash mismatch, calculated=%s, requested=%s", digest, hash)
}
return stream.Close(true)
}
8 changes: 4 additions & 4 deletions chapter4/dataServer/locate/locate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package locate

import (
"../../lib/rabbitmq"
"fmt"
"os"
"path/filepath"
"strconv"
Expand All @@ -21,7 +20,6 @@ func Locate(object string) int {
mutex.Lock()
id, _ := objects[object]
mutex.Unlock()
fmt.Println(object, id)
return id
}

Expand Down Expand Up @@ -49,8 +47,10 @@ func StartLocate() {
}

func CollectObjects() {
files, e := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/*")
fmt.Println(files, e)
files, e := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*")
if e != nil {
panic(e)
}
for i := range files {
object := filepath.Base(files[i])
objects[object] = 1
Expand Down
Loading

0 comments on commit 8940170

Please sign in to comment.