Skip to content

Commit

Permalink
Merge branch 'master' into perf-tests-logs
Browse files Browse the repository at this point in the history
  • Loading branch information
daixiang0 authored Oct 11, 2022
2 parents e3a6bd1 + 4656eb0 commit 1668566
Show file tree
Hide file tree
Showing 14 changed files with 464 additions and 277 deletions.
5 changes: 5 additions & 0 deletions docs/development/dapr-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ Dapr uses prometheus process and go collectors by default.
* dapr_runtime_actor_deactivated_total: The number of the successful actor deactivation.
* dapr_runtime_actor_deactivated_failed_total: The number of the failed actor deactivation.

#### Resiliency

* dapr_runtime_resiliency_loaded: The number of resiliency policies loaded.
* dapr_runtime_resiliency_count: The number of times a resiliency policy has been executed.

### gRPC monitoring metrics

Dapr leverages opencensus ocgrpc plugin to generate gRPC server and client metrics.
Expand Down
54 changes: 30 additions & 24 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,15 @@ type Actors interface {
GetActiveActorsCount(ctx context.Context) []ActiveActorsCount
}

// GRPCConnectionFn is the type of the function that returns a gRPC connection
type GRPCConnectionFn func(ctx context.Context, address, id string, namespace string, skipTLS, recreateIfExists, enableSSL bool, customOpts ...grpc.DialOption) (*grpc.ClientConn, func(), error)

