Skip to content

Commit

Permalink
Make history client sharding aware
Browse files Browse the repository at this point in the history
Summary:
This changes the history service client to be aware of the number of shards
and use the service resolver to identify which host should serve a given
request, based on the workflowID in the request.
It also makes numberOfShards a parameter rather than a constant.

Test Plan: UT, integration test

Reviewers: sivakk, maxim, samar

Reviewed By: samar

Subscribers: jenkins

Differential Revision: https://code.uberinternal.com/D721011
  • Loading branch information
Tamer Eldeeb committed Feb 1, 2017
1 parent 01f7b1e commit e467ee4
Show file tree
Hide file tree
Showing 17 changed files with 239 additions and 153 deletions.
41 changes: 41 additions & 0 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package client

import (
"code.uber.internal/devexp/minions/client/history"
"code.uber.internal/devexp/minions/client/matching"
"code.uber.internal/devexp/minions/common/membership"
"code.uber.internal/devexp/minions/common/metrics"
tchannel "github.com/uber/tchannel-go"
)

// Factory can be used to create RPC clients for cadence services.
// Each method returns a new client, or an error if the client could not
// be constructed.
type Factory interface {
	// NewHistoryClient returns a client for the history service.
	NewHistoryClient() (history.Client, error)
	// NewMatchingClient returns a client for the matching service.
	NewMatchingClient() (matching.Client, error)
}

// tchannelClientFactory is the Factory implementation returned by
// NewTChannelClientFactory. It holds the dependencies that are handed to the
// history and matching client constructors.
type tchannelClientFactory struct {
	// ch is the channel passed to every client this factory creates.
	ch *tchannel.Channel
	// monitor is passed to the client constructors alongside ch.
	monitor membership.Monitor
	// metricsClient is stored at construction time; the factory methods in
	// this file do not read it.
	metricsClient metrics.Client
	// numberOfHistoryShards is forwarded to history.NewClient.
	numberOfHistoryShards int
}

// NewTChannelClientFactory creates an instance of client factory using tchannel.
//
// The arguments are stored as-is on the returned factory: ch and monitor are
// handed to every client it creates, and numberOfHistoryShards is additionally
// handed to the history client. metricsClient is only retained.
func NewTChannelClientFactory(ch *tchannel.Channel,
	monitor membership.Monitor, metricsClient metrics.Client, numberOfHistoryShards int) Factory {
	factory := new(tchannelClientFactory)
	factory.ch = ch
	factory.monitor = monitor
	factory.metricsClient = metricsClient
	factory.numberOfHistoryShards = numberOfHistoryShards
	return factory
}

// NewHistoryClient builds a history service client from the factory's
// channel, membership monitor and configured number of history shards.
// Any error from history.NewClient is returned unchanged.
func (cf *tchannelClientFactory) NewHistoryClient() (history.Client, error) {
	historyClient, err := history.NewClient(cf.ch, cf.monitor, cf.numberOfHistoryShards)
	return historyClient, err
}

// NewMatchingClient builds a matching service client from the factory's
// channel and membership monitor. Any error from matching.NewClient is
// returned unchanged.
func (cf *tchannelClientFactory) NewMatchingClient() (matching.Client, error) {
	matchingClient, err := matching.NewClient(cf.ch, cf.monitor)
	return matchingClient, err
}
110 changes: 79 additions & 31 deletions client/history/client.go
Original file line number Diff line number Diff line change
@@ -1,60 +1,52 @@
package history

import (
"sync"
"time"

"golang.org/x/net/context"

h "code.uber.internal/devexp/minions/.gen/go/history"
workflow "code.uber.internal/devexp/minions/.gen/go/shared"
"code.uber.internal/devexp/minions/common/membership"
"code.uber.internal/devexp/minions/common/util"
tchannel "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
)

const historyServiceName = "cadence-history"
const shardID = "1" // TODO: actually derive shardID from request

var _ Client = (*clientImpl)(nil)

type clientImpl struct {
connection *tchannel.Channel
resolver membership.ServiceResolver
connection *tchannel.Channel
resolver membership.ServiceResolver
tokenSerializer util.TaskTokenSerializer
numberOfShards int
// TODO: consider refactor thriftCache into a separate struct
thriftCacheLock sync.RWMutex
thriftCache map[string]h.TChanHistoryService
}

