forked from cosmos/cosmos-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgrpc_query.go
123 lines (101 loc) · 3.33 KB
/
grpc_query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package client
import (
gocontext "context"
"fmt"
"reflect"
"strconv"
gogogrpc "github.com/gogo/protobuf/grpc"
abci "github.com/tendermint/tendermint/abci/types"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/metadata"
"github.com/cosmos/cosmos-sdk/codec/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
"github.com/cosmos/cosmos-sdk/types/tx"
)
var (
	// Compile-time assertion that Context satisfies gogogrpc.ClientConn.
	_ gogogrpc.ClientConn = Context{}

	// protoCodec is the codec looked up in grpc's encoding registry under
	// proto.Name; Invoke uses it to marshal requests and unmarshal replies.
	protoCodec = encoding.GetCodec(proto.Name)
)
// Invoke implements the grpc ClientConn.Invoke method.
//
// It dispatches on method:
//   - the tx service's BroadcastTx method is routed to TxServiceBroadcast and
//     the result is copied into reply;
//   - any other method is treated as a state query: args is marshalled with
//     protoCodec, sent through ctx.QueryABCI with method as the query path,
//     and the response value is unmarshalled into reply.
//
// For queries, a block height found in the outgoing gRPC metadata under
// grpctypes.GRPCBlockHeightHeader is applied to the query context, and the
// height of the response is written to every grpc.HeaderCallOption in opts.
//
// An ErrInvalidRequest-wrapped error is returned when args is nil, when the
// broadcast request or response has the wrong type, or when the height header
// is negative. Errors from header parsing, marshalling, broadcasting, querying
// and interface unpacking are returned as-is.
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) {
	// Two things can happen here:
	// 1. either we're broadcasting a Tx, in which case we call Tendermint's broadcast endpoint directly,
	// 2. or we are querying for state, in which case we call ABCI's Query.
	// In both cases, we don't allow empty request args.
	//
	// reflect.Value.IsNil panics on the zero Value (what ValueOf returns for
	// an untyped nil) and on kinds that cannot be nil, so the kind is checked
	// before IsNil is consulted.
	switch rv := reflect.ValueOf(args); rv.Kind() {
	case reflect.Invalid:
		return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
		if rv.IsNil() {
			return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
		}
	}

	// Case 1. Broadcasting a Tx.
	if isBroadcast(method) {
		req, ok := args.(*tx.BroadcastTxRequest)
		if !ok {
			return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args)
		}
		res, ok := reply.(*tx.BroadcastTxResponse)
		if !ok {
			// Report the offending reply value, not the request.
			return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), reply)
		}

		broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req)
		if err != nil {
			return err
		}
		*res = *broadcastRes

		return nil
	}

	// Case 2. Querying state.
	reqBz, err := protoCodec.Marshal(args)
	if err != nil {
		return err
	}

	// Parse the height header. The ok result is deliberately ignored: without
	// outgoing metadata the lookup below simply finds no heights.
	md, _ := metadata.FromOutgoingContext(grpcCtx)
	if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
		height, err := strconv.ParseInt(heights[0], 10, 64)
		if err != nil {
			return err
		}
		if height < 0 {
			return sdkerrors.Wrapf(
				sdkerrors.ErrInvalidRequest,
				"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
		}

		// ctx is a value receiver, so this only affects the current call.
		ctx = ctx.WithHeight(height)
	}

	req := abci.RequestQuery{
		Path: method,
		Data: reqBz,
	}

	res, err := ctx.QueryABCI(req)
	if err != nil {
		return err
	}

	err = protoCodec.Unmarshal(res.Value, reply)
	if err != nil {
		return err
	}

	// Create header metadata. For now the headers contain:
	// - block height
	// We then parse all the call options, if the call option is a
	// HeaderCallOption, then we manually set the value of that header to the
	// metadata.
	md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
	for _, callOpt := range opts {
		header, ok := callOpt.(grpc.HeaderCallOption)
		if !ok {
			continue
		}

		*header.HeaderAddr = md
	}

	// Unpack interfaces in reply only when a registry is configured.
	if ctx.InterfaceRegistry != nil {
		return types.UnpackInterfaces(reply, ctx.InterfaceRegistry)
	}

	return nil
}
// NewStream implements the grpc ClientConn.NewStream method.
//
// Every argument is ignored: the call always yields a nil stream together
// with a "streaming rpc not supported" error.
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
	var noStream grpc.ClientStream // stays nil; no stream is ever created
	unsupported := fmt.Errorf("streaming rpc not supported")
	return noStream, unsupported
}
// isBroadcast reports whether method is exactly the fully-qualified gRPC
// method name of the tx service's BroadcastTx endpoint.
func isBroadcast(method string) bool {
	const broadcastTxMethod = "/cosmos.tx.v1beta1.Service/BroadcastTx"

	if method != broadcastTxMethod {
		return false
	}
	return true
}