
Commit

Cosmos Spark: Added option to override the MicroBatchPayloadSize in bytes (Azure#35379)

* Added option to override the MicroBatchPayloadSize in bytes in the Cosmos DB Spark connector

Added an option to override the MicroBatchPayloadSize in bytes in the Spark connector to improve efficiency when documents are large (> 110 KB)

* Update BulkWriter.scala

* Update BulkWriter.scala

* Fixing test build breaks

* Added changelog
FabianMeiswinkel authored Jun 9, 2023
1 parent ab854d8 commit 52fd556
Showing 14 changed files with 120 additions and 21 deletions.
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -4,7 +4,8 @@

#### Features Added
* Added support for priority based throttling - See [PR 35238](https://github.com/Azure/azure-sdk-for-java/pull/35238)

* Added new configuration parameter `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` to allow increasing the micro batch payload size for better efficiency when documents are often above 110 KB. - See [PR 35379](https://github.com/Azure/azure-sdk-for-java/pull/35379)

#### Breaking Changes

#### Bugs Fixed
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -4,6 +4,7 @@

#### Features Added
* Added support for priority based throttling - See [PR 35238](https://github.com/Azure/azure-sdk-for-java/pull/35238)
* Added new configuration parameter `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` to allow increasing the micro batch payload size for better efficiency when documents are often above 110 KB. - See [PR 35379](https://github.com/Azure/azure-sdk-for-java/pull/35379)

#### Breaking Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -4,6 +4,7 @@

#### Features Added
* Added support for priority based throttling - See [PR 35238](https://github.com/Azure/azure-sdk-for-java/pull/35238)
* Added new configuration parameter `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` to allow increasing the micro batch payload size for better efficiency when documents are often above 110 KB. - See [PR 35379](https://github.com/Azure/azure-sdk-for-java/pull/35379)

#### Breaking Changes

@@ -39,6 +39,7 @@
| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` |
| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for the buffered documents, the request is sent to the backend. The default value is optimized for small documents (<= 10 KB); when documents often exceed 110 KB, it can help to increase this value up to about `1500000` (it should still stay well below the 2 MB request limit). |
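
For orientation, a minimal write sketch using the new option could look like the following (account endpoint, key, database, and container names are illustrative placeholders, not values from this PR):

```scala
// Hedged sketch: raising the targeted micro-batch payload size when documents
// are often larger than 110 KB. All account values below are placeholders.
val writeCfg = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "SampleDatabase",
  "spark.cosmos.container" -> "SampleContainer",
  "spark.cosmos.write.strategy" -> "ItemOverwrite",
  "spark.cosmos.write.bulk.enabled" -> "true",
  // still well below the 2 MB request limit mentioned above
  "spark.cosmos.write.bulk.targetedPayloadSizeInBytes" -> "1500000"
)

// df is an existing org.apache.spark.sql.DataFrame holding the documents to write
df.write.format("cosmos.oltp").options(writeCfg).mode("Append").save()
```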

#### Patch Config
| Config Property Name | Default | Description |
@@ -73,6 +73,7 @@ class BulkWriter(container: CosmosAsyncContainer,
private val closed = new AtomicBoolean(false)
private val lock = new ReentrantLock
private val pendingTasksCompleted = lock.newCondition
private val pendingRetries = new AtomicLong(0)
private val activeTasks = new AtomicInteger(0)
private val errorCaptureFirstException = new AtomicReference[Throwable]()
private val bulkInputEmitter: Sinks.Many[models.CosmosItemOperation] = Sinks.many().unicast().onBackpressureBuffer()
@@ -351,12 +352,15 @@
s"attemptNumber=${context.attemptNumber}, exceptionMessage=${exceptionMessage}, " +
s"Context: {${operationContext.toString}} ${getThreadInfo}")

this.pendingRetries.incrementAndGet()

// this is to ensure the submission will happen on a different thread in background
// and doesn't block the active thread
val deferredRetryMono = SMono.defer(() => {
scheduleWriteInternal(itemOperation.getPartitionKeyValue,
itemOperation.getItem.asInstanceOf[ObjectNode],
OperationContext(context.itemId, context.partitionKeyValue, context.eTag, context.attemptNumber + 1))
this.pendingRetries.decrementAndGet()
SMono.empty
})

@@ -487,10 +491,13 @@
try {
val numberOfIntervalsWithIdenticalActiveOperationSnapshots = new AtomicLong(0)
var activeTasksSnapshot = activeTasks.get()
while (activeTasksSnapshot > 0 && errorCaptureFirstException.get == null) {
var pendingRetriesSnapshot = pendingRetries.get()
while ((pendingRetriesSnapshot > 0 || activeTasksSnapshot > 0)
&& errorCaptureFirstException.get == null) {

log.logInfo(
s"Waiting for pending activeTasks $activeTasksSnapshot, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
s"Waiting for pending activeTasks $activeTasksSnapshot and/or pendingRetries " +
s"$pendingRetriesSnapshot, Context: ${operationContext.toString} ${getThreadInfo}")
val activeOperationsSnapshot = activeOperations.clone()
val awaitCompleted = pendingTasksCompleted.await(1, TimeUnit.MINUTES)
if (!awaitCompleted) {
@@ -501,20 +508,21 @@
)
}
activeTasksSnapshot = activeTasks.get()
pendingRetriesSnapshot = pendingRetries.get()
val semaphoreAvailablePermitsSnapshot = semaphore.availablePermits()

if (awaitCompleted) {
log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, pendingRetries " +
s"$pendingRetriesSnapshot Context: ${operationContext.toString} ${getThreadInfo}")
} else {
log.logInfo(s"Waiting interrupted for pending activeTasks $activeTasksSnapshot - " +
s"available permits ${semaphoreAvailablePermitsSnapshot}, " +
log.logInfo(s"Waiting interrupted for pending activeTasks $activeTasksSnapshot , pendingRetries " +
s"$pendingRetriesSnapshot - available permits ${semaphoreAvailablePermitsSnapshot}, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
}
}

log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, pendingRetries " +
s"$pendingRetriesSnapshot Context: ${operationContext.toString} ${getThreadInfo}")
} finally {
lock.unlock()
}
@@ -4,6 +4,7 @@
package com.azure.cosmos.spark

import com.azure.core.management.AzureEnvironment
import com.azure.cosmos.implementation.batch.BatchRequestResponseConstants
import com.azure.cosmos.implementation.routing.LocationHelper
import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, Strings}
import com.azure.cosmos.models.{CosmosChangeFeedRequestOptions, CosmosParameterizedQuery, DedicatedGatewayRequestOptions, FeedRange}
@@ -80,6 +81,7 @@ private[spark] object CosmosConfigNames {
val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled"
val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions"
val WriteBulkPayloadSizeInBytes = "spark.cosmos.write.bulk.targetedPayloadSizeInBytes"
val WritePointMaxConcurrency = "spark.cosmos.write.point.maxConcurrency"
val WritePatchDefaultOperationType = "spark.cosmos.write.patch.defaultOperationType"
val WritePatchColumnConfigs = "spark.cosmos.write.patch.columnConfigs"
@@ -163,6 +165,7 @@
WriteBulkEnabled,
WriteBulkMaxPendingOperations,
WriteBulkMaxConcurrentPartitions,
WriteBulkPayloadSizeInBytes,
WritePointMaxConcurrency,
WritePatchDefaultOperationType,
WritePatchColumnConfigs,
@@ -812,7 +815,8 @@ private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
pointMaxConcurrency: Option[Int] = None,
maxConcurrentCosmosPartitions: Option[Int] = None,
patchConfigs: Option[CosmosPatchConfigs] = None,
throughputControlConfig: Option[CosmosThroughputControlConfig] = None)
throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
maxMicroBatchPayloadSizeInBytes: Option[Int] = None)

private object CosmosWriteConfig {
private val DefaultMaxRetryCount = 10
@@ -824,6 +828,15 @@
parseFromStringFunction = bulkEnabledAsString => bulkEnabledAsString.toBoolean,
helpMessage = "Cosmos DB Item Write bulk enabled")

private val microBatchPayloadSizeInBytes = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkPayloadSizeInBytes,
defaultValue = Option.apply(BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES),
mandatory = false,
parseFromStringFunction = payloadSizeInBytesString => payloadSizeInBytesString.toInt,
helpMessage = "Cosmos DB target bulk micro batch size in bytes - a micro batch will be flushed to the backend " +
"when its payload size exceeds this value. For best efficiency its value should be low enough to leave enough " +
"room for one document - to avoid that the request size exceeds the Cosmos DB maximum of 2 MB too often " +
"which would result in retries and having to transmit large network payloads multiple times.")

private val bulkMaxPendingOperations = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxPendingOperations,
mandatory = false,
parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,
@@ -963,6 +976,7 @@
val bulkEnabledOpt = CosmosConfigEntry.parse(cfg, bulkEnabled)
var patchConfigsOpt = Option.empty[CosmosPatchConfigs]
val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)

assert(bulkEnabledOpt.isDefined)

@@ -987,7 +1001,8 @@
pointMaxConcurrency = CosmosConfigEntry.parse(cfg, pointWriteConcurrency),
maxConcurrentCosmosPartitions = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions),
patchConfigs = patchConfigsOpt,
throughputControlConfig = throughputControlConfigOpt)
throughputControlConfig = throughputControlConfigOpt,
maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt)
}

def parsePatchColumnConfigs(cfg: Map[String, String], inputSchema: StructType): TrieMap[String, CosmosPatchColumnConfig] = {
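The help text above stresses leaving room for one more document below the 2 MB limit; a quick back-of-the-envelope check using only the numbers from this PR's own docs:

```scala
// Why raising the target helps for large documents (values from this PR's docs):
val defaultTarget = 220201        // DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES
val raisedTarget  = 1500000       // suggested upper bound from the config documentation
val largeDocBytes = 110 * 1024    // ~110 KB document

println(defaultTarget / largeDocBytes) // 1  -> only one large doc fits under the default target
println(raisedTarget / largeDocBytes)  // 13 -> ~13 large docs fit per micro batch, still < 2 MB
```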
@@ -3,6 +3,7 @@
package com.azure.cosmos.spark

import com.azure.core.management.AzureEnvironment
import com.azure.cosmos.implementation.batch.BatchRequestResponseConstants
import com.azure.cosmos.spark.CosmosPatchOperationTypes.Increment
import com.azure.cosmos.spark.utils.CosmosPatchTestHelper
import org.apache.spark.sql.types.{NumericType, StructType}
@@ -722,6 +723,31 @@ class CosmosConfigSpec extends UnitSpec {
)
}

"Customizing MaxBulKPayloadSizeInBytes" should "be possible" in {
val schema = CosmosPatchTestHelper.getPatchConfigTestSchema()
var userConfig = Map(
"spark.cosmos.write.strategy" -> "ItemOverwrite",
"spark.cosmos.write.bulk.enabled" -> "True",
)
var writeConfig: CosmosWriteConfig = CosmosWriteConfig.parseWriteConfig(userConfig, schema)
writeConfig should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes.isDefined shouldEqual true
writeConfig.maxMicroBatchPayloadSizeInBytes.get shouldEqual BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES

userConfig = Map(
"spark.cosmos.write.strategy" -> "ItemOverwrite",
"spark.cosmos.write.bulk.enabled" -> "True",
"spark.cosmos.write.bulk.targetedPayloadSizeInBytes" -> "1000000",
)

writeConfig = CosmosWriteConfig.parseWriteConfig(userConfig, schema)
writeConfig should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes.isDefined shouldEqual true
writeConfig.maxMicroBatchPayloadSizeInBytes.get shouldEqual 1000000
}

"Config Parser" should "validate default operation types for patch configs" in {
val schema = CosmosPatchTestHelper.getPatchConfigTestSchema()

@@ -21,7 +21,7 @@
import java.util.List;
import java.util.UUID;

import static com.azure.cosmos.implementation.batch.BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES;
import static com.azure.cosmos.implementation.batch.BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES;
import static com.azure.cosmos.implementation.batch.BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST;
import static org.assertj.core.api.Assertions.assertThat;

@@ -331,7 +331,7 @@ public void batchWithTooManyOperationsTest() {
@Test(groups = {"simple"}, timeOut = TIMEOUT * 10)
public void batchLargerThanServerRequest() {
int operationCount = 20;
int appxDocSize = (MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES * 11) / operationCount;
int appxDocSize = (DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES * 11) / operationCount;

// Increase the doc size by a bit so all docs won't fit in one server request.
appxDocSize = (int)(appxDocSize * 1.05);
@@ -48,7 +48,7 @@ public void validateAllSetValuesInCosmosBulkItemResponse() {
ServerOperationBatchRequest serverOperationBatchRequest = PartitionKeyRangeServerBatchRequest.createBatchRequest(
PartitionKey.NONE.toString(),
Arrays.asList(arrayOperations),
BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);

// Create dummy result
@@ -135,7 +135,7 @@ public void validateEmptyHeaderInCosmosBulkItemResponse() {
ServerOperationBatchRequest serverOperationBatchRequest = PartitionKeyRangeServerBatchRequest.createBatchRequest(
PartitionKey.NONE.toString(),
Arrays.asList(arrayOperations),
BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);


@@ -417,6 +417,10 @@ CosmosBulkExecutionOptions setTargetedMicroBatchRetryRate(

CosmosBulkExecutionOptions setMaxMicroBatchSize(CosmosBulkExecutionOptions options, int maxMicroBatchSize);

int getMaxMicroBatchPayloadSizeInBytes(CosmosBulkExecutionOptions options);

CosmosBulkExecutionOptions setMaxMicroBatchPayloadSizeInBytes(CosmosBulkExecutionOptions options, int maxMicroBatchPayloadSizeInBytes);

int getMaxMicroBatchConcurrency(CosmosBulkExecutionOptions options);

Integer getMaxConcurrentCosmosPartitions(CosmosBulkExecutionOptions options);
@@ -10,7 +10,7 @@
public final class BatchRequestResponseConstants {

// Size limits:
public static final int MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES = 220201;
public static final int DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES = 220201;
public static final int MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST = 100;

public static final int DEFAULT_MAX_MICRO_BATCH_INTERVAL_IN_MILLISECONDS = 1000;
@@ -86,6 +86,7 @@ public final class BulkExecutor<TContext> implements Disposable {
ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor();

private final CosmosAsyncContainer container;
private final int maxMicroBatchPayloadSizeInBytes;
private final AsyncDocumentClient docClientWrapper;
private final String operationContextText;
private final OperationContextAndListenerTuple operationListener;
Expand All @@ -94,6 +95,7 @@ public final class BulkExecutor<TContext> implements Disposable {

// Options for bulk execution.
private final Long maxMicroBatchIntervalInMs;

private final TContext batchContext;
private final ConcurrentMap<String, PartitionScopeThresholds> partitionScopeThresholds;
private final CosmosBulkExecutionOptions cosmosBulkExecutionOptions;
@@ -120,6 +122,9 @@ public BulkExecutor(CosmosAsyncContainer container,
checkNotNull(inputOperations, "expected non-null inputOperations");
checkNotNull(cosmosBulkOptions, "expected non-null bulkOptions");

this.maxMicroBatchPayloadSizeInBytes = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxMicroBatchPayloadSizeInBytes(cosmosBulkOptions);
this.cosmosBulkExecutionOptions = cosmosBulkOptions;
this.container = container;
this.bulkSpanName = "nonTransactionalBatch." + this.container.getId();
@@ -489,7 +494,7 @@ private Flux<CosmosBulkOperationResponse<TContext>> executePartitionedGroup(

if (batchSize >= thresholds.getTargetMicroBatchSizeSnapshot() ||
age >= this.maxMicroBatchIntervalInMs ||
totalSerializedLength >= BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES) {
totalSerializedLength >= this.maxMicroBatchPayloadSizeInBytes) {

logger.debug(
"BufferUntil - Flushing PKRange {} due to BatchSize ({}), payload size ({}) or age ({}), " +
@@ -558,7 +563,7 @@ private Flux<CosmosBulkOperationResponse<TContext>> executeOperations(

String pkRange = thresholds.getPartitionKeyRangeId();
ServerOperationBatchRequest serverOperationBatchRequest =
BulkExecutorUtil.createBatchRequest(operations, pkRange);
BulkExecutorUtil.createBatchRequest(operations, pkRange, this.maxMicroBatchPayloadSizeInBytes);
if (serverOperationBatchRequest.getBatchPendingOperations().size() > 0) {
serverOperationBatchRequest.getBatchPendingOperations().forEach(groupSink::next);
}
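
Taken together, the executor now flushes a buffered micro batch on any of three triggers, with the payload trigger newly configurable. A simplified standalone rendering of that predicate (Scala, illustrative only; names mirror the fields above):

```scala
// Illustrative flush predicate combining the three triggers shown above.
final case class BatchState(count: Int, ageInMs: Long, serializedBytes: Int)

def shouldFlush(
    state: BatchState,
    targetBatchSize: Int,       // thresholds.getTargetMicroBatchSizeSnapshot()
    maxIntervalInMs: Long,      // maxMicroBatchIntervalInMs
    maxPayloadBytes: Int        // maxMicroBatchPayloadSizeInBytes (added in this PR)
): Boolean =
  state.count >= targetBatchSize ||
  state.ageInMs >= maxIntervalInMs ||
  state.serializedBytes >= maxPayloadBytes
```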
@@ -36,12 +36,12 @@

final class BulkExecutorUtil {

static ServerOperationBatchRequest createBatchRequest(List<CosmosItemOperation> operations, String partitionKeyRangeId) {
static ServerOperationBatchRequest createBatchRequest(List<CosmosItemOperation> operations, String partitionKeyRangeId, int maxMicroBatchPayloadSizeInBytes) {

return PartitionKeyRangeServerBatchRequest.createBatchRequest(
partitionKeyRangeId,
operations,
BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
maxMicroBatchPayloadSizeInBytes,
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);
}
