Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSON RPC compatibility workarounds to support lbry-sdk #75

Merged
merged 3 commits into from
Oct 29, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Workaround to allow lbry-sdk to call server.version and server.features.
Incoming/outgoing JSON is patched using yet another codec (jsonPatchingCodec).
Add more logging of raw/patched JSON.
  • Loading branch information
moodyjon committed Oct 27, 2022
commit a3bedc2c1d9a99af5fb979f5eb7bcaf555ae2ec7
17 changes: 8 additions & 9 deletions server/jsonrpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,16 @@ type ServerVersionService struct {
Args *Args
}

type ServerVersionReq struct{}
type ServerVersionReq [2]string // [client_name, client_version]

type ServerVersionRes string
type ServerVersionRes [2]string // [version, protocol_version]

// Banner is the json rpc endpoint for 'server.version'.
// FIXME: This should return a struct with the version and the protocol version.
// <<-- that comment was written by github, scary shit because it's true
// Version is the json rpc endpoint for 'server.version'.
func (t *ServerService) Version(req *ServerVersionReq, res **ServerVersionRes) error {
log.Println("Version")

*res = (*ServerVersionRes)(&t.Args.ServerVersion)

// FIXME: We may need to do the computation of a negotiated version here.
// Also return an error if client is not supported?
result := [2]string{t.Args.ServerVersion, t.Args.ServerVersion}
*res = (*ServerVersionRes)(&result)
log.Printf("Version(%v) -> %v", *req, **res)
return nil
}
158 changes: 151 additions & 7 deletions server/session.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package server

import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"net"
"net/rpc"
Expand Down Expand Up @@ -241,7 +243,7 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {

sm.grp.Add(1)
go func() {
s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess})
s1.ServeCodec(&sessionServerCodec{jsonrpc.NewServerCodec(newJsonPatchingCodec(conn)), sess})
log.Infof("session %v goroutine exit", sess.addr.String())
sm.grp.Done()
}()
Expand Down Expand Up @@ -343,7 +345,7 @@ func (sm *sessionManager) doNotify(notification interface{}) {
}
}