type actorsRuntime struct {
appChannel channel.AppChannel
store state.Store
transactionalStore state.TransactionalStore
placement *internal.ActorPlacement
grpcConnectionFn func(ctx context.Context, address, id string, namespace string, skipTLS, recreateIfExists, enableSSL bool, customOpts ...grpc.DialOption) (*grpc.ClientConn, func(), error)
grpcConnectionFn GRPCConnectionFn
config Config
actorsTable *sync.Map
activeTimers *sync.Map
Expand Down Expand Up @@ -139,32 +142,39 @@ const (

var ErrDaprResponseHeader = errors.New("error indicated via actor header response")

// ActorsOpts contains options for NewActors.
type ActorsOpts struct {
StateStore state.Store
AppChannel channel.AppChannel
GRPCConnectionFn GRPCConnectionFn
Config Config
CertChain *daprCredentials.CertChain
TracingSpec configuration.TracingSpec
Features []configuration.FeatureSpec
Resiliency resiliency.Provider
StateStoreName string
}

// NewActors create a new actors runtime with given config.
func NewActors(
stateStore state.Store,
appChannel channel.AppChannel,
grpcConnectionFn func(ctx context.Context, address, id string, namespace string, skipTLS, recreateIfExists, enableSSL bool, customOpts ...grpc.DialOption) (*grpc.ClientConn, func(), error),
config Config,
certChain *daprCredentials.CertChain,
tracingSpec configuration.TracingSpec,
features []configuration.FeatureSpec,
resiliency resiliency.Provider,
stateStoreName string,
) Actors {
func NewActors(opts ActorsOpts) Actors {
var transactionalStore state.TransactionalStore
if stateStore != nil {
features := stateStore.Features()
if opts.StateStore != nil {
features := opts.StateStore.Features()
if state.FeatureETag.IsPresent(features) && state.FeatureTransactional.IsPresent(features) {
transactionalStore = stateStore.(state.TransactionalStore)
transactionalStore = opts.StateStore.(state.TransactionalStore)
}
}

return &actorsRuntime{
appChannel: appChannel,
config: config,
store: stateStore,
store: opts.StateStore,
appChannel: opts.AppChannel,
grpcConnectionFn: opts.GRPCConnectionFn,
config: opts.Config,
certChain: opts.CertChain,
tracingSpec: opts.TracingSpec,
resiliency: opts.Resiliency,
storeName: opts.StateStoreName,
transactionalStore: transactionalStore,
grpcConnectionFn: grpcConnectionFn,
actorsTable: &sync.Map{},
activeTimers: &sync.Map{},
activeTimersLock: &sync.RWMutex{},
Expand All @@ -177,11 +187,7 @@ func NewActors(
evaluationBusy: false,
evaluationChan: make(chan bool),
appHealthy: atomic.NewBool(true),
certChain: certChain,
tracingSpec: tracingSpec,
resiliency: resiliency,
storeName: stateStoreName,
isResiliencyEnabled: configuration.IsFeatureEnabled(features, configuration.Resiliency),
isResiliencyEnabled: configuration.IsFeatureEnabled(opts.Features, configuration.Resiliency),
}
}

Expand Down
155 changes: 135 additions & 20 deletions pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,14 @@ func (b *runtimeBuilder) buildActorRuntime() *actorsRuntime {
}

if b.config == nil {
config := NewConfig("", TestAppID, []string{"placement:5050"}, 0, "", config.ApplicationConfig{})
config := NewConfig(ConfigOpts{
HostAddress: "",
AppID: TestAppID,
PlacementAddresses: []string{"placement:5050"},
Port: 0,
Namespace: "",
AppConfig: config.ApplicationConfig{},
})
b.config = &config
}

Expand All @@ -307,33 +314,74 @@ func (b *runtimeBuilder) buildActorRuntime() *actorsRuntime {
storeName = b.actorStoreName
}

a := NewActors(store, b.appChannel, nil, *b.config, nil, tracingSpec, b.featureSpec, resiliency.FromConfigurations(log, testResiliency), storeName)
a := NewActors(ActorsOpts{
StateStore: store,
AppChannel: b.appChannel,
Config: *b.config,
TracingSpec: tracingSpec,
Features: b.featureSpec,
Resiliency: resiliency.FromConfigurations(log, testResiliency),
StateStoreName: storeName,
})

return a.(*actorsRuntime)
}

func newTestActorsRuntimeWithMock(appChannel channel.AppChannel) *actorsRuntime {
spec := config.TracingSpec{SamplingRate: "1"}
store := fakeStore()
config := NewConfig("", TestAppID, []string{"placement:5050"}, 0, "", config.ApplicationConfig{})
a := NewActors(store, appChannel, nil, config, nil, spec, nil, resiliency.New(log), "actorStore")
config := NewConfig(ConfigOpts{
AppID: TestAppID,
PlacementAddresses: []string{"placement:5050"},
AppConfig: config.ApplicationConfig{},
})

a := NewActors(ActorsOpts{
StateStore: store,
AppChannel: appChannel,
Config: config,
TracingSpec: spec,
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
})

return a.(*actorsRuntime)
}

func newTestActorsRuntimeWithMockWithoutPlacement(appChannel channel.AppChannel) *actorsRuntime {
spec := config.TracingSpec{SamplingRate: "1"}
config := NewConfig("", TestAppID, []string{""}, 0, "", config.ApplicationConfig{})
a := NewActors(nil, appChannel, nil, config, nil, spec, nil, resiliency.New(log), "actorStore")
config := NewConfig(ConfigOpts{
AppID: TestAppID,
PlacementAddresses: []string{""},
AppConfig: config.ApplicationConfig{},
})
a := NewActors(ActorsOpts{
AppChannel: appChannel,
Config: config,
TracingSpec: spec,
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
})

return a.(*actorsRuntime)
}

func newTestActorsRuntimeWithMockAndNoStore(appChannel channel.AppChannel) *actorsRuntime {
spec := config.TracingSpec{SamplingRate: "1"}
var store state.Store
config := NewConfig("", TestAppID, []string{""}, 0, "", config.ApplicationConfig{})
a := NewActors(store, appChannel, nil, config, nil, spec, nil, resiliency.New(log), "actorStore")
config := NewConfig(ConfigOpts{
AppID: TestAppID,
PlacementAddresses: []string{""},
AppConfig: config.ApplicationConfig{},
})
a := NewActors(ActorsOpts{
StateStore: store,
AppChannel: appChannel,
Config: config,
TracingSpec: spec,
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
})

return a.(*actorsRuntime)
}
Expand All @@ -351,8 +399,20 @@ func newTestActorsRuntimeWithMockAndActorMetadataPartition(appChannel channel.Ap
},
},
}
c := NewConfig("", TestAppID, []string{"placement:5050"}, 0, "", appConfig)
a := NewActors(store, appChannel, nil, c, nil, spec, []config.FeatureSpec{}, resiliency.New(log), "actorStore")
c := NewConfig(ConfigOpts{
AppID: TestAppID,
PlacementAddresses: []string{"placement:5050"},
AppConfig: appConfig,
})
a := NewActors(ActorsOpts{
StateStore: store,
AppChannel: appChannel,
Config: c,
TracingSpec: spec,
Features: []config.FeatureSpec{},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
})

return a.(*actorsRuntime)
}
Expand Down Expand Up @@ -1904,7 +1964,14 @@ func TestConfig(t *testing.T) {
Reentrancy: config.ReentrancyConfig{},
RemindersStoragePartitions: 0,
}
c := NewConfig("localhost:5050", "app1", []string{"placement:5050"}, 3500, "default", appConfig)
c := NewConfig(ConfigOpts{
HostAddress: "localhost:5050",
AppID: "app1",
PlacementAddresses: []string{"placement:5050"},
Port: 3500,
Namespace: "default",
AppConfig: appConfig,
})
assert.Equal(t, "localhost:5050", c.HostAddress)
assert.Equal(t, "app1", c.AppID)
assert.Equal(t, []string{"placement:5050"}, c.PlacementAddresses)
Expand All @@ -1920,7 +1987,14 @@ func TestConfig(t *testing.T) {
func TestReentrancyConfig(t *testing.T) {
appConfig := DefaultAppConfig
t.Run("Test empty reentrancy values", func(t *testing.T) {
c := NewConfig("localhost:5050", "app1", []string{"placement:5050"}, 3500, "default", appConfig)
c := NewConfig(ConfigOpts{
HostAddress: "localhost:5050",
AppID: "app1",
PlacementAddresses: []string{"placement:5050"},
Port: 3500,
Namespace: "default",
AppConfig: appConfig,
})
assert.False(t, c.Reentrancy.Enabled)
assert.NotNil(t, c.Reentrancy.MaxStackDepth)
assert.Equal(t, 32, *c.Reentrancy.MaxStackDepth)
Expand All @@ -1935,7 +2009,14 @@ func TestReentrancyConfig(t *testing.T) {
},
},
}
c := NewConfig("localhost:5050", "app1", []string{"placement:5050"}, 3500, "default", appConfig)
c := NewConfig(ConfigOpts{
HostAddress: "localhost:5050",
AppID: "app1",
PlacementAddresses: []string{"placement:5050"},
Port: 3500,
Namespace: "default",
AppConfig: appConfig,
})
assert.False(t, c.Reentrancy.Enabled)
assert.NotNil(t, c.Reentrancy.MaxStackDepth)
assert.Equal(t, 32, *c.Reentrancy.MaxStackDepth)
Expand All @@ -1944,7 +2025,14 @@ func TestReentrancyConfig(t *testing.T) {

t.Run("Test minimum reentrancy values", func(t *testing.T) {
appConfig.Reentrancy = config.ReentrancyConfig{Enabled: true}
c := NewConfig("localhost:5050", "app1", []string{"placement:5050"}, 3500, "default", appConfig)
c := NewConfig(ConfigOpts{
HostAddress: "localhost:5050",
AppID: "app1",
PlacementAddresses: []string{"placement:5050"},
Port: 3500,
Namespace: "default",
AppConfig: appConfig,
})
assert.True(t, c.Reentrancy.Enabled)
assert.NotNil(t, c.Reentrancy.MaxStackDepth)
assert.Equal(t, 32, *c.Reentrancy.MaxStackDepth)
Expand All @@ -1953,7 +2041,14 @@ func TestReentrancyConfig(t *testing.T) {
t.Run("Test full reentrancy values", func(t *testing.T) {
reentrancyLimit := 64
appConfig.Reentrancy = config.ReentrancyConfig{Enabled: true, MaxStackDepth: &reentrancyLimit}
c := NewConfig("localhost:5050", "app1", []string{"placement:5050"}, 3500, "default", appConfig)
c := NewConfig(ConfigOpts{
HostAddress: "localhost:5050",
AppID: "app1",
PlacementAddresses: []string{"placement:5050"},
Port: 3500,
Namespace: "default",
AppConfig: appConfig,
})
assert.True(t, c.Reentrancy.Enabled)
assert.NotNil(t, c.Reentrancy.MaxStackDepth)
assert.Equal(t, 64, *c.Reentrancy.MaxStackDepth)
Expand Down Expand Up @@ -2089,7 +2184,11 @@ func TestBasicReentrantActorLocking(t *testing.T) {

appConfig := DefaultAppConfig
appConfig.Reentrancy = config.ReentrancyConfig{Enabled: true}
reentrantConfig := NewConfig("", TestAppID, []string{"placement:5050"}, 0, "", appConfig)
reentrantConfig := NewConfig(ConfigOpts{
AppID: TestAppID,
PlacementAddresses: []string{"placement:5050"},
AppConfig: appConfig,
})
reentrantAppChannel := new(reentrantAppChannel)
reentrantAppChannel.nextCall = []*invokev1.InvokeMethodRequest{req2}
reentrantAppChannel.callLog = []string{}
Expand Down Expand Up @@ -2117,7 +2216,11 @@ func TestReentrantActorLockingOverMultipleActors(t *testing.T) {

appConfig := DefaultAppConfig
appConfig.Reentrancy = config.ReentrancyConfig{Enabled: true}
reentrantConfig := NewConfig("", TestAppID, []string{"placement:5050"}, 0, "", appConfig)
reentrantConfig := NewConfig(ConfigOpts{
AppID: TestAppID,
PlacementAddresses: []string{"placement:5050"},
AppConfig: appConfig,
})
reentrantAppChannel := new(reentrantAppChannel)
reentrantAppChannel.nextCall = []*invokev1.InvokeMethodRequest{req2, req3}
reentrantAppChannel.callLog = []string{}
Expand Down Expand Up @@ -2145,7 +2248,11 @@ func TestReentrancyStackLimit(t *testing.T) {
stackDepth := 0
appConfig := DefaultAppConfig
appConfig.Reentrancy = config.ReentrancyConfig{Enabled: true, MaxStackDepth: &stackDepth}
reentrantConfig := NewConfig("", TestAppID, []string{"placement:5050"}, 0, "", appConfig)
reentrantConfig := NewConfig(ConfigOpts{
AppID: TestAppID,
PlacementAddresses: []string{"placement:5050"},
AppConfig: appConfig,
})
reentrantAppChannel := new(reentrantAppChannel)
reentrantAppChannel.nextCall = []*invokev1.InvokeMethodRequest{}
reentrantAppChannel.callLog = []string{}
Expand Down Expand Up @@ -2176,7 +2283,11 @@ func TestReentrancyPerActor(t *testing.T) {
},
},
}
reentrantConfig := NewConfig("", TestAppID, []string{""}, 0, "", appConfig)
reentrantConfig := NewConfig(ConfigOpts{
AppID: TestAppID,
PlacementAddresses: []string{""},
AppConfig: appConfig,
})
reentrantAppChannel := new(reentrantAppChannel)
reentrantAppChannel.nextCall = []*invokev1.InvokeMethodRequest{req2}
reentrantAppChannel.callLog = []string{}
Expand Down Expand Up @@ -2212,7 +2323,11 @@ func TestReentrancyStackLimitPerActor(t *testing.T) {
},
},
}
reentrantConfig := NewConfig("", TestAppID, []string{""}, 0, "", appConfig)
reentrantConfig := NewConfig(ConfigOpts{
AppID: TestAppID,
PlacementAddresses: []string{""},
AppConfig: appConfig,
})
reentrantAppChannel := new(reentrantAppChannel)
reentrantAppChannel.nextCall = []*invokev1.InvokeMethodRequest{}
reentrantAppChannel.callLog = []string{}
Expand Down
Loading

0 comments on commit 1668566

Please sign in to comment.