Azure-cosmos-Spark - allow configuration of initial batch size (Azure…#36068)

* Azure-cosmos-Spark - allow configuration of initial batch size

* Making test names unique in Bulk e2e tests

* Adding InitialMicroBatchSize in addition to MaxMicroBatchSize in options

* Removing MaxMicroBatchSize config and replacing it with a constant instead

* Update DataLoader.java

* Update PartitionScopeThresholdsTest.java

* Update CosmosBulkExecutionOptions.java
FabianMeiswinkel authored Jul 25, 2023
1 parent fa2a830 commit d255c1e
Showing 10 changed files with 99 additions and 85 deletions.
@@ -94,10 +94,7 @@ private void bulkCreateItems(final CosmosAsyncDatabase database,
container.getId());

// We want to wait longer depending on the number of documents in each iteration
final CosmosBulkExecutionOptions cosmosBulkExecutionOptions =
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.setMaxMicroBatchSize(new CosmosBulkExecutionOptions(), MAX_BATCH_SIZE);
final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
container.executeBulkOperations(Flux.fromIterable(cosmosItemOperations), cosmosBulkExecutionOptions)
.blockLast(BATCH_DATA_LOAD_WAIT_DURATION);

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.21.0-beta.1 (Unreleased)

#### Features Added
* Added a new configuration setting `spark.cosmos.write.bulk.initialBatchSize` that allows specifying the initial micro batch size for bulk operations. The batch size is tuned automatically based on the throttling rate afterwards - by default it starts with 100 documents per batch. When throughput control is used, this can lead to exceeding the requested throughput during the first few seconds of a Spark job. This usually isn't a problem - but if you want to avoid it, reducing the initial micro batch size - for example setting it to `1` - avoids the initial spike in RU/s usage. - See [PR 36068](https://github.com/Azure/azure-sdk-for-java/pull/36068)

#### Breaking Changes

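For illustration, a minimal sketch of how the setting described in the changelog entry above could be supplied on a bulk write. This is not part of the commit; the account endpoint, key, database, container, and sample data are placeholders, and the `spark.cosmos.write.strategy` / `spark.cosmos.write.bulk.enabled` keys are existing connector settings that are assumed here rather than shown in this diff.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical usage sketch - not part of this commit.
val spark = SparkSession.builder().appName("cosmos-initial-batch-size-sample").getOrCreate()
import spark.implicits._

val df = Seq(("id-1", "Alice"), ("id-2", "Bob")).toDF("id", "name")

val cosmosWriteCfg = Map(
  "spark.cosmos.accountEndpoint" -> "https://<your-account>.documents.azure.com:443/", // placeholder
  "spark.cosmos.accountKey" -> "<your-account-key>",                                    // placeholder
  "spark.cosmos.database" -> "SampleDatabase",                                          // placeholder
  "spark.cosmos.container" -> "SampleContainer",                                        // placeholder
  "spark.cosmos.write.strategy" -> "ItemOverwrite",
  "spark.cosmos.write.bulk.enabled" -> "true",
  // start each partition with a single document per micro batch to avoid an RU/s spike
  "spark.cosmos.write.bulk.initialBatchSize" -> "1"
)

df.write
  .format("cosmos.oltp")
  .mode("Append")
  .options(cosmosWriteCfg)
  .save()
```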
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.21.0-beta.1 (Unreleased)

#### Features Added
* Added a new configuration setting `spark.cosmos.write.bulk.initialBatchSize` that allows specifying the initial micro batch size for bulk operations. The batch size is tuned automatically based on the throttling rate afterwards - by default it starts with 100 documents per batch. When throughput control is used, this can lead to exceeding the requested throughput during the first few seconds of a Spark job. This usually isn't a problem - but if you want to avoid it, reducing the initial micro batch size - for example setting it to `1` - avoids the initial spike in RU/s usage. - See [PR 36068](https://github.com/Azure/azure-sdk-for-java/pull/36068)

#### Breaking Changes

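Because the entry above calls out the interplay with throughput control, here is a hedged sketch of combining a small initial micro batch size with a throughput control group. This is not part of the commit; the `spark.cosmos.throughputControl.*` keys are assumptions based on the connector's existing configuration surface, and all values are placeholders.

```scala
// Hypothetical usage sketch - not part of this commit.
val writeCfgWithThroughputControl = Map(
  "spark.cosmos.accountEndpoint" -> "https://<your-account>.documents.azure.com:443/", // placeholder
  "spark.cosmos.accountKey" -> "<your-account-key>",                                    // placeholder
  "spark.cosmos.database" -> "SampleDatabase",
  "spark.cosmos.container" -> "SampleContainer",
  "spark.cosmos.write.bulk.enabled" -> "true",
  // keep the very first micro batches small so the job does not overshoot
  // the throughput control target right after startup
  "spark.cosmos.write.bulk.initialBatchSize" -> "1",
  // assumed throughput control keys (not introduced by this PR)
  "spark.cosmos.throughputControl.enabled" -> "true",
  "spark.cosmos.throughputControl.name" -> "bulkIngestionGroup",
  "spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.5",
  "spark.cosmos.throughputControl.globalControl.database" -> "SampleDatabase",
  "spark.cosmos.throughputControl.globalControl.container" -> "ThroughputControl"
)
```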
@@ -94,6 +94,28 @@ class BulkWriter(container: CosmosAsyncContainer,
)
ThroughputControlHelper.populateThroughputControlGroupName(cosmosBulkExecutionOptions, writeConfig.throughputControlConfig)

writeConfig.maxMicroBatchPayloadSizeInBytes match {
  case Some(customMaxMicroBatchPayloadSizeInBytes) =>
    ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
      .getCosmosBulkExecutionOptionsAccessor
      .setMaxMicroBatchPayloadSizeInBytes(
        cosmosBulkExecutionOptions,
        customMaxMicroBatchPayloadSizeInBytes
      )
  case None =>
}

writeConfig.initialMicroBatchSize match {
  case Some(customInitialMicroBatchSize) =>
    ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
      .getCosmosBulkExecutionOptionsAccessor
      .setInitialMicroBatchSize(
        cosmosBulkExecutionOptions,
        customInitialMicroBatchSize
      )
  case None =>
}

private val operationContext = initializeOperationContext()
private val cosmosPatchHelperOpt = writeConfig.itemWriteStrategy match {
case ItemWriteStrategy.ItemPatch => Some(new CosmosPatchHelper(diagnosticsConfig, writeConfig.patchConfigs.get))
@@ -143,7 +165,7 @@ class BulkWriter(container: CosmosAsyncContainer,

bulkOperationResponseFlux.subscribe(
resp => {
var isGettingRetried = new AtomicBoolean(false)
val isGettingRetried = new AtomicBoolean(false)
try {
val itemOperation = resp.getOperation
val itemOperationFound = activeOperations.remove(itemOperation)
@@ -206,7 +228,7 @@
var acquisitionAttempt = 0
val activeOperationsSemaphoreTimeout = 10
val operationContext = OperationContext(getId(objectNode), partitionKeyValue, getETag(objectNode), 1)
var numberOfIntervalsWithIdenticalActiveOperationSnapshots = new AtomicLong(0)
val numberOfIntervalsWithIdenticalActiveOperationSnapshots = new AtomicLong(0)
// Don't clone the activeOperations for the first iteration
// to reduce perf impact before the Semaphore has been acquired
// this means if the semaphore can't be acquired within 10 minutes
@@ -219,7 +241,7 @@
log.logDebug(s"Not able to acquire semaphore, Context: ${operationContext.toString} ${getThreadInfo}")
if (subscriptionDisposable.isDisposed) {
captureIfFirstFailure(
new IllegalStateException("Can't accept any new work - BulkWriter has been disposed already"));
new IllegalStateException("Can't accept any new work - BulkWriter has been disposed already"))
}

throwIfProgressStaled(
@@ -296,7 +318,7 @@ class BulkWriter(container: CosmosAsyncContainer,

val cosmosPatchOperations = cosmosPatchHelper.createCosmosPatchOperations(itemId, partitionKeyDefinition, objectNode)

val requestOptions = new CosmosBulkPatchItemRequestOptions();
val requestOptions = new CosmosBulkPatchItemRequestOptions()
if (patchConfigs.filter.isDefined && !StringUtils.isEmpty(patchConfigs.filter.get)) {
requestOptions.setFilterPredicate(patchConfigs.filter.get)
}
@@ -352,7 +374,7 @@ class BulkWriter(container: CosmosAsyncContainer,
s"attemptNumber=${context.attemptNumber}, exceptionMessage=${exceptionMessage}, " +
s"Context: {${operationContext.toString}} ${getThreadInfo}")

this.pendingRetries.incrementAndGet();
this.pendingRetries.incrementAndGet()

// this is to ensure the submission will happen on a different thread in background
// and doesn't block the active thread
@@ -82,6 +82,7 @@ private[spark] object CosmosConfigNames {
val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions"
val WriteBulkPayloadSizeInBytes = "spark.cosmos.write.bulk.targetedPayloadSizeInBytes"
val WriteBulkInitialBatchSize = "spark.cosmos.write.bulk.initialBatchSize"
val WritePointMaxConcurrency = "spark.cosmos.write.point.maxConcurrency"
val WritePatchDefaultOperationType = "spark.cosmos.write.patch.defaultOperationType"
val WritePatchColumnConfigs = "spark.cosmos.write.patch.columnConfigs"
@@ -166,6 +167,7 @@ private[spark] object CosmosConfigNames {
WriteBulkMaxPendingOperations,
WriteBulkMaxConcurrentPartitions,
WriteBulkPayloadSizeInBytes,
WriteBulkInitialBatchSize,
WritePointMaxConcurrency,
WritePatchDefaultOperationType,
WritePatchColumnConfigs,
@@ -818,7 +820,8 @@ private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
maxConcurrentCosmosPartitions: Option[Int] = None,
patchConfigs: Option[CosmosPatchConfigs] = None,
throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
maxMicroBatchPayloadSizeInBytes: Option[Int] = None)
maxMicroBatchPayloadSizeInBytes: Option[Int] = None,
initialMicroBatchSize: Option[Int] = None)

private object CosmosWriteConfig {
private val DefaultMaxRetryCount = 10
@@ -839,6 +842,16 @@
"room for one document - to avoid that the request size exceeds the Cosmos DB maximum of 2 MB too often " +
"which would result in retries and having to transmit large network payloads multiple times.")

private val initialMicroBatchSize = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkInitialBatchSize,
defaultValue = Option.apply(BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST),
mandatory = false,
parseFromStringFunction = initialBatchSizeString => initialBatchSizeString.toInt,
helpMessage = "Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend " +
"when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch " +
"size is getting automatically tuned based on the throttling rate. By default the " +
"initial micro batch size is 100. Reduce this when you want to avoid that the first few requests consume " +
"too many RUs.")

private val bulkMaxPendingOperations = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxPendingOperations,
mandatory = false,
parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,
@@ -979,6 +992,7 @@
var patchConfigsOpt = Option.empty[CosmosPatchConfigs]
val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)
val initialBatchSizeOpt = CosmosConfigEntry.parse(cfg, initialMicroBatchSize)

assert(bulkEnabledOpt.isDefined)

@@ -1004,7 +1018,8 @@
maxConcurrentCosmosPartitions = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions),
patchConfigs = patchConfigsOpt,
throughputControlConfig = throughputControlConfigOpt,
maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt)
maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt,
initialMicroBatchSize = initialBatchSizeOpt)
}
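For context, a small sketch (not part of this change) of how the new `initialMicroBatchSize` entry defined above would resolve through `CosmosConfigEntry.parse`. The fallback to `defaultValue` when the key is absent is an assumption suggested by the `defaultValue` parameter, not something shown in this diff, and the snippet assumes access to the private `initialMicroBatchSize` entry.

```scala
// Illustration only - not part of this commit.
val userConfig = Map("spark.cosmos.write.bulk.initialBatchSize" -> "5")

// parseFromStringFunction converts the raw string to an Int -> Some(5)
val explicitInitialBatchSize = CosmosConfigEntry.parse(userConfig, initialMicroBatchSize)

// With the key absent, the optional entry presumably falls back to its defaultValue,
// i.e. Some(BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST) = Some(100)
val defaultedInitialBatchSize = CosmosConfigEntry.parse(Map.empty[String, String], initialMicroBatchSize)
```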

def parsePatchColumnConfigs(cfg: Map[String, String], inputSchema: StructType): TrieMap[String, CosmosPatchColumnConfig] = {
@@ -22,26 +22,42 @@ class SparkE2EWriteITest
//scalastyle:off magic.number
//scalastyle:off null

private case class UpsertParameterTest(bulkEnabled: Boolean, itemWriteStrategy: ItemWriteStrategy, hasId: Boolean = true)
private case class UpsertParameterTest(bulkEnabled: Boolean, itemWriteStrategy: ItemWriteStrategy, hasId: Boolean = true, initialBatchSize: Option[Int] = None)

private val upsertParameterTest = Seq(
UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite),

UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite),
UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend)
UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None),
UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1)),
UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None),
UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend, initialBatchSize = None)
)

for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId) <- upsertParameterTest) {
it should s"support upserts with bulkEnabled = $bulkEnabled itemWriteStrategy = $itemWriteStrategy hasId = $hasId" in {
for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId, initialBatchSize) <- upsertParameterTest) {
it should s"support upserts with bulkEnabled = $bulkEnabled itemWriteStrategy = $itemWriteStrategy hasId = $hasId initialBatchSize = $initialBatchSize" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.serialization.inclusionMode" -> "NonDefault"
)
val cfg = {
  initialBatchSize match {
    case Some(customInitialBatchSize) =>
      Map(
        "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
        "spark.cosmos.accountKey" -> cosmosMasterKey,
        "spark.cosmos.database" -> cosmosDatabase,
        "spark.cosmos.container" -> cosmosContainer,
        "spark.cosmos.serialization.inclusionMode" -> "NonDefault",
        "spark.cosmos.write.bulk.initialBatchSize" -> customInitialBatchSize.toString
      )
    case None =>
      Map(
        "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
        "spark.cosmos.accountKey" -> cosmosMasterKey,
        "spark.cosmos.database" -> cosmosDatabase,
        "spark.cosmos.container" -> cosmosContainer,
        "spark.cosmos.serialization.inclusionMode" -> "NonDefault"
      )
  }
}

val cfgOverwrite = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
@@ -18,13 +18,11 @@ public class PartitionScopeThresholdsTest {
@Test(groups = { "unit" })
public void neverThrottledShouldResultInMaxBatchSize() {
String pkRangeId = UUID.randomUUID().toString();
int maxBatchSize = 1_000;
int maxBatchSize = 100;
PartitionScopeThresholds thresholds =
new PartitionScopeThresholds(
pkRangeId,
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.setMaxMicroBatchSize(new CosmosBulkExecutionOptions(), maxBatchSize));
new CosmosBulkExecutionOptions());

assertThat(thresholds.getTargetMicroBatchSizeSnapshot())
.isEqualTo(maxBatchSize);
@@ -421,9 +421,9 @@ CosmosBulkExecutionOptions setTargetedMicroBatchRetryRate(
double minRetryRate,
double maxRetryRate);

int getMaxMicroBatchSize(CosmosBulkExecutionOptions options);
int getInitialMicroBatchSize(CosmosBulkExecutionOptions options);

CosmosBulkExecutionOptions setMaxMicroBatchSize(CosmosBulkExecutionOptions options, int maxMicroBatchSize);
CosmosBulkExecutionOptions setInitialMicroBatchSize(CosmosBulkExecutionOptions options, int initialMicroBatchSize);

int getMaxMicroBatchPayloadSizeInBytes(CosmosBulkExecutionOptions options);

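Mirroring the BulkWriter change earlier in this diff, a minimal Scala sketch of how the renamed accessor methods could be exercised directly. This bridge API is SDK-internal; Spark users would normally rely on the `spark.cosmos.write.bulk.initialBatchSize` setting instead.

```scala
import com.azure.cosmos.implementation.ImplementationBridgeHelpers
import com.azure.cosmos.models.CosmosBulkExecutionOptions

// Sketch - not part of this commit.
val bulkOptions = new CosmosBulkExecutionOptions()

ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
  .getCosmosBulkExecutionOptionsAccessor
  .setInitialMicroBatchSize(bulkOptions, 1) // start with a single operation per micro batch

val effectiveInitialSize = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
  .getCosmosBulkExecutionOptionsAccessor
  .getInitialMicroBatchSize(bulkOptions)    // -> 1
```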
@@ -38,7 +38,7 @@ public PartitionScopeThresholds(String pkRangeId, CosmosBulkExecutionOptions opt
this.targetMicroBatchSize = new AtomicInteger(
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxMicroBatchSize(options));
.getInitialMicroBatchSize(options));
this.totalOperationCount = new AtomicLong(0);
this.currentThresholds = new AtomicReference<>(new CurrentIntervalThresholds());

@@ -106,16 +106,12 @@ private void reevaluateThresholds(
int microBatchSizeBefore = this.targetMicroBatchSize.get();
int microBatchSizeAfter = microBatchSizeBefore;

int maxMicroBatchSize = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxMicroBatchSize(options);

if (retryRate < this.minRetryRate && microBatchSizeBefore < maxMicroBatchSize) {
if (retryRate < this.minRetryRate && microBatchSizeBefore < BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST) {
int targetedNewBatchSize = Math.min(
Math.min(
microBatchSizeBefore * 2,
microBatchSizeBefore + (int)(maxMicroBatchSize * this.avgRetryRate)),
maxMicroBatchSize);
microBatchSizeBefore + (int)(BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST * this.avgRetryRate)),
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);
if (this.targetMicroBatchSize.compareAndSet(microBatchSizeBefore, targetedNewBatchSize)) {
microBatchSizeAfter = targetedNewBatchSize;
}
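To make the tuning rule above concrete, here is a small worked sketch (not SDK code) of the growth formula, assuming `MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST` is 100 (the default initial batch size mentioned in the changelog) and an illustrative `avgRetryRate` of 0.2:

```scala
// Sketch of the growth rule: newSize = min(min(before * 2, before + maxBatch * avgRetryRate), maxBatch)
def nextBatchSize(before: Int, avgRetryRate: Double, maxBatch: Int = 100): Int =
  math.min(
    math.min(before * 2, before + (maxBatch * avgRetryRate).toInt),
    maxBatch)

nextBatchSize(before = 1,  avgRetryRate = 0.2)  // 2   - doubling dominates while the batch is small
nextBatchSize(before = 50, avgRetryRate = 0.2)  // 70  - the additive term takes over later
nextBatchSize(before = 95, avgRetryRate = 0.2)  // 100 - capped at the maximum batch size
```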