type SessionServerCodec struct {
type sessionServerCodec struct {
rpc.ServerCodec
sess *session
}
Expand All @@ -354,13 +356,14 @@ type SessionServerCodec struct {
// blockchain.address.listunspent -> blockchain.address.Listunspent
// This makes the "method" string compatible with rpc.Server
// requirements.
func (c *SessionServerCodec) ReadRequestHeader(req *rpc.Request) error {
log.Infof("receive header from %v", c.sess.addr.String())
func (c *sessionServerCodec) ReadRequestHeader(req *rpc.Request) error {
log.Infof("from %v receive header", c.sess.addr.String())
err := c.ServerCodec.ReadRequestHeader(req)
if err != nil {
log.Warnf("error: %v", err)
return err
}
log.Infof("from %v receive header: %#v", c.sess.addr.String(), *req)
rawMethod := req.ServiceMethod
parts := strings.Split(rawMethod, ".")
if len(parts) < 2 {
Expand All @@ -377,20 +380,21 @@ func (c *SessionServerCodec) ReadRequestHeader(req *rpc.Request) error {
}

// ReadRequestBody wraps the regular implementation, but updates session stats too.
func (c *SessionServerCodec) ReadRequestBody(params any) error {
func (c *sessionServerCodec) ReadRequestBody(params any) error {
log.Infof("from %v receive body", c.sess.addr.String())
err := c.ServerCodec.ReadRequestBody(params)
if err != nil {
log.Warnf("error: %v", err)
return err
}
log.Infof("receive body from %v", c.sess.addr.String())
log.Infof("from %v receive body: %#v", c.sess.addr.String(), params)
// Bump last receive time.
c.sess.lastRecv = time.Now()
return err
}

// WriteResponse wraps the regular implementation, but updates session stats too.
func (c *SessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error {
func (c *sessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error {
log.Infof("respond to %v", c.sess.addr.String())
err := c.ServerCodec.WriteResponse(resp, reply)
if err != nil {
Expand All @@ -400,3 +404,143 @@ func (c *SessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error
c.sess.lastSend = time.Now()
return err
}

// serverRequest is a duplicate of serverRequest from
// net/rpc/jsonrpc/server.go with an added Version which
// we can check.
type serverRequest struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
Params *json.RawMessage `json:"params"`
Id *json.RawMessage `json:"id"`
}

// serverResponse is a duplicate of serverResponse from
// net/rpc/jsonrpc/server.go with an added Version which
// we can set at will.
type serverResponse struct {
Version string `json:"jsonrpc"`
Id *json.RawMessage `json:"id"`
Result any `json:"result,omitempty"`
Error any `json:"error,omitempty"`
}

// jsonPatchingCodec is able to intercept the JSON requests/responses
// and tweak them. Currently it appears we need to add a
// "jsonrpc": "1.0" or "jsonrpc": "2.0" to make the client happy.
type jsonPatchingCodec struct {
conn net.Conn
inBuffer *bytes.Buffer
dec *json.Decoder
enc *json.Encoder
outBuffer *bytes.Buffer
}

func newJsonPatchingCodec(conn net.Conn) *jsonPatchingCodec {
buf1, buf2 := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
return &jsonPatchingCodec{
conn: conn,
inBuffer: buf1,
dec: json.NewDecoder(buf1),
enc: json.NewEncoder(buf2),
outBuffer: buf2,
}
}

func (c *jsonPatchingCodec) Read(p []byte) (n int, err error) {
if c.outBuffer.Len() > 0 {
// Return remaining decoded bytes.
return c.outBuffer.Read(p)
}
// Buffer contents consumed. Try to decode more JSON.

// Read until framing newline. This allows us to print the raw request.
for !bytes.ContainsAny(c.inBuffer.Bytes(), "\n") {
var buf [1024]byte
n, err = c.conn.Read(buf[:])
if err != nil {
return 0, err
}
c.inBuffer.Write(buf[:n])
}
log.Infof("raw request: %v", c.inBuffer.String())

var req serverRequest
err = c.dec.Decode(&req)
if err != nil {
return 0, err
}

if req.Params != nil {
n := len(*req.Params)
if n < 2 || (*req.Params)[0] != '[' && (*req.Params)[n-1] != ']' {
// This is an error, but we're not going to try to correct it.
goto encode
}
// FIXME: The heuristics here don't cover all possibilities.
// For example: [{obj1}, {obj2}] or ["foo,bar"] would not
// be handled correctly.
bracketed := (*req.Params)[1 : n-1]
n = len(bracketed)
if n > 1 && (bracketed[0] == '{' || bracketed[0] == '[') {
// Probable single object or list argument.
goto encode
}
args := strings.Split(string(bracketed), ",")
if len(args) <= 1 {
// No commas at all. Definitely a single argument.
goto encode
}
// The params look like ["arg1", "arg2", "arg3", ...].
// We're in trouble because our jsonrpc library does not
// handle this. So pack these args in an inner list.
// The handler method will receive ONE list argument.
params := json.RawMessage(fmt.Sprintf("[[%s]]", bracketed))
req.Params = &params
} else {
// Add empty argument list if params omitted.
params := json.RawMessage("[]")
req.Params = &params
}

encode:
// Encode the request. This allows us to print the patched request.
buf, err := json.Marshal(req)
if err != nil {
return 0, err
}
log.Infof("patched request: %v", string(buf))

err = c.enc.Encode(req)
if err != nil {
return 0, err
}
return c.outBuffer.Read(p)
}

func (c *jsonPatchingCodec) Write(p []byte) (n int, err error) {
log.Infof("raw response: %v", string(p))
var resp serverResponse
err = json.Unmarshal(p, &resp)
if err != nil {
return 0, err
}

// Add "jsonrpc": "2.0" if missing.
if len(resp.Version) == 0 {
resp.Version = "2.0"
}

buf, err := json.Marshal(resp)
if err != nil {
return 0, err
}
log.Infof("patched response: %v", string(buf))

// Add newline for framing.
return c.conn.Write(append(buf, '\n'))
}

func (c *jsonPatchingCodec) Close() error {
return c.conn.Close()
}