Skip to content

Commit

Permalink
complete tinyrpc dev
Browse files Browse the repository at this point in the history
  • Loading branch information
zehua Ma committed Nov 26, 2021
1 parent a226512 commit 03ccea4
Show file tree
Hide file tree
Showing 21 changed files with 1,785 additions and 0 deletions.
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/tinyrpc.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions .idea/watcherTasks.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

196 changes: 196 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package tinyrpc

import (
"errors"
"io"
"log"
"sync"
"time"
)

var ErrShutdown = errors.New("connection is shut down")

type Call struct {
ServiceMethod string // The name of the service and method to call.
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).
Error error // After completion, the error status.
TTL time.Time
Done chan *Call // Receives *Call when Go is complete.
}

type Client struct {
codec ClientCodec

reqMutex sync.Mutex // protects following
request Request

mutex sync.Mutex // protects following
seq uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
}

type ClientCodec interface {
WriteRequest(*Request, interface{}) error
ReadResponseHeader(*Response) error
ReadResponseBody(interface{}) error

Close() error
}

func (client *Client) send(call *Call) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()

// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()

// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
client.request.TTL = call.TTL
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}

func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()

switch {
case call == nil:
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("tinyrpc: reading error body: " + err.Error())
}
case response.Error != "":
call.Error = errors.New(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("tinyrpc: reading error body: " + err.Error())
}
call.done()
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("tinyrpc: reading body " + err.Error())
}
call.done()
}
}

client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.reqMutex.Unlock()
if err != io.EOF && !closing {
log.Println("tinyrpc: client protocol error:", err)
}
}

func (call *Call) done() {
select {
case call.Done <- call:
default:
}
}

func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}

func (client *Client) Close() error {
client.mutex.Lock()
if client.closing {
client.mutex.Unlock()
return ErrShutdown
}
client.closing = true
client.mutex.Unlock()
return client.codec.Close()
}

func (client *Client) Go(serviceMethod string, args interface{},
reply interface{}, done chan *Call, ttl time.Duration) *Call {

call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if ttl != 0 {
call.TTL = time.Now().Add(ttl)
} else {
call.TTL = time.Unix(0, 0)
}
if done == nil {
done = make(chan *Call, 10)
} else {
if cap(done) == 0 {
log.Panic("tinyrpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}

func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1), time.Duration(0)).Done
return call.Error
}

func (client *Client) CallWithTimeout(
serviceMethod string, args interface{}, reply interface{}, ttl time.Duration) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1), ttl).Done
return call.Error
}
59 changes: 59 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package tinyrpc

import "time"

// Request .
type Request struct {
ServiceMethod string
Seq uint64
TTL time.Time
next *Request
}

// Response .
type Response struct {
ServiceMethod string
Seq uint64
Error string
next *Response
}

func (server *Server) getRequest() *Request {
server.reqLock.Lock()
req := server.freeReq
if req == nil {
req = new(Request)
} else {
server.freeReq = req.next
*req = Request{}
}
server.reqLock.Unlock()
return req
}

func (server *Server) freeRequest(req *Request) {
server.reqLock.Lock()
req.next = server.freeReq
server.freeReq = req
server.reqLock.Unlock()
}

func (server *Server) getResponse() *Response {
server.respLock.Lock()
resp := server.freeResp
if resp == nil {
resp = new(Response)
} else {
server.freeResp = resp.next
*resp = Response{}
}
server.respLock.Unlock()
return resp
}

func (server *Server) freeResponse(resp *Response) {
server.respLock.Lock()
resp.next = server.freeResp
server.freeResp = resp
server.respLock.Unlock()
}
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/cloudmzh/tinyrpc

go 1.17

require (
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
)

require google.golang.org/protobuf v1.26.0 // indirect
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
Loading

0 comments on commit 03ccea4

Please sign in to comment.