Skip to content

Commit

Permalink
ThroughputControl-Global (Azure#19183)
Browse files Browse the repository at this point in the history
* ThroughputControl-Global

Co-authored-by: Annie Liang <[email protected]>
Co-authored-by: annie-mac <[email protected]>
Co-authored-by: annie-mac <[email protected]>
Co-authored-by: annie-mac <[email protected]>
  • Loading branch information
5 people authored Mar 2, 2021
1 parent de1c5db commit 39a7a66
Show file tree
Hide file tree
Showing 56 changed files with 2,339 additions and 838 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.implementation.query.metrics.ClientSideMetrics;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlMode;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosStoredProcedureProperties;
import com.azure.cosmos.models.FeedResponse;
Expand Down Expand Up @@ -796,18 +795,13 @@ public static List<PatchOperation> getPatchOperationsFromCosmosPatch(CosmosPatch
return cosmosPatchOperations.getPatchOperations();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static ThroughputControlMode getThroughputControlMode(ThroughputControlGroup throughputControlGroup) {
return throughputControlGroup.getControlMode();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static SqlQuerySpec getOfferQuerySpecFromResourceId(CosmosAsyncContainer container, String resourceId) {
return container.getDatabase().getOfferQuerySpecFromResourceId(resourceId);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static CosmosAsyncContainer getTargetContainerFromThroughputControlGroup(ThroughputControlGroup controlGroup) {
return controlGroup.getTargetContainer();
public static CosmosAsyncContainer getControlContainerFromThroughputGlobalControlConfig(GlobalThroughputControlConfig globalControlConfig) {
return globalControlConfig.getControlContainer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdMetrics;
import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupInternal;
import com.azure.cosmos.models.CosmosDatabaseProperties;
import com.azure.cosmos.models.CosmosDatabaseRequestOptions;
import com.azure.cosmos.models.CosmosDatabaseResponse;
Expand All @@ -23,6 +24,7 @@
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.util.Beta;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -471,11 +473,23 @@ TracerProvider getTracerProvider(){
*
* @param group Throughput control group going to be enabled.
*/
void enableThroughputControlGroup(ThroughputControlGroup group) {
void enableThroughputControlGroup(ThroughputControlGroupInternal group) {
checkNotNull(group, "Throughput control group cannot be null");
this.asyncDocumentClient.enableThroughputControlGroup(group);
}

/**
* Create global throughput control config builder which will be used to build {@link GlobalThroughputControlConfig}.
*
* @param databaseId The database id of the control container.
* @param containerId The container id of the control container.
* @return A {@link GlobalThroughputControlConfigBuilder}.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public GlobalThroughputControlConfigBuilder createGlobalThroughputControlConfigBuilder(String databaseId, String containerId) {
return new GlobalThroughputControlConfigBuilder(this, databaseId, containerId);
}

private CosmosPagedFlux<CosmosDatabaseProperties> queryDatabasesInternal(SqlQuerySpec querySpec, CosmosQueryRequestOptions options){
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
pagedFluxOptions.setTracerInformation(this.tracerProvider, "queryDatabases", this.serviceEndpoint, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.azure.cosmos.implementation.batch.BatchExecutor;
import com.azure.cosmos.implementation.batch.BulkExecutor;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlMode;
import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupFactory;
import com.azure.cosmos.implementation.throughputControl.config.GlobalThroughputControlGroup;
import com.azure.cosmos.implementation.throughputControl.config.LocalThroughputControlGroup;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosConflictProperties;
import com.azure.cosmos.models.CosmosContainerProperties;
Expand Down Expand Up @@ -51,7 +53,6 @@
import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.cosmos.implementation.Utils.getEffectiveCosmosChangeFeedRequestOptions;
import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -1436,65 +1437,35 @@ public Mono<List<FeedRange>> getFeedRanges() {
}

/**
* Enable the throughput control group with local control mode.
*
* @param groupName The throughput control group name.
* @param targetThroughput The target throughput for the control group.
* {@codesnippet com.azure.cosmos.throughputControl.localControl}
*
* @return A {@link ThroughputControlGroup}.
* @param groupConfig A {@link ThroughputControlGroupConfig}.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
ThroughputControlGroup enableThroughputLocalControlGroup(String groupName, int targetThroughput) {
return this.enableThroughputLocalControlGroup(groupName, targetThroughput, false);
public void enableLocalThroughputControlGroup(ThroughputControlGroupConfig groupConfig) {
LocalThroughputControlGroup localControlGroup = ThroughputControlGroupFactory.createThroughputLocalControlGroup(groupConfig, this);
this.database.getClient().enableThroughputControlGroup(localControlGroup);
}

/**
* Enable the throughput control group with global control mode.
* The defined throughput limit will be shared across different clients.
*
* @param groupName The throughput control group name.
* @param targetThroughput The target throughput for the control group.
* @param isDefault Flag to indicate whether this group will be used as default.
* {@codesnippet com.azure.cosmos.throughputControl.globalControl}
*
* @return A {@link ThroughputControlGroup}.
* @param groupConfig The throughput control group configuration, see {@link GlobalThroughputControlGroup}.
* @param globalControlConfig The global throughput control configuration, see {@link GlobalThroughputControlConfig}.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
ThroughputControlGroup enableThroughputLocalControlGroup(String groupName, int targetThroughput, boolean isDefault) {
return this.enableThroughputControlGroup(groupName, targetThroughput, null, ThroughputControlMode.LOCAL, isDefault);
}

/**
*
* @param groupName The throughput control group name.
* @param targetThroughputThreshold The target throughput threshold for the control group.
*
* @return A {@link ThroughputControlGroup}.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
ThroughputControlGroup enableThroughputLocalControlGroup(String groupName, double targetThroughputThreshold) {
return this.enableThroughputLocalControlGroup(groupName, targetThroughputThreshold, false);
}

/**
*
* @param groupName The throughput control group name.
* @param targetThroughputThreshold The target throughput threshold for the control group.
* @param isDefault Flag to indicate whether this group will be used as default.
* @return A {@link ThroughputControlGroup}.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
ThroughputControlGroup enableThroughputLocalControlGroup(String groupName, double targetThroughputThreshold, boolean isDefault) {
return this.enableThroughputControlGroup(groupName, null, targetThroughputThreshold, ThroughputControlMode.LOCAL, isDefault);
}

private ThroughputControlGroup enableThroughputControlGroup(
String groupName,
Integer targetThroughput,
Double targetThroughputThreshold,
ThroughputControlMode controlMode,
boolean isDefault) {
public void enableGlobalThroughputControlGroup(
ThroughputControlGroupConfig groupConfig,
GlobalThroughputControlConfig globalControlConfig) {

ThroughputControlGroup throughputControlGroup = new ThroughputControlGroup(
groupName, this, targetThroughput, targetThroughputThreshold, controlMode, isDefault);
this.database.getClient().enableThroughputControlGroup(throughputControlGroup);
GlobalThroughputControlGroup globalControlGroup =
ThroughputControlGroupFactory.createThroughputGlobalControlGroup(groupConfig, globalControlConfig, this);

return throughputControlGroup;
this.database.getClient().enableThroughputControlGroup(globalControlGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.util.Beta;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.CosmosPagedIterable;
import com.azure.cosmos.util.UtilBridgeInternal;
Expand Down Expand Up @@ -213,4 +214,16 @@ public void close() {
private <T> CosmosPagedIterable<T> getCosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux) {
return UtilBridgeInternal.createCosmosPagedIterable(cosmosPagedFlux);
}

/**
* Create global throughput control config builder which will be used to build {@link GlobalThroughputControlConfig}.
*
* @param databaseId The database id of the control container.
* @param containerId The container id of the control container.
* @return A {@link GlobalThroughputControlConfigBuilder}.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public GlobalThroughputControlConfigBuilder createGlobalThroughputControlConfigBuilder(String databaseId, String containerId) {
return new GlobalThroughputControlConfigBuilder(this.asyncClientWrapper, databaseId, containerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos;

import com.azure.cosmos.implementation.throughputControl.config.GlobalThroughputControlGroup;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosItemResponse;
Expand Down Expand Up @@ -744,52 +745,28 @@ public List<FeedRange> getFeedRanges() {
}

/**
* Enable the throughput control group with local control mode.
*
* @param groupName The throughput control group name.
* @param targetThroughput The target throughput for the control group.
* {@codesnippet com.azure.cosmos.throughputControl.localControl}
*
* @return A {@link ThroughputControlGroup}.
* @param groupConfig A {@link GlobalThroughputControlConfig}.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
ThroughputControlGroup enableThroughputLocalControlGroup(String groupName, int targetThroughput) {
return this.asyncContainer.enableThroughputLocalControlGroup(groupName, targetThroughput);
public void enableLocalThroughputControlGroup(ThroughputControlGroupConfig groupConfig) {
this.asyncContainer.enableLocalThroughputControlGroup(groupConfig);
}

/**
* Enable the throughput control group with global control mode.
* The defined throughput limit will be shared across different clients.
*
* @param groupName The throughput control group name.
* @param targetThroughput The target throughput for the control group.
* @param isDefault Flag to indicate whether this group will be used as default.
* {@codesnippet com.azure.cosmos.throughputControl.globalControl}
*
* @return A {@link ThroughputControlGroup}.
* @param groupConfig The throughput control group configuration, see {@link GlobalThroughputControlGroup}.
* @param globalControlConfig The global throughput control configuration, see {@link GlobalThroughputControlConfig}.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
ThroughputControlGroup enableThroughputLocalControlGroup(String groupName, int targetThroughput, boolean isDefault) {
return this.asyncContainer.enableThroughputLocalControlGroup(groupName, targetThroughput, isDefault);
}

/**
*
* @param groupName The throughput control group name.
* @param targetThroughputThreshold The target throughput threshold for the control group.
*
* @return A {@link ThroughputControlGroup}.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
ThroughputControlGroup enableThroughputLocalControlGroup(String groupName, double targetThroughputThreshold) {
return this.asyncContainer.enableThroughputLocalControlGroup(groupName, targetThroughputThreshold);
}

/**
*
* @param groupName The throughput control group name.
* @param targetThroughputThreshold The target throughput threshold for the control group.
* @param isDefault Flag to indicate whether this group will be used as default.
*
* @return A {@link ThroughputControlGroup}.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
ThroughputControlGroup enableThroughputLocalControlGroup(String groupName, double targetThroughputThreshold, boolean isDefault) {
return this.asyncContainer.enableThroughputLocalControlGroup(groupName, targetThroughputThreshold, isDefault);
public void enableGlobalThroughputControlGroup(ThroughputControlGroupConfig groupConfig, GlobalThroughputControlConfig globalControlConfig) {
this.asyncContainer.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;

import com.azure.cosmos.util.Beta;

import java.time.Duration;

/**
* This configuration is used for throughput global control mode.
* It contains configuration about the extra container which will track all the clients throughput usage for a certain control group.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public class GlobalThroughputControlConfig {
private final CosmosAsyncContainer controlContainer;
private final Duration controlItemRenewInterval;
private final Duration controlItemExpireInterval;

GlobalThroughputControlConfig(
CosmosAsyncContainer controlContainer,
Duration controlItemRenewInterval,
Duration controlItemExpireInterval) {

this.controlContainer = controlContainer;
this.controlItemRenewInterval = controlItemRenewInterval;
this.controlItemExpireInterval = controlItemExpireInterval;
}

/**
* Get the control container.
* This is the container to track all other clients throughput usage.
*
* @return The {@link CosmosAsyncContainer}.
*/
CosmosAsyncContainer getControlContainer() {
return controlContainer;
}

/**
* Get the control item renew interval.
*
* This controls how often the client is going to update the throughput usage of itself
* and adjust its own throughput share based on the throughput usage of other clients.
*
* In short words, it controls how quickly the shared throughput will reload balanced across different clients.
*
* By default, it is 5s.
*
* @return The control item renew interval.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public Duration getControlItemRenewInterval() {
return this.controlItemRenewInterval;
}

/**
* Get the control item expire interval.
*
* A client may be offline due to various reasons (being shutdown, network issue... ).
* This controls how quickly we will detect the client has been offline and hence allow its throughput share to be taken by other clients.
*
* By default, it is 11s.
*
* @return The control item renew interval.
*/
@Beta(value = Beta.SinceVersion.V4_13_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public Duration getControlItemExpireInterval() {
return this.controlItemExpireInterval;
}
}
Loading

0 comments on commit 39a7a66

Please sign in to comment.