Skip to content

Commit

Permalink
Support invoking RPC using HTTP and JSON (cadence-workflow#5305)
Browse files Browse the repository at this point in the history
What changed?
Adding HTTP option to RPC part.

Why?
Related issue: cadence-workflow#5265

How did you test it?

Potential risks

Release notes

Documentation Changes
Added additional config file to give an example of possible configuration
  • Loading branch information
mantas-sidlauskas authored Jun 15, 2023
1 parent e2a2a94 commit 01b7932
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 3 deletions.
10 changes: 10 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ type (
GRPCMaxMsgSize int `yaml:"grpcMaxMsgSize"`
// TLS allows configuring optional TLS/SSL authentication on the server (only on gRPC port)
TLS TLS `yaml:"tls"`
// HTTP keeps configuration for exposed HTTP API
HTTP *HTTP `yaml:"http"`
}

// HTTP API configuration
HTTP struct {
// Port for listening HTTP requests
Port uint16 `yaml:"port"`
// List of RPC procedures available to call using HTTP
Procedures []string `yaml:"procedures"`
}

// Blobstore contains the config for blobstore
Expand Down
27 changes: 24 additions & 3 deletions common/rpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ package rpc
import (
"crypto/tls"
"net"
nethttp "net/http"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"

"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/grpc"
yarpchttp "go.uber.org/yarpc/transport/http"
"go.uber.org/yarpc/transport/tchannel"
"google.golang.org/grpc/credentials"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
)

const defaultGRPCSizeLimit = 4 * 1024 * 1024
Expand Down Expand Up @@ -83,7 +85,26 @@ func NewFactory(logger log.Logger, p Params) *Factory {
inbounds = append(inbounds, grpcTransport.NewInbound(listener, inboundOptions...))
logger.Info("Listening for GRPC requests", tag.Address(p.GRPCAddress))
}
// Create http inbound if configured
if p.HTTP != nil {
interceptor := func(handler nethttp.Handler) nethttp.Handler {
return nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) {
procedure := r.Header.Get(yarpchttp.ProcedureHeader)
if _, found := p.HTTP.Procedures[procedure]; found {
handler.ServeHTTP(w, r)
return
}
nethttp.NotFound(w, r)
return
})
}

httpinbound := yarpchttp.NewTransport().
NewInbound(p.HTTP.Address, yarpchttp.Interceptor(interceptor))

inbounds = append(inbounds, httpinbound)
logger.Info("Listening for HTTP requests", tag.Address(p.HTTP.Address))
}
// Create outbounds
outbounds := yarpc.Outbounds{}
if p.OutboundsBuilder != nil {
Expand Down
25 changes: 25 additions & 0 deletions common/rpc/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package rpc

import (
"crypto/tls"
"errors"
"fmt"
"net"
"regexp"
Expand All @@ -40,6 +41,7 @@ type Params struct {
TChannelAddress string
GRPCAddress string
GRPCMaxMsgSize int
HTTP *HTTP

InboundTLS *tls.Config
OutboundTLS map[string]*tls.Config
Expand All @@ -50,6 +52,11 @@ type Params struct {
OutboundsBuilder OutboundsBuilder
}

type HTTP struct {
Address string
Procedures map[string]struct{}
}

// NewParams creates parameters for rpc.Factory from the given config
func NewParams(serviceName string, config *config.Config, dc *dynamicconfig.Collection) (Params, error) {
serviceConfig, err := config.GetServiceConfig(serviceName)
Expand Down Expand Up @@ -93,9 +100,27 @@ func NewParams(serviceName string, config *config.Config, dc *dynamicconfig.Coll
// not set, load from static config
forwardingRules = config.HeaderForwardingRules
}
var httpParams *HTTP

if serviceConfig.RPC.HTTP != nil {
if serviceConfig.RPC.HTTP.Port <= 0 {
return Params{}, errors.New("HTTP port is not set")
}
procedureMap := map[string]struct{}{}

for _, v := range serviceConfig.RPC.HTTP.Procedures {
procedureMap[v] = struct{}{}
}

httpParams = &HTTP{
Address: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.HTTP.Port))),
Procedures: procedureMap,
}
}

return Params{
ServiceName: serviceName,
HTTP: httpParams,
TChannelAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.Port))),
GRPCAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.GRPCPort))),
GRPCMaxMsgSize: serviceConfig.RPC.GRPCMaxMsgSize,
Expand Down
7 changes: 7 additions & 0 deletions common/rpc/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ func TestNewParams(t *testing.T) {
assert.Equal(t, 3333, params.GRPCMaxMsgSize)
assert.Nil(t, params.InboundTLS)

params, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, HTTP: &config.HTTP{Port: 8800}}}), dc)
assert.NoError(t, err)
assert.Equal(t, "127.0.0.1:8800", params.HTTP.Address)

params, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, HTTP: &config.HTTP{}}}), dc)
assert.Error(t, err)

params, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnIP: "1.2.3.4", GRPCPort: 2222}}), dc)
assert.NoError(t, err)
assert.Equal(t, "1.2.3.4:2222", params.GRPCAddress)
Expand Down
41 changes: 41 additions & 0 deletions config/development_http_api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
services:
frontend:
rpc:
port: 7933
grpcPort: 7833
bindOnLocalHost: true
grpcMaxMsgSize: 33554432
# enable HTTP server, allow to call Start worfklow using HTTP API
# Use curl to start a workflow:
# curl http://0.0.0.0:8800 \
# -H 'context-ttl-ms: 2000' \
# -H 'rpc-caller: rpc-client-name' \
# -H 'rpc-service: cadence-frontend' \
# -H 'rpc-encoding: json' \
# -H 'rpc-procedure: uber.cadence.api.v1.WorkflowAPI::StartWorkflowExecution' \
# -X POST --data @data.json
# Where data.json content looks something like this:
# {
# "domain": "samples-domain",
# "workflowId": "workflowid123",
# "execution_start_to_close_timeout": "11s",
# "task_start_to_close_timeout": "10s",
# "workflowType": {
# "name": "workflow_type"
# },
# "taskList": {
# "name": "tasklist-name"
# },
# "identity": "My custom identity",
# "requestId": "4D1E4058-6FCF-4BA8-BF16-8FA8B02F9651"
# }
http:
port: 8800
procedures:
- uber.cadence.api.v1.WorkflowAPI::StartWorkflowExecution
metrics:
statsd:
hostPort: "127.0.0.1:8125"
prefix: "cadence"
pprof:
port: 7936

0 comments on commit 01b7932

Please sign in to comment.