Allow overriding MaxItemCount for query/changefeed in Spark connector (Azure#23466)

* Increasing default MaxItemCount for queries/changefeed in the Spark Connector

* Fixed comment

* Update BulkWriter.scala
FabianMeiswinkel authored Aug 11, 2021
1 parent ec9f217 commit ff7be27
Showing 15 changed files with 65 additions and 23 deletions.
@@ -181,7 +181,7 @@ private <T> CosmosItemResponse<T> blockItemResponse(Mono<CosmosItemResponse<T>>
}

private <T> CosmosPagedIterable<T> getCosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux) {
return UtilBridgeInternal.createCosmosPagedIterable(cosmosPagedFlux);
return new CosmosPagedIterable<>(cosmosPagedFlux);
}

private CosmosItemResponse<Object> blockDeleteItemResponse(Mono<CosmosItemResponse<Object>> deleteItemMono) {
@@ -19,6 +19,7 @@ Configuration Reference:
| :--- | :---- | :--- |
| `spark.cosmos.useGatewayMode` | `false` | Use gateway mode for the client operations |
| `spark.cosmos.read.forceEventualConsistency` | `true` | Makes the client use Eventual consistency for read operations instead of using the default account level consistency |
| `spark.cosmos.read.maxItemCount` | `1000` | Overrides the maximum number of documents that can be returned in a single query or change feed request. The default value is `1000`; consider increasing it only when average document sizes are significantly smaller than 1 KB. |
| `spark.cosmos.applicationName` | None | Application name |
| `spark.cosmos.preferredRegionsList` | None | Preferred regions list to be used for a multi-region Cosmos DB account. This is a comma-separated list (e.g., `[East US, West US]` or `East US, West US`); the provided preferred regions will be used as a hint. You should use a collocated Spark cluster with your Cosmos DB account and pass the Spark cluster region as the preferred region. See the list of Azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). Note that you can also use `spark.cosmos.preferredRegions` as an alias |

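For context, the new `spark.cosmos.read.maxItemCount` option is passed like any other connector option. A minimal sketch of a read using it, assuming an existing `SparkSession` named `spark` and the connector's `cosmos.oltp` format; the endpoint, key, database, and container values are placeholders:

    // Sketch only: account endpoint/key and database/container names are placeholders.
    val df = spark.read
      .format("cosmos.oltp")
      .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
      .option("spark.cosmos.accountKey", "<account-key>")
      .option("spark.cosmos.database", "<database>")
      .option("spark.cosmos.container", "<container>")
      // Per-request page size for queries and change feed reads; default is 1000.
      .option("spark.cosmos.read.maxItemCount", "5000")
      .load()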
@@ -496,7 +496,11 @@ private object BulkWriter {
// UPDATE - reverting back to 15 minutes - causing an unreasonably large delay/hang
// due to a backend issue doesn't sound right for most customers (helpful during my own
// long stress runs - but for customers 15 minutes is more reasonable)
val maxAllowedMinutesWithoutAnyProgress = 15
// UPDATE - TODO @fabianm - with 15 minutes the end-to-end sample fails too often - because the extensive 429/3088
// intervals are around 2 hours. So I need to increase this threshold for now again - will move it
// to 45 minutes - and when I am back from vacation will drive an investigation to improve the
// end-to-end behavior on 429/3088 with the backend and monitoring teams.
val maxAllowedMinutesWithoutAnyProgress = 45
//scalastyle:on magic.number

// let's say the spark executor VM has 16 CPU cores.
@@ -10,6 +10,7 @@ import com.azure.cosmos.spark.ChangeFeedPartitionReader.LsnPropertyName
import com.azure.cosmos.spark.CosmosPredicates.requireNotNull
import com.azure.cosmos.spark.CosmosTableSchemaInferrer.LsnAttributeName
import com.azure.cosmos.spark.diagnostics.LoggerHelper
import com.azure.cosmos.util.CosmosPagedIterable
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row
@@ -56,15 +57,17 @@ private case class ChangeFeedPartitionReader
SparkBridgeImplementationInternal.extractLsnFromChangeFeedContinuation(this.partition.continuationState.get)
log.logDebug(s"Request options for Range '${partition.feedRange.min}-${partition.feedRange.max}' LSN '$startLsn'")

CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(this.partition.continuationState.get)
CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(this.partition.continuationState.get)
.setMaxItemCount(readConfig.maxItemCount)
}

private val rowSerializer: ExpressionEncoder.Serializer[Row] = RowSerializerPool.getOrCreateSerializer(readSchema)

private lazy val iterator: PeekingIterator[ObjectNode] = Iterators.peekingIterator(
cosmosAsyncContainer
.queryChangeFeed(changeFeedRequestOptions, classOf[ObjectNode])
.toIterable
new CosmosPagedIterable[ObjectNode](
cosmosAsyncContainer.queryChangeFeed(changeFeedRequestOptions, classOf[ObjectNode]),
readConfig.maxItemCount)
.iterator()
)

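The pattern above (wrapping the change feed flux in a CosmosPagedIterable with an explicit batch size) can be sketched in isolation. This is an illustrative example rather than connector code; the container argument stands in for any initialized CosmosAsyncContainer:

    import com.azure.cosmos.CosmosAsyncContainer
    import com.azure.cosmos.models.{CosmosChangeFeedRequestOptions, FeedRange}
    import com.azure.cosmos.util.CosmosPagedIterable
    import com.fasterxml.jackson.databind.node.ObjectNode

    def drainChangeFeed(container: CosmosAsyncContainer, pageSize: Int): Unit = {
      val options = CosmosChangeFeedRequestOptions
        .createForProcessingFromBeginning(FeedRange.forFullRange())
        .setMaxItemCount(pageSize) // cap documents per change feed request
      val docs = new CosmosPagedIterable[ObjectNode](
          container.queryChangeFeed(options, classOf[ObjectNode]),
          pageSize) // preferred batch size when pulling pages (the new constructor)
        .iterator()
      while (docs.hasNext) {
        println(docs.next().get("id")) // process each changed document
      }
    }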
@@ -40,6 +40,7 @@ private object CosmosConfigNames {
val ApplicationName = "spark.cosmos.applicationName"
val UseGatewayMode = "spark.cosmos.useGatewayMode"
val ReadCustomQuery = "spark.cosmos.read.customQuery"
val ReadMaxItemCount = "spark.cosmos.read.maxItemCount"
val ReadForceEventualConsistency = "spark.cosmos.read.forceEventualConsistency"
val ReadSchemaConversionMode = "spark.cosmos.read.schemaConversionMode"
val ReadInferSchemaSamplingSize = "spark.cosmos.read.inferSchema.samplingSize"
@@ -85,6 +86,7 @@ private object CosmosConfigNames {
ReadCustomQuery,
ReadForceEventualConsistency,
ReadSchemaConversionMode,
ReadMaxItemCount,
ReadInferSchemaSamplingSize,
ReadInferSchemaEnabled,
ReadInferSchemaIncludeSystemProperties,
@@ -300,6 +302,7 @@ private object CosmosAccountConfig {

private case class CosmosReadConfig(forceEventualConsistency: Boolean,
schemaConversionMode: SchemaConversionMode,
maxItemCount: Int,
customQuery: Option[CosmosParameterizedQuery])

private object SchemaConversionModes extends Enumeration {
@@ -311,6 +314,7 @@

private object CosmosReadConfig {
private val DefaultSchemaConversionMode: SchemaConversionMode = SchemaConversionModes.Relaxed
private val DefaultMaxItemCount : Int = 1000

private val ForceEventualConsistency = CosmosConfigEntry[Boolean](key = CosmosConfigNames.ReadForceEventualConsistency,
mandatory = false,
@@ -339,12 +343,20 @@
"etc.) that cannot be pushed down yet (at least in Spark 3.1) - so the custom query is a fallback to allow " +
"them to be pushed into the query sent to Cosmos.")

private val MaxItemCount = CosmosConfigEntry[Int](
key = CosmosConfigNames.ReadMaxItemCount,
mandatory = false,
defaultValue = Some(DefaultMaxItemCount),
parseFromStringFunction = maxItemCountText => maxItemCountText.toInt,
helpMessage = "The maximum number of documents returned in a single request. The default is 1000.")

def parseCosmosReadConfig(cfg: Map[String, String]): CosmosReadConfig = {
val forceEventualConsistency = CosmosConfigEntry.parse(cfg, ForceEventualConsistency)
val jsonSchemaConversionMode = CosmosConfigEntry.parse(cfg, JsonSchemaConversion)
val customQuery = CosmosConfigEntry.parse(cfg, CustomQuery)
val maxItemCount = CosmosConfigEntry.parse(cfg, MaxItemCount)

CosmosReadConfig(forceEventualConsistency.get, jsonSchemaConversionMode.get, customQuery)
CosmosReadConfig(forceEventualConsistency.get, jsonSchemaConversionMode.get, maxItemCount.get, customQuery)
}
}

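To illustrate how the new entry resolves, a small sketch of the parse path; this assumes code living inside the com.azure.cosmos.spark package, since CosmosReadConfig is package-private:

    // Explicit override: maxItemCount resolves to the supplied value.
    val overridden = CosmosReadConfig.parseCosmosReadConfig(
      Map("spark.cosmos.read.maxItemCount" -> "5000"))
    assert(overridden.maxItemCount == 5000)

    // No key present: the optional entry falls back to DefaultMaxItemCount (1000).
    val defaulted = CosmosReadConfig.parseCosmosReadConfig(Map.empty[String, String])
    assert(defaulted.maxItemCount == 1000)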
@@ -5,9 +5,12 @@ package com.azure.cosmos.spark
import com.azure.cosmos.CosmosAsyncClient
import com.azure.cosmos.models.CosmosQueryRequestOptions
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.util.CosmosPagedIterable
import com.fasterxml.jackson.databind.JsonNode
import org.apache.spark.sql.catalyst.analysis.TypeCoercion

import java.util.stream.Collectors

// scalastyle:off underscore.import
import com.fasterxml.jackson.databind.node._

@@ -104,10 +107,10 @@ private object CosmosTableSchemaInferrer
val pagedFluxResponse =
sourceContainer.queryItems(queryText, queryOptions, classOf[ObjectNode])

val feedResponseList = pagedFluxResponse
.take(cosmosInferenceConfig.inferSchemaSamplingSize)
.collectList
.block
val feedResponseList = new CosmosPagedIterable[ObjectNode](pagedFluxResponse, cosmosReadConfig.maxItemCount)
.stream()
.limit(cosmosInferenceConfig.inferSchemaSamplingSize)
.collect(Collectors.toList[ObjectNode]())

inferSchema(feedResponseList.asScala,
cosmosInferenceConfig.inferSchemaQuery.isDefined || cosmosInferenceConfig.includeSystemProperties,
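The sampling change above replaces a reactive take/collectList/block pipeline with a blocking Java stream over CosmosPagedIterable, which stops pulling pages once the sampling size is reached. A standalone sketch of the same pattern (container, query, and sizes are placeholders):

    import com.azure.cosmos.CosmosAsyncContainer
    import com.azure.cosmos.models.CosmosQueryRequestOptions
    import com.azure.cosmos.util.CosmosPagedIterable
    import com.fasterxml.jackson.databind.node.ObjectNode
    import java.util.stream.Collectors

    def sampleDocuments(container: CosmosAsyncContainer,
                        samplingSize: Int,
                        pageSize: Int): java.util.List[ObjectNode] = {
      val pagedFlux = container.queryItems(
        s"SELECT TOP $samplingSize * FROM r", new CosmosQueryRequestOptions(), classOf[ObjectNode])
      new CosmosPagedIterable[ObjectNode](pagedFlux, pageSize)
        .stream()            // blocking stream over the paged results
        .limit(samplingSize) // stop once the sample is large enough
        .collect(Collectors.toList[ObjectNode]())
    }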
@@ -7,6 +7,7 @@ import com.azure.cosmos.implementation.spark.{OperationContextAndListenerTuple,
import com.azure.cosmos.implementation.{CosmosClientMetadataCachesSnapshot, ImplementationBridgeHelpers, SparkBridgeImplementationInternal}
import com.azure.cosmos.models.{CosmosParameterizedQuery, CosmosQueryRequestOptions}
import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, DiagnosticsLoader, LoggerHelper, SparkTaskContext}
import com.azure.cosmos.util.CosmosPagedIterable
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
@@ -71,15 +72,13 @@ private case class ItemsPartitionReader

queryOptions.setFeedRange(SparkBridgeImplementationInternal.toFeedRange(feedRange))

private lazy val iterator = cosmosAsyncContainer.queryItems(
cosmosQuery.toSqlQuerySpec,
queryOptions,
classOf[ObjectNode]
).toIterable.iterator()
private lazy val iterator = new CosmosPagedIterable[ObjectNode](
cosmosAsyncContainer.queryItems(cosmosQuery.toSqlQuerySpec, queryOptions, classOf[ObjectNode]),
readConfig.maxItemCount
).iterator()

private val rowSerializer: ExpressionEncoder.Serializer[Row] = RowSerializerPool.getOrCreateSerializer(readSchema)


override def next(): Boolean = iterator.hasNext

override def get(): InternalRow = {
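ItemsPartitionReader applies the same wrapping to query results; in isolation the pattern looks like this (again an illustrative sketch, with container standing in for an initialized CosmosAsyncContainer):

    import com.azure.cosmos.CosmosAsyncContainer
    import com.azure.cosmos.models.CosmosQueryRequestOptions
    import com.azure.cosmos.util.CosmosPagedIterable
    import com.fasterxml.jackson.databind.node.ObjectNode

    def drainQuery(container: CosmosAsyncContainer, pageSize: Int): Unit = {
      val pagedFlux = container.queryItems(
        "SELECT * FROM c", new CosmosQueryRequestOptions(), classOf[ObjectNode])
      // For queries, the CosmosPagedIterable batch size plays the role that
      // setMaxItemCount plays for the change feed: it caps each service request.
      val docs = new CosmosPagedIterable[ObjectNode](pagedFlux, pageSize).iterator()
      while (docs.hasNext) {
        println(docs.next().get("id"))
      }
    }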
@@ -13,7 +13,8 @@ class FilterAnalyzerSpec extends UnitSpec {
//scalastyle:off multiple.string.literals
//scalastyle:off magic.number

private[this] val readConfigWithoutCustomQuery = new CosmosReadConfig(true, SchemaConversionModes.Relaxed, None)
private[this] val readConfigWithoutCustomQuery =
new CosmosReadConfig(true, SchemaConversionModes.Relaxed, 100, None)
private[this] val queryText = "SELECT * FROM c WHERE c.abc='Hello World'"
private[this] val query = Some(CosmosParameterizedQuery(
queryText,
@@ -22,6 +23,7 @@
private[this] val readConfigWithCustomQuery = new CosmosReadConfig(
true,
SchemaConversionModes.Relaxed,
100,
query)

"many filters" should "be translated to cosmos predicates with AND" in {
@@ -40,6 +40,7 @@ class SparkE2EChangeFeedITest
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.maxItemCount" -> "2",
"spark.cosmos.read.inferSchema.enabled" -> "false"
)

@@ -55,6 +56,7 @@
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.inferSchema.enabled" -> "false",
"spark.cosmos.read.maxItemCount" -> "1",
"spark.cosmos.changeFeed.mode" -> "Incremental"
)

@@ -84,6 +86,7 @@
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.maxItemCount" -> "1",
"spark.cosmos.read.inferSchema.enabled" -> "false"
)

@@ -122,6 +125,7 @@
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.inferSchema.enabled" -> "false",
"spark.cosmos.changeFeed.mode" -> "FullFidelity",
"spark.cosmos.read.maxItemCount" -> "1",
"spark.cosmos.changeFeed.startFrom" -> "NOW"
)

@@ -150,6 +154,7 @@
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.inferSchema.enabled" -> "false",
"spark.cosmos.read.maxItemCount" -> "200000",
"spark.cosmos.changeFeed.startFrom" -> "Beginning",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive"
)
@@ -111,6 +111,7 @@ class SparkE2EQueryITest
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.maxItemCount" -> "1",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive"
)

@@ -150,6 +151,7 @@
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.maxItemCount" -> "5",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive"
)

@@ -213,7 +213,7 @@ public void close() {
}

private <T> CosmosPagedIterable<T> getCosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux) {
return UtilBridgeInternal.createCosmosPagedIterable(cosmosPagedFlux);
return new CosmosPagedIterable<>(cosmosPagedFlux);
}

/**
@@ -742,7 +742,7 @@ public CosmosScripts getScripts() {
// TODO: should make partitionkey public in CosmosAsyncItem and fix the below call

private <T> CosmosPagedIterable<T> getCosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux) {
return UtilBridgeInternal.createCosmosPagedIterable(cosmosPagedFlux);
return new CosmosPagedIterable<>(cosmosPagedFlux);
}

/**
@@ -558,7 +558,7 @@ <T> T throughputResponseToBlock(Mono<T> throughputResponse) {
}

private <T> CosmosPagedIterable<T> getCosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux) {
return UtilBridgeInternal.createCosmosPagedIterable(cosmosPagedFlux);
return new CosmosPagedIterable<>(cosmosPagedFlux);
}

}
@@ -321,6 +321,6 @@ CosmosTriggerResponse blockTriggerResponse(Mono<CosmosTriggerResponse> responseM
}

private <T> CosmosPagedIterable<T> getCosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux) {
return UtilBridgeInternal.createCosmosPagedIterable(cosmosPagedFlux);
return new CosmosPagedIterable<>(cosmosPagedFlux);
}
}
@@ -29,11 +29,22 @@ public final class CosmosPagedIterable<T> extends ContinuablePagedIterable<Strin
*
* @param cosmosPagedFlux the paged flux to use as the iterable
*/
CosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux) {
public CosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux) {
super(cosmosPagedFlux);
this.cosmosPagedFlux = cosmosPagedFlux;
}

/**
* Creates an instance given {@link CosmosPagedFlux}.
*
* @param cosmosPagedFlux the paged flux to use as the iterable
* @param batchSize the preferred batch size to be used when pulling data from the service
*/
public CosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux, int batchSize) {
super(cosmosPagedFlux, batchSize);
this.cosmosPagedFlux = cosmosPagedFlux;
}

/**
* Handle for invoking "side-effects" on each FeedResponse returned by CosmosPagedIterable
*
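With both constructors public, any CosmosPagedFlux can now be consumed synchronously with a caller-chosen batch size. A short sketch (the flux parameter is assumed to come from a prior queryItems or queryChangeFeed call):

    import com.azure.cosmos.util.{CosmosPagedFlux, CosmosPagedIterable}
    import com.fasterxml.jackson.databind.node.ObjectNode

    def printIds(flux: CosmosPagedFlux[ObjectNode]): Unit = {
      // Explicit preferred batch size: pages of up to 100 items are requested.
      val iterable = new CosmosPagedIterable[ObjectNode](flux, 100)
      iterable.forEach(doc => println(doc.get("id")))
    }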