// NewClient creates a new history service TChannel client
func NewClient(ch *tchannel.Channel, monitor membership.Monitor) (Client, error) {
func NewClient(ch *tchannel.Channel, monitor membership.Monitor, numberOfShards int) (Client, error) {
sResolver, err := monitor.GetResolver(historyServiceName)
if err != nil {
return nil, err
}

client := &clientImpl{
connection: ch,
resolver: sResolver,
connection: ch,
resolver: sResolver,
tokenSerializer: util.NewJSONTaskTokenSerializer(),
numberOfShards: numberOfShards,
thriftCache: make(map[string]h.TChanHistoryService),
}
return client, nil
}

func (c *clientImpl) getHostForRequest(key string) (h.TChanHistoryService, error) {
host, err := c.resolver.Lookup(key)
if err != nil {
return nil, err
}
// TODO: build client cache
tClient := thrift.NewClient(c.connection, historyServiceName, &thrift.ClientOptions{
HostPort: host.GetAddress(),
})
return h.NewTChanHistoryServiceClient(tClient), nil
}

func (c *clientImpl) createContext() (thrift.Context, context.CancelFunc) {
// TODO: make timeout configurable
return thrift.NewContext(time.Second * 30)
}

func (c *clientImpl) StartWorkflowExecution(request *workflow.StartWorkflowExecutionRequest) (*workflow.StartWorkflowExecutionResponse, error) {
client, err := c.getHostForRequest(shardID)
client, err := c.getHostForRequest(request.GetWorkflowId())
if err != nil {
return nil, err
}
Expand All @@ -65,7 +57,7 @@ func (c *clientImpl) StartWorkflowExecution(request *workflow.StartWorkflowExecu

func (c *clientImpl) GetWorkflowExecutionHistory(
request *workflow.GetWorkflowExecutionHistoryRequest) (*workflow.GetWorkflowExecutionHistoryResponse, error) {
client, err := c.getHostForRequest(shardID)
client, err := c.getHostForRequest(request.Execution.GetWorkflowId())
if err != nil {
return nil, err
}
Expand All @@ -75,7 +67,7 @@ func (c *clientImpl) GetWorkflowExecutionHistory(
}

func (c *clientImpl) RecordDecisionTaskStarted(request *h.RecordDecisionTaskStartedRequest) (*h.RecordDecisionTaskStartedResponse, error) {
client, err := c.getHostForRequest(shardID)
client, err := c.getHostForRequest(request.WorkflowExecution.GetWorkflowId())
if err != nil {
return nil, err
}
Expand All @@ -85,7 +77,7 @@ func (c *clientImpl) RecordDecisionTaskStarted(request *h.RecordDecisionTaskStar
}

func (c *clientImpl) RecordActivityTaskStarted(request *h.RecordActivityTaskStartedRequest) (*h.RecordActivityTaskStartedResponse, error) {
client, err := c.getHostForRequest(shardID)
client, err := c.getHostForRequest(request.WorkflowExecution.GetWorkflowId())
if err != nil {
return nil, err
}
Expand All @@ -95,7 +87,11 @@ func (c *clientImpl) RecordActivityTaskStarted(request *h.RecordActivityTaskStar
}

func (c *clientImpl) RespondDecisionTaskCompleted(request *workflow.RespondDecisionTaskCompletedRequest) error {
client, err := c.getHostForRequest(shardID)
taskToken, err := c.tokenSerializer.Deserialize(request.TaskToken)
if err != nil {
return err
}
client, err := c.getHostForRequest(taskToken.WorkflowID)
if err != nil {
return err
}
Expand All @@ -105,7 +101,11 @@ func (c *clientImpl) RespondDecisionTaskCompleted(request *workflow.RespondDecis
}

func (c *clientImpl) RespondActivityTaskCompleted(request *workflow.RespondActivityTaskCompletedRequest) error {
client, err := c.getHostForRequest(shardID)
taskToken, err := c.tokenSerializer.Deserialize(request.TaskToken)
if err != nil {
return err
}
client, err := c.getHostForRequest(taskToken.WorkflowID)
if err != nil {
return err
}
Expand All @@ -115,7 +115,11 @@ func (c *clientImpl) RespondActivityTaskCompleted(request *workflow.RespondActiv
}

func (c *clientImpl) RespondActivityTaskFailed(request *workflow.RespondActivityTaskFailedRequest) error {
client, err := c.getHostForRequest(shardID)
taskToken, err := c.tokenSerializer.Deserialize(request.TaskToken)
if err != nil {
return err
}
client, err := c.getHostForRequest(taskToken.WorkflowID)
if err != nil {
return err
}
Expand All @@ -125,11 +129,55 @@ func (c *clientImpl) RespondActivityTaskFailed(request *workflow.RespondActivity
}

func (c *clientImpl) RecordActivityTaskHeartbeat(request *workflow.RecordActivityTaskHeartbeatRequest) (*workflow.RecordActivityTaskHeartbeatResponse, error) {
client, err := c.getHostForRequest(shardID)
taskToken, err := c.tokenSerializer.Deserialize(request.TaskToken)
if err != nil {
return nil, err
}
client, err := c.getHostForRequest(taskToken.WorkflowID)
if err != nil {
return nil, err
}
ctx, cancel := c.createContext()
defer cancel()
return client.RecordActivityTaskHeartbeat(ctx, request)
}

// getHostForRequest returns the history service client for the host that the
// resolver reports for the shard derived from workflowID.
//
// The workflow ID is mapped to a shard number with
// util.WorkflowIDToHistoryShard, the shard is looked up in the service
// resolver, and a (cached) thrift client for the resulting address is
// returned. A resolver error is returned unchanged with a nil client.
func (c *clientImpl) getHostForRequest(workflowID string) (h.TChanHistoryService, error) {
	shard := util.WorkflowIDToHistoryShard(workflowID, c.numberOfShards)

	// NOTE(review): string(int) yields the UTF-8 encoding of the code point
	// with that value, not the decimal text of the number. Presumably the
	// side that claims shard ownership derives its lookup key the same way,
	// so confirm both ends before switching this to strconv.Itoa.
	owner, lookupErr := c.resolver.Lookup(string(shard))
	if lookupErr != nil {
		return nil, lookupErr
	}

	address := owner.GetAddress()
	return c.getThriftClient(address), nil
}

// createContext returns a new thrift context with a fixed 30 second timeout,
// together with its cancel function. Callers are expected to invoke the
// cancel function once the request has finished.
func (c *clientImpl) createContext() (thrift.Context, context.CancelFunc) {
	// TODO: make timeout configurable
	const requestTimeout = 30 * time.Second
	return thrift.NewContext(requestTimeout)
}

// getThriftClient returns the history service client for hostPort, creating
// and caching one the first time a given address is requested.
func (c *clientImpl) getThriftClient(hostPort string) h.TChanHistoryService {
	// Fast path: most calls hit an existing entry, so only take the read lock.
	c.thriftCacheLock.RLock()
	cached, found := c.thriftCache[hostPort]
	c.thriftCacheLock.RUnlock()
	if found {
		return cached
	}

	c.thriftCacheLock.Lock()
	defer c.thriftCacheLock.Unlock()

	// Another goroutine may have inserted the entry between releasing the
	// read lock and acquiring the write lock, so look once more.
	if existing, present := c.thriftCache[hostPort]; present {
		return existing
	}

	options := &thrift.ClientOptions{HostPort: hostPort}
	created := h.NewTChanHistoryServiceClient(
		thrift.NewClient(c.connection, historyServiceName, options))
	c.thriftCache[hostPort] = created
	return created
}
34 changes: 0 additions & 34 deletions common/clientfactory.go

This file was deleted.

30 changes: 21 additions & 9 deletions common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"time"

"code.uber.internal/devexp/minions/client"
"code.uber.internal/devexp/minions/common/logging"
"code.uber.internal/devexp/minions/common/membership"
"code.uber.internal/devexp/minions/common/metrics"
Expand Down Expand Up @@ -40,23 +41,27 @@ type serviceImpl struct {
rpSeedHosts []string
membershipMonitor membership.Monitor
tchannelFactory TChannelFactory
clientFactory ClientFactory
clientFactory client.Factory
numberOfHistoryShards int
logger bark.Logger
metricsScope tally.Scope
runtimeMetricsReporter *metrics.RuntimeMetricsReporter
}

// NewService instantiates a ServiceInstance
// TODO: have a better name for Service.
// TODO: consider passing a config object if the parameter list gets too big
// this is the object which holds all the common stuff
// shared by all the services.
func NewService(serviceName string, logger bark.Logger, scope tally.Scope, tchanFactory TChannelFactory, rpHosts []string) Service {
func NewService(serviceName string, logger bark.Logger,
scope tally.Scope, tchanFactory TChannelFactory, rpHosts []string, numberOfHistoryShards int) Service {
sVice := &serviceImpl{
sName: serviceName,
logger: logger.WithField("Service", serviceName),
tchannelFactory: tchanFactory,
rpSeedHosts: rpHosts,
metricsScope: scope,
sName: serviceName,
logger: logger.WithField("Service", serviceName),
tchannelFactory: tchanFactory,
rpSeedHosts: rpHosts,
metricsScope: scope,
numberOfHistoryShards: numberOfHistoryShards,
}
sVice.runtimeMetricsReporter = metrics.NewRuntimeMetricsReporter(scope, time.Minute, sVice.logger)

Expand Down Expand Up @@ -126,7 +131,8 @@ func (h *serviceImpl) Start(thriftServices []thrift.TChanServer) {
}
h.hostInfo = hostInfo

h.clientFactory = newTChannelClientFactory(h.ch, h.membershipMonitor)
metricsClient := metrics.NewClient(h.metricsScope, h.getMetricsServiceIdx(h.sName))
h.clientFactory = client.NewTChannelClientFactory(h.ch, h.membershipMonitor, metricsClient, h.numberOfHistoryShards)

// The service is now started up
log.Info("service started")
Expand Down Expand Up @@ -161,7 +167,7 @@ func (h *serviceImpl) GetMetricsScope() tally.Scope {
return h.metricsScope
}

func (h *serviceImpl) GetClientFactory() ClientFactory {
func (h *serviceImpl) GetClientFactory() client.Factory {
return h.clientFactory
}

Expand Down Expand Up @@ -192,3 +198,9 @@ func (h *serviceImpl) bootstrapRingpop(rp *ringpop.Ringpop, rpHosts []string) er
_, err := rp.Bootstrap(bOptions)
return err
}

// getMetricsServiceIdx returns the metrics service index to use for
// serviceName. The argument is currently ignored: every service reports
// under metrics.Frontend.
func (h *serviceImpl) getMetricsServiceIdx(serviceName string) metrics.ServiceIdx {
	// for now we always use frontend for all metrics
	// TODO: return proper index based on service name once per-service metrics are defined
	var idx metrics.ServiceIdx = metrics.Frontend
	return idx
}
3 changes: 2 additions & 1 deletion common/serviceinterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/uber-go/tally"
"github.com/uber/tchannel-go/thrift"

"code.uber.internal/devexp/minions/client"
"code.uber.internal/devexp/minions/common/membership"
)

Expand All @@ -24,7 +25,7 @@ type (

GetMetricsScope() tally.Scope

GetClientFactory() ClientFactory
GetClientFactory() client.Factory

GetMembershipMonitor() membership.Monitor

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package common
package util

import "encoding/json"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package common
package util

type (
// TaskTokenSerializer serializes task tokens
Expand Down
8 changes: 8 additions & 0 deletions common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"sync"
"time"

farm "github.com/dgryski/go-farm"

workflow "code.uber.internal/devexp/minions/.gen/go/shared"
"code.uber.internal/devexp/minions/common/backoff"
)
Expand Down Expand Up @@ -74,3 +76,9 @@ func IsPersistenceTransientError(err error) bool {

return false
}

// WorkflowIDToHistoryShard is used to map workflowID to a shardID.
//
// The shard is the 32-bit farm fingerprint of the workflow ID taken modulo
// numberOfShards. numberOfShards must be positive: a value of 0 makes the
// modulo panic at run time with an integer divide by zero.
func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int {
	shardCount := uint32(numberOfShards)
	fingerprint := farm.Fingerprint32([]byte(workflowID))
	return int(fingerprint % shardCount)
}
Loading

0 comments on commit e467ee4

Please sign in to comment.