Skip to content

Commit

Permalink
Introducing MAPQ: Multi-tenant, Auto-partitioned, Persistent Queue (c…
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Jun 24, 2024
1 parent 83ebf7a commit de1aafc
Show file tree
Hide file tree
Showing 28 changed files with 2,927 additions and 0 deletions.
3 changes: 3 additions & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ var (
ComponentPinotVisibilityManager = component("pinot-visibility-manager")
ComponentAsyncWFConsumptionManager = component("async-wf-consumption-manager")
ComponentGlobalRatelimiter = component("global-ratelimiter")
ComponentMapQ = component("mapq")
ComponentMapQTree = component("mapq-tree")
ComponentMapQTreeNode = component("mapq-tree-node")
)

// Pre-defined values for TagSysLifecycle
Expand Down
65 changes: 65 additions & 0 deletions common/mapq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# MAPQ: Multi-tenant, Auto-partitioned, Persistent Queue

NOTE: This component is WIP.

## Overview

MAPQ is a new queue framework (introduced in June 2024), aiming to unify Cadence's internal task/request queues. The existing implementations for these applications are cumbersome and maintenance-heavy, with significant overlap and limited extensibility.
MAPQ will address the challenges of scalability, throughput, consistency, and ordering guarantees required by these diverse needs.


Challenges and Motivation
- History Task Queues: These queues are poorly understood and difficult to maintain, owing to the departure of their original developer and the non-maintainable state of the code. The design struggles with burst loads from timer/child workflow cases, requiring introduction of more granular task types and automated partitioning that the current system cannot support without extensive refactoring.
- Matching Task Lists: These are basic FIFO queues with some advanced features like sticky task lists, zonal isolation groups and partitioning. The most pressing issue is auto partitioning to reduce operational overhead.
- Async Request Queues: Initially integrated with Kafka topics as the request queue. Initial testing faced challenges like complex provisioning, inability to dynamically create topics/register consumers, poor visibility into the requests in the queue and difficult to tweak alerts. Async APIs are already designed with pluggable queue implementation already so swapping Kafka with something else will not be tricky.


### Goals

MAPQ will provide a solution tailored to meet the following goals:

- Multi-Tenancy: Guarantees fair access to resources for each tenant based on predefined quotas.
- Auto-Partitioning: Dynamically adjusts partitions based on specified fine-grained policies, supporting both random and deterministic message distribution across physically or virtually partitioned queues.
- Burst-Protection: Detects incoming message spikes and mitigates by utilizing dynamic auto-partitioning.
- Skew-Protection: Detects incoming message skews for given partition keys and mitigates by utilizing dynamic auto-partitioning.
- Advanced Partitioning Policies: Executes on a tree-like partitioning policy to support various levels of partition key hierarchies and strategies.
- Persistent: Ensures message durability via pluggable persistent layer.
- Delivery Guarantees: Guarantees at least once delivery.


### Problems with Existing Queues in Cadence

History Queues:

- Lack of granular partitioning and inextensibility of history queues make it difficult to address following pathological scenarios:
- Task prioritization: Background tasks like workflow deletion timer tasks share the same queue and consume from the same “processing budget” as other high priority tasks such as user created timers. This is because all timer tasks for a given shard are managed by a single queue.
- Multi tenancy: Tasks of the same type (e.g. all timers) are managed by a single queue and a noisy domain can drastically regress the experience of other domains. It is not possible to write tasks of a specific domain(s) to a separate queue and adjust read/write qps. Current queue granularity ends at task type (timer or transfer).
- Burst cases: Bursts of timers or child workflows are known issues that Cadence has no answers to. These bursts usually cause timeouts and may also impact processing of other domains’ tasks.

## High Level Design

MAPQ uses a tree data structure where nodes route incoming messages to child nodes. Nodes can be splitted/merged based on given policies. All leaf nodes are at the same level. Leaf nodes are the actual “queues” where messages are written to/read from via a provided persistent layer plugin.

The routing key per level, partitioning/departitioning strategy, RPS limits and other options are provided to MAPQ during initialization as a tree-like policy. It contains per level defaults and per-node (identified via path from root) overrides.

Once initialized the tree will have a minimal number of nodes provided in the policy but it respects policies for not-yet-existing nodes. Since MAPQ supports auto-partitioning there will be new nodes added/removed and it accepts providing policies for such nodes. For example, you might want to partition by domain only for bursty domains and allocate them specific RPS.


#### Tree structure with policies

![MAPQ partitioned queue tree](../../docs/images/mapq_partitioned_queue_tree_example.png)


#### Initialization and Object Hierarcy

![MAPQ initialization](../../docs/images/mapq_initialization.png)


#### Enqueue Flow

![MAPQ enqueue flow](../../docs/images/mapq_enqueue_flow.png)


#### Dispatch Flow

![MAPQ enqueue flow](../../docs/images/mapq_dispatch_flow.png)
85 changes: 85 additions & 0 deletions common/mapq/client_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package mapq

import (
"context"
"errors"
"fmt"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/mapq/tree"
"github.com/uber/cadence/common/mapq/types"
"github.com/uber/cadence/common/metrics"
)

type clientImpl struct {
logger log.Logger
scope metrics.Scope
persister types.Persister
consumerFactory types.ConsumerFactory
tree *tree.QueueTree
partitions []string
policies []types.NodePolicy
}

func (c *clientImpl) Start(ctx context.Context) error {
c.logger.Info("Starting MAPQ client")
err := c.tree.Start(ctx)
if err != nil {
return err
}

c.logger.Info("Started MAPQ client")
return nil
}

func (c *clientImpl) Stop(ctx context.Context) error {
c.logger.Info("Stopping MAPQ client")

// Stop the tree which will stop the dispatchers
if err := c.tree.Stop(ctx); err != nil {
return fmt.Errorf("failed to stop tree: %w", err)
}

// stop the consumer factory which will stop the consumers
err := c.consumerFactory.Stop(ctx)
if err != nil {
return fmt.Errorf("failed to stop consumer factory: %w", err)
}

c.logger.Info("Stopped MAPQ client")
return nil
}

func (c *clientImpl) Enqueue(ctx context.Context, items []types.Item) ([]types.ItemToPersist, error) {
return c.tree.Enqueue(ctx, items)
}

func (c *clientImpl) Ack(context.Context, types.Item) error {
return errors.New("not implemented")
}

func (c *clientImpl) Nack(context.Context, types.Item) error {
return errors.New("not implemented")
}
161 changes: 161 additions & 0 deletions common/mapq/client_impl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package mapq

import (
"context"
"testing"

"github.com/golang/mock/gomock"
"go.uber.org/goleak"

"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/mapq/types"
"github.com/uber/cadence/common/metrics"
)

func TestNew(t *testing.T) {
ctrl := gomock.NewController(t)

tests := []struct {
name string
opts []Options
wantErr bool
}{
{
name: "success",
opts: []Options{
WithPersister(types.NewMockPersister(ctrl)),
WithConsumerFactory(types.NewMockConsumerFactory(ctrl)),
},
},
{
name: "no persister",
wantErr: true,
opts: []Options{
WithConsumerFactory(types.NewMockConsumerFactory(ctrl)),
},
},
{
name: "no consumer factoru",
wantErr: true,
opts: []Options{
WithPersister(types.NewMockPersister(ctrl)),
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
logger := testlogger.New(t)
scope := metrics.NoopScope(0)
cl, err := New(logger, scope, tc.opts...)
if (err != nil) != tc.wantErr {
t.Errorf("New() error: %v, wantErr: %v", err, tc.wantErr)
}

if err != nil {
return
}

_, ok := cl.(*clientImpl)
if !ok {
t.Errorf("New() = %T, want *clientImpl", cl)
}
})
}
}

func TestStartStop(t *testing.T) {
defer goleak.VerifyNone(t)
ctrl := gomock.NewController(t)
consumerFactory := types.NewMockConsumerFactory(ctrl)
consumer := types.NewMockConsumer(ctrl)
consumerFactory.EXPECT().Stop(gomock.Any()).Return(nil).Times(1)
consumerFactory.EXPECT().New(gomock.Any()).Return(consumer, nil).Times(1)
opts := []Options{
WithPersister(types.NewMockPersister(ctrl)),
WithConsumerFactory(consumerFactory),
}
logger := testlogger.New(t)
scope := metrics.NoopScope(0)
cl, err := New(logger, scope, opts...)
if err != nil {
t.Fatalf("New() error: %v", err)
}

cl.Start(context.Background())
defer cl.Stop(context.Background())
}

func TestAck(t *testing.T) {
ctrl := gomock.NewController(t)
consumerFactory := types.NewMockConsumerFactory(ctrl)
consumer := types.NewMockConsumer(ctrl)
consumerFactory.EXPECT().Stop(gomock.Any()).Return(nil).Times(1)
consumerFactory.EXPECT().New(gomock.Any()).Return(consumer, nil).Times(1)
opts := []Options{
WithPersister(types.NewMockPersister(ctrl)),
WithConsumerFactory(consumerFactory),
}
logger := testlogger.New(t)
scope := metrics.NoopScope(0)
cl, err := New(logger, scope, opts...)
if err != nil {
t.Fatalf("New() error: %v", err)
}

cl.Start(context.Background())
defer cl.Stop(context.Background())

err = cl.Ack(context.Background(), nil)
if err == nil || err.Error() != "not implemented" {
t.Errorf("Ack() error: %q, want %q", err, "not implemented")
}
}

func TestNack(t *testing.T) {
ctrl := gomock.NewController(t)
consumerFactory := types.NewMockConsumerFactory(ctrl)
consumer := types.NewMockConsumer(ctrl)
consumerFactory.EXPECT().Stop(gomock.Any()).Return(nil).Times(1)
consumerFactory.EXPECT().New(gomock.Any()).Return(consumer, nil).Times(1)
opts := []Options{
WithPersister(types.NewMockPersister(ctrl)),
WithConsumerFactory(consumerFactory),
}
logger := testlogger.New(t)
scope := metrics.NoopScope(0)
cl, err := New(logger, scope, opts...)
if err != nil {
t.Fatalf("New() error: %v", err)
}

cl.Start(context.Background())
defer cl.Stop(context.Background())

err = cl.Nack(context.Background(), nil)
if err == nil || err.Error() != "not implemented" {
t.Errorf("Ack() error: %q, want %q", err, "not implemented")
}
}
72 changes: 72 additions & 0 deletions common/mapq/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package dispatcher

import (
"context"
"fmt"
"sync"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/mapq/types"
)

type Dispatcher struct {
consumer types.Consumer
ctx context.Context
cancelCtx context.CancelFunc
wg sync.WaitGroup
}

func New(c types.Consumer) *Dispatcher {
ctx, cancelCtx := context.WithCancel(context.Background())
return &Dispatcher{
consumer: c,
ctx: ctx,
cancelCtx: cancelCtx,
}
}

func (d *Dispatcher) Start(ctx context.Context) error {
d.wg.Add(1)
go d.run()
return nil
}

func (d *Dispatcher) Stop(ctx context.Context) error {
d.cancelCtx()
timeout := 10 * time.Second
if dl, ok := ctx.Deadline(); ok {
timeout = time.Until(dl)
}
if !common.AwaitWaitGroup(&d.wg, timeout) {
return fmt.Errorf("failed to stop dispatcher in %v", timeout)
}
return nil
}

func (d *Dispatcher) run() {
defer d.wg.Done()
// TODO: implement
}
Loading

0 comments on commit de1aafc

Please sign in to comment.