forked from goadesign/goa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandler.go
159 lines (140 loc) · 3.7 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package grpc
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
goa "goa.design/goa/v3/pkg"
)
// Inspired from https://github.com/go-kit/kit/blob/1c17eccf28596f5a2c59314f7923ca66301b90e4/transport/grpc/server.go
type (
// UnaryHandler handles a unary RPC. The request and response types are
// protocol buffer message types.
UnaryHandler interface {
// Handle handles a unary RPC.
//
// It takes a protocol buffer message type and returns a
// protocol buffer message type and any error when executing the
// RPC.
Handle(ctx context.Context, reqpb any) (respb any, err error)
}
// StreamHandler handles a streaming RPC. The stream may be client-side,
// server-side, or bidirectional.
StreamHandler interface {
// Handle handles a streaming RPC.
//
// input contains the endpoint payload (if any) and generated
// endpoint stream.
Handle(ctx context.Context, input any) (err error)
// Decode decodes the protocol buffer message and metadata to
// the service type. For client-side and bidirectional streams,
// the message is nil.
Decode(ctx context.Context, reqpb any) (req any, err error)
}
unaryHandler struct {
endpoint goa.Endpoint
decoder RequestDecoder
encoder ResponseEncoder
}
streamHandler struct {
endpoint goa.Endpoint
decoder RequestDecoder
}
)
// NewUnaryHandler returns a handler to handle unary gRPC endpoints.
func NewUnaryHandler(e goa.Endpoint, dec RequestDecoder, enc ResponseEncoder) UnaryHandler {
return &unaryHandler{
endpoint: e,
decoder: dec,
encoder: enc,
}
}
// NewStreamHandler returns a handler to handle streaming gRPC endpoints.
func NewStreamHandler(e goa.Endpoint, dec RequestDecoder) StreamHandler {
return &streamHandler{
endpoint: e,
decoder: dec,
}
}
// Handle serves a gRPC request.
func (h *unaryHandler) Handle(ctx context.Context, reqpb any) (any, error) {
var (
req any
err error
)
{
if h.decoder != nil {
// Decode gRPC request message and incoming metadata
md, _ := metadata.FromIncomingContext(ctx)
if req, err = h.decoder(ctx, reqpb, md); err != nil {
if _, ok := err.(*goa.ServiceError); ok {
return nil, err
}
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}
}
var (
resp any
)
{
// Invoke goa endpoint
if resp, err = h.endpoint(ctx, req); err != nil {
return nil, err
}
}
var (
respb any
hdr = metadata.MD{}
trlr = metadata.MD{}
)
{
if h.encoder != nil {
// Encode gRPC response
if respb, err = h.encoder(ctx, resp, &hdr, &trlr); err != nil {
if _, ok := err.(*goa.ServiceError); ok {
return nil, err
}
return nil, status.Error(codes.Unknown, err.Error())
}
}
}
// Encode gRPC headers
if len(hdr) > 0 {
if err := grpc.SendHeader(ctx, hdr); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
}
// Encode gRPC trailers
if len(trlr) > 0 {
if err := grpc.SetTrailer(ctx, trlr); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
}
return respb, err
}
// Decode decodes the request message and incoming metadata into goa type.
func (h *streamHandler) Decode(ctx context.Context, reqpb any) (any, error) {
var (
req any
err error
)
{
if h.decoder != nil {
md, _ := metadata.FromIncomingContext(ctx)
if req, err = h.decoder(ctx, reqpb, md); err != nil {
if _, ok := err.(*goa.ServiceError); ok {
return nil, err
}
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}
}
return req, nil
}
// Handle serves a gRPC request.
func (h *streamHandler) Handle(ctx context.Context, stream any) error {
_, err := h.endpoint(ctx, stream)
return err
}