Fixing maxItemCount from Spark to control batch size of query requests (Azure#23963)

* Fixing maxItemCount from Spark to control batch size of query requests

* Fixing build warnings

* Adding more robust temp file cleanup

* Fixing test issues

* Reducing BulkWriter.DefaultMaxPendingOperationPerCore
FabianMeiswinkel authored Sep 10, 2021
1 parent 7f0dd99 commit eb3b7d2
Showing 8 changed files with 266 additions and 15 deletions.
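For orientation: the option this commit hooks up on the Spark side is spark.cosmos.read.maxItemCount, which caps the page size of query responses. A minimal sketch of how a job would set it (the account endpoint, key, database and container values below are placeholders, and an existing Spark session is assumed):

    // Sketch: cap query responses at 2 items per page via spark.cosmos.read.maxItemCount.
    // All account/database/container values here are placeholders.
    val readCfg = Map(
      "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
      "spark.cosmos.accountKey" -> "<account-key>",
      "spark.cosmos.database" -> "<database>",
      "spark.cosmos.container" -> "<container>",
      "spark.cosmos.read.maxItemCount" -> "2"
    )
    val df = spark.read.format("cosmos.oltp").options(readCfg).load()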
@@ -508,10 +508,14 @@ private object BulkWriter {
// let's say we have a cosmos container with 1M RU which is 167 partitions
// let's say we are ingesting items of size 1KB
// let's say max request size is 1MB
// hence we want 2MB/ 1KB items per partition to be buffered
// 2 * 1024 * 167 items should get buffered on a 16 CPU core VM
// so per CPU core we want (2 * 1024 * 167 / 16) max items to be buffered
val DefaultMaxPendingOperationPerCore: Int = 2 * 1024 * 167 / 16
// hence we want 1MB/ 1KB items per partition to be buffered
// 1024 * 167 items should get buffered on a 16 CPU core VM
// so per CPU core we want (1024 * 167 / 16) max items to be buffered
// Reduced the targeted buffer from 2MB per partition and core to 1MB because
// a few customers were seeing too high CPU usage with the previous setting.
// The reason is that several customers use larger-than-1KB documents, so we
// need to be less aggressive with the buffering.
val DefaultMaxPendingOperationPerCore: Int = 1024 * 167 / 16

val emitFailureHandler: EmitFailureHandler =
(_, emitResult) => if (emitResult.equals(EmitResult.FAIL_NON_SERIALIZED)) true else false
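To make the sizing comment above concrete, here is the arithmetic as a worked example (values taken straight from the comment; this is illustration, not code in the commit):

    // 167 physical partitions (~1M RU), 1KB items, 1MB target buffer per partition, 16 cores:
    val partitions = 167
    val itemsPerPartition = 1024          // 1MB buffer / 1KB per item
    val cores = 16
    val previousCap = 2 * itemsPerPartition * partitions / cores  // 2MB buffer -> 21376 per core
    val newCap = itemsPerPartition * partitions / cores           // 1MB buffer -> 10688 per core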
@@ -4,14 +4,13 @@
package com.azure.cosmos.spark

import com.azure.cosmos.implementation.routing.LocationHelper
import com.azure.cosmos.implementation.spark.OperationListener
import com.azure.cosmos.models.{CosmosChangeFeedRequestOptions, CosmosParameterizedQuery, FeedRange}
import com.azure.cosmos.spark.ChangeFeedModes.ChangeFeedMode
import com.azure.cosmos.spark.ChangeFeedStartFromModes.{ChangeFeedStartFromMode, PointInTime}
import com.azure.cosmos.spark.ItemWriteStrategy.{ItemWriteStrategy, values}
import com.azure.cosmos.spark.PartitioningStrategies.PartitioningStrategy
import com.azure.cosmos.spark.SchemaConversionModes.SchemaConversionMode
import com.azure.cosmos.spark.diagnostics.SimpleDiagnosticsProvider
import com.azure.cosmos.spark.diagnostics.{DiagnosticsProvider, SimpleDiagnosticsProvider}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -410,7 +409,7 @@ private object DiagnosticsConfig {
classOf[SimpleDiagnosticsProvider].getName
} else {
// this is experimental and to be used by cosmos db dev engineers.
Class.forName(diagnostics).asSubclass(classOf[OperationListener]).getDeclaredConstructor()
Class.forName(diagnostics).asSubclass(classOf[DiagnosticsProvider]).getDeclaredConstructor()
diagnostics
}
},
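After this change, the class named in spark.cosmos.diagnostics is validated as a DiagnosticsProvider rather than an OperationListener. A rough sketch of what such a provider looks like (a hypothetical MyDiagnosticsProvider; note these diagnostics types are private[spark], so this only illustrates the shape the config validation now expects):

    import com.azure.cosmos.spark.diagnostics.{DiagnosticsProvider, ILogger, SimpleDiagnosticsSlf4jLogger}

    // Hypothetical provider; spark.cosmos.diagnostics would be set to its fully qualified class name.
    // It only needs to hand out an ILogger per requesting class.
    class MyDiagnosticsProvider extends DiagnosticsProvider {
      override def getLogger(classType: Class[_]): ILogger =
        new SimpleDiagnosticsSlf4jLogger(classType)
    }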
@@ -8,7 +8,7 @@ import com.azure.cosmos.implementation.{HttpConstants, OperationType, RxDocument

// scalastyle:off multiple.string.literals

private[spark] final class SimpleDiagnosticsSlf4jLogger(classType: Class[_])
private[spark] class SimpleDiagnosticsSlf4jLogger(classType: Class[_])
extends DefaultMinimalSlf4jLogger(classType: Class[_]) {

override def logItemWriteCompletion(writeOperation: WriteOperation): Unit = {
@@ -0,0 +1,57 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.spark.diagnostics.{DiagnosticsProvider, ILogger}
import org.apache.commons.io.FileUtils

import java.io.{File, IOException}
import java.nio.file.Files
import scala.collection.concurrent.TrieMap

// only when diagnostics enabled,
// - logs each individual write's success and/or failure with id, pk
// - logs each documentServiceRequest and documentServiceResponse
private[spark] class SimpleFileDiagnosticsProvider extends DiagnosticsProvider {
override def getLogger(classType: Class[_]): ILogger =
SimpleFileDiagnosticsProvider.getOrCreateSingletonLoggerInstance(classType)
}

private[spark] object SimpleFileDiagnosticsProvider {
val singletonInstances: TrieMap[Class[_], SimpleFileDiagnosticsSlf4jLogger] =
new TrieMap[Class[_], SimpleFileDiagnosticsSlf4jLogger]()

private val folderName = System.getProperty("java.io.tmpdir") + "/SimpleFileDiagnostics"
private val folder = new File(folderName)
FileUtils.forceDeleteOnExit(folder)

def getOrCreateSingletonLoggerInstance(classType: Class[_]): SimpleFileDiagnosticsSlf4jLogger = {
if (!folder.exists()) {
folder.mkdirs()
}

singletonInstances.applyOrElse(
classType,
ct => new SimpleFileDiagnosticsSlf4jLogger(ct))
}

def getLogFile(classType: Class[_]): File = {
folder.createNewFile()
val logFile = new File(
folder.getAbsolutePath()
+ File.separator
+ classType.getSimpleName().replace("$", "") + ".txt")

FileUtils.forceDeleteOnExit(logFile)
logFile
}

def reset(): Unit = {
try {
FileUtils.deleteDirectory(folder)
} catch {
case _: IOException =>
}
}
}

@@ -0,0 +1,81 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.spark.diagnostics.SimpleDiagnosticsSlf4jLogger
import com.nimbusds.jose.util.StandardCharset

import java.io.{BufferedReader, File, FileOutputStream, FileReader}
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ListBuffer

// scalastyle:off multiple.string.literals

private[spark] final class SimpleFileDiagnosticsSlf4jLogger(classType: Class[_])
extends SimpleDiagnosticsSlf4jLogger(classType: Class[_]) {

private val thisLock = new Object()
private val file =SimpleFileDiagnosticsProvider.getLogFile(classType)

if (!file.exists()) {
file.createNewFile()
}

private val fos = new FileOutputStream(file, true)

override def logInfo(msg: => String): Unit = {
super.logInfo(msg)
val line = msg + "|||"
writeToFile(line)
}

override def logInfo(msg: => String, throwable: Throwable): Unit = {
super.logInfo(msg, throwable)
val line = if (Option.apply(throwable).isDefined) {
msg + "|||" + throwable.toString.filter(_ >= ' ')
} else {
msg + "|||"
}

writeToFile(line)
}

private def writeToFile(line: String) {
thisLock.synchronized {
val fileLock = fos.getChannel.lock()
try {
fos.write(line.getBytes(StandardCharsets.UTF_8))
fos.write(System.getProperty("line.separator").getBytes())
fos.flush()
fileLock.release()
} catch {
case t: Throwable => fileLock.release()
}
}
}

def getMessages(): ListBuffer[(String, Option[String])] = {
val result = new ListBuffer[(String, Option[String])]
thisLock.synchronized {
val reader = new BufferedReader(new FileReader(file))
var continue = true
while (continue) {
val line = reader.readLine()
if (line == null) {
continue = false
} else {
val columns = line.split("\\|\\|\\|")
if (columns.size == 2 && columns(1) != null && columns(1).length > 0) {
result.append((columns(0), Some(columns(1))))
} else {
result.append((columns(0), None))
}
}
}

}
result
}
}
// // scalastyle:on multiple.string.literals
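Taken together, the two new files give tests a way to capture diagnostics on disk and read them back. A sketch of the intended flow, based on the query test added below (ItemsPartitionReader is the class whose logger that test inspects):

    import scala.collection.mutable.ListBuffer

    // Point spark.cosmos.diagnostics at the file-based provider (by fully qualified class name),
    // run the Spark read with this option merged in, then pull the captured messages back
    // and clean up the temp folder.
    val diagnosticsClassName = SimpleFileDiagnosticsProvider.getClass.getName.replace("$", "")
    val extraCfg = Map("spark.cosmos.diagnostics" -> diagnosticsClassName)

    val logger = SimpleFileDiagnosticsProvider.getOrCreateSingletonLoggerInstance(ItemsPartitionReader.getClass)
    val messages: ListBuffer[(String, Option[String])] = logger.getMessages()
    SimpleFileDiagnosticsProvider.reset()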

@@ -22,6 +22,8 @@ class SparkE2EQueryITest

//scalastyle:off multiple.string.literals
//scalastyle:off magic.number
//scalastyle:off file.size.limit
//scalastyle:off null

// NOTE: due to some bug in the emulator, sub-range feed range doesn't work
// "spark.cosmos.read.partitioning.strategy" -> "Restrictive" is added to the query tests
@@ -202,6 +204,80 @@ class SparkE2EQueryITest
item.getAs[String]("id") shouldEqual id
}

"spark query" can "override maxItemCount" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)

// assert that there is more than one range to ensure the test really is testing the parallelization of work
container.getFeedRanges.block().size() should be > 1

for (age <- 1 to 20) {
for (state <- Array(true, false)) {
val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
objectNode.put("name", "Shrodigner's cat")
objectNode.put("type", "cat")
objectNode.put("age", age)
objectNode.put("isAlive", state)
objectNode.put("id", UUID.randomUUID().toString)
container.createItem(objectNode).block()
}
}

val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.partitioning.strategy" -> "Restrictive",
"spark.cosmos.read.maxItemCount" -> "2",
"spark.cosmos.diagnostics" -> SimpleFileDiagnosticsProvider.getClass.getName.replace("$", "")
)

SimpleFileDiagnosticsProvider.reset()

// scalastyle:off underscore.import
// scalastyle:off import.grouping
import org.apache.spark.sql.types._
// scalastyle:on underscore.import
// scalastyle:on import.grouping

val customSchema = StructType(Array(
StructField("id", StringType),
StructField("name", StringType),
StructField("type", StringType),
StructField("age", IntegerType),
StructField("isAlive", BooleanType)
))

val df = spark.read.schema(customSchema).format("cosmos.oltp").options(cfg).load()
val rowsArray = df.where("isAlive = 'true' and type = 'cat'").orderBy("age").collect()
rowsArray should have size 20

for (index <- 0 until rowsArray.length) {
val row = rowsArray(index)
row.getAs[String]("name") shouldEqual "Shrodigner's cat"
row.getAs[String]("type") shouldEqual "cat"
row.getAs[Integer]("age") shouldEqual index + 1
row.getAs[Boolean]("isAlive") shouldEqual true
}

// validate from diagnostics that all responses had at most 2 records (instead of the default of up to 100)
val logger = SimpleFileDiagnosticsProvider.getOrCreateSingletonLoggerInstance(ItemsPartitionReader.getClass)
val messages = logger.getMessages()
SimpleFileDiagnosticsProvider.reset()
messages should not be null
messages.size should not be 0
for ((msg, throwable) <- messages) {
val itemCountPos = msg.indexOf("itemCount:")
if (itemCountPos > 0) {
val startPos = itemCountPos + "itemCount:".length
val itemCount = msg.substring(startPos, msg.indexOf(",", startPos)).toInt
itemCount should be <= 2
}
}
}

"spark query" can "use user provided schema" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
@@ -495,6 +571,8 @@ class SparkE2EQueryITest
container.createItem(objectNode).block()
}

Thread.sleep(2000)

for( _ <- 1 to samplingSize) {
val objectNode2 = Utils.getSimpleObjectMapper.createObjectNode()
val arr = objectNode2.putArray("object_array")
@@ -544,6 +622,8 @@ class SparkE2EQueryITest
container.createItem(objectNode).block()
}

Thread.sleep(2000)

for( _ <- 1 to samplingSize) {
val objectNode2 = Utils.getSimpleObjectMapper.createObjectNode()
objectNode2.put("wheels", 4)
@@ -778,4 +858,6 @@ class SparkE2EQueryITest

//scalastyle:on magic.number
//scalastyle:on multiple.string.literals
//scalastyle:on file.size.limit
//scalastyle:on null
}
@@ -68,18 +68,24 @@ public final class CosmosPagedFlux<T> extends ContinuablePagedFlux<String, T, Fe

private final Consumer<FeedResponse<T>> feedResponseConsumer;
private ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor cosmosDiagnosticsAccessor;
private final int defaultPageSize;

CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction) {
this.optionsFluxFunction = optionsFluxFunction;
this.feedResponseConsumer = null;
this.cosmosDiagnosticsAccessor = ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor();
this(optionsFluxFunction, null, -1);
}

CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction,
Consumer<FeedResponse<T>> feedResponseConsumer) {
this(optionsFluxFunction, feedResponseConsumer, -1);
}

CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction,
Consumer<FeedResponse<T>> feedResponseConsumer,
int defaultPageSize) {
this.optionsFluxFunction = optionsFluxFunction;
this.feedResponseConsumer = feedResponseConsumer;
this.cosmosDiagnosticsAccessor = ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor();
this.defaultPageSize = defaultPageSize;
}

/**
@@ -100,27 +106,27 @@ public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> newFeedResponseConsum

@Override
public Flux<FeedResponse<T>> byPage() {
CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context));
}

@Override
public Flux<FeedResponse<T>> byPage(String continuationToken) {
CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
cosmosPagedFluxOptions.setRequestContinuation(continuationToken);
return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context));
}

@Override
public Flux<FeedResponse<T>> byPage(int preferredPageSize) {
CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize);
return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context));
}

@Override
public Flux<FeedResponse<T>> byPage(String continuationToken, int preferredPageSize) {
CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
cosmosPagedFluxOptions.setRequestContinuation(continuationToken);
cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize);
return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context));
@@ -144,6 +150,20 @@ public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
}).subscribe(coreSubscriber);
}

CosmosPagedFlux<T> withDefaultPageSize(int pageSize) {
return new CosmosPagedFlux<T>(this.optionsFluxFunction, this.feedResponseConsumer, pageSize);
}

private CosmosPagedFluxOptions createCosmosPagedFluxOptions() {
CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();

if (this.defaultPageSize > 0) {
cosmosPagedFluxOptions.setMaxItemCount(this.defaultPageSize);
}

return cosmosPagedFluxOptions;
}

private Flux<FeedResponse<T>> byPage(CosmosPagedFluxOptions pagedFluxOptions, Context context) {
final AtomicReference<Context> parentContext = new AtomicReference<>(Context.NONE);
AtomicReference<Instant> startTime = new AtomicReference<>();
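The net effect of the CosmosPagedFlux changes above: a defaultPageSize carried by the flux is applied whenever the caller does not pass a preferredPageSize, and an explicit preferredPageSize still wins. A tiny Scala transliteration of that fallback (illustrative only; effectiveMaxItemCount is not a method in the SDK):

    // defaultPageSize <= 0 means "not configured", mirroring createCosmosPagedFluxOptions above.
    def effectiveMaxItemCount(defaultPageSize: Int, preferredPageSize: Option[Int]): Option[Int] =
      preferredPageSize.orElse(if (defaultPageSize > 0) Some(defaultPageSize) else None)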