Adding operation context and elevating few logs on query path (Azure#28303)

* Removes the limitation that prevented `spark.cosmos.partitioning.feedRangeFilter` from being honored in the `cosmos.oltp.changeFeed` DataSource (see the usage sketch after the commit list).

* Adding changelog

* Adding operation context and elevating few logs on query path

* Update TransientIOErrorsRetryingIterator.scala

* Revert "Update TransientIOErrorsRetryingIterator.scala"

This reverts commit 46f2e8b.

* Revert "Adding operation context and elevating few logs on query path"

This reverts commit 843f345.

* Revert "Revert "Adding operation context and elevating few logs on query path""

This reverts commit 01987d5.

* Revert "Revert "Update TransientIOErrorsRetryingIterator.scala""

This reverts commit 23bf7dd.

* Update ItemsPartitionReader.scala

* A few additional logs

* Update ParallelDocumentQueryExecutionContext.java

* Update ParallelDocumentQueryExecutionContextBase.java

* Adding logs

* Update TransientIOErrorsRetryingIterator.scala
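
Usage sketch for the first bullet above: with the limitation removed, a change feed read can carry the same feed-range filter as a query read. This is a minimal sketch, assuming a live SparkSession `spark`; the endpoint, key, database, container, and filter value are placeholders, and the other `spark.cosmos.*` option names are the connector's standard settings.

    val changeFeedDf = spark.read
      .format("cosmos.oltp.changeFeed")
      .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
      .option("spark.cosmos.accountKey", "<account-key>")
      .option("spark.cosmos.database", "myDatabase")
      .option("spark.cosmos.container", "myContainer")
      .option("spark.cosmos.partitioning.feedRangeFilter", "<feed-range>") // now honored for change feed reads too
      .load()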
FabianMeiswinkel authored Apr 22, 2022
1 parent 4584aa9 commit a8e6cf9
Showing 16 changed files with 240 additions and 84 deletions.
File: ItemsPartitionReader.scala

@@ -3,9 +3,10 @@
 
 package com.azure.cosmos.spark
 
-import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple
+import com.azure.cosmos.implementation.spark.{OperationContextAndListenerTuple, OperationListener}
 import com.azure.cosmos.implementation.{CosmosClientMetadataCachesSnapshot, ImplementationBridgeHelpers, SparkBridgeImplementationInternal, SparkRowItem, Strings}
 import com.azure.cosmos.models.{CosmosParameterizedQuery, CosmosQueryRequestOptions, ModelBridgeInternal}
+import com.azure.cosmos.spark.BulkWriter.getThreadInfo
 import com.azure.cosmos.spark.diagnostics.{DiagnosticsContext, DiagnosticsLoader, LoggerHelper, SparkTaskContext}
 import com.fasterxml.jackson.databind.node.ObjectNode
 import org.apache.spark.TaskContext
@@ -16,6 +17,8 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.connector.read.PartitionReader
 import org.apache.spark.sql.types.StructType
 
+import java.util.UUID
+
 // per spark task there will be one CosmosPartitionReader.
 // This provides iterator to read from the assigned spark partition
 // For now we are creating only one spark partition
@@ -32,13 +35,21 @@ private case class ItemsPartitionReader
   extends PartitionReader[InternalRow] {
 
   private lazy val log = LoggerHelper.getLogger(diagnosticsConfig, this.getClass)
-  log.logInfo(s"Instantiated ${this.getClass.getSimpleName}")
 
+  private val queryOptions = ImplementationBridgeHelpers
+    .CosmosQueryRequestOptionsHelper
+    .getCosmosQueryRequestOptionsAccessor
+    .disallowQueryPlanRetrieval(new CosmosQueryRequestOptions())
+
+  private val operationContext = initializeOperationContext()
+
+  log.logInfo(s"Instantiated ${this.getClass.getSimpleName}, Context: ${operationContext.toString} ${getThreadInfo}")
+
   private val containerTargetConfig = CosmosContainerConfig.parseCosmosContainerConfig(config)
   log.logInfo(s"Reading from feed range $feedRange of " +
     s"container ${containerTargetConfig.database}.${containerTargetConfig.container} - " +
     s"correlationActivityId ${diagnosticsContext.correlationActivityId}, " +
-    s"query: ${cosmosQuery.toString}")
+    s"query: ${cosmosQuery.toString}, Context: ${operationContext.toString} ${getThreadInfo}")
 
   private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
   private val clientCacheItem = CosmosClientCache(
@@ -51,37 +62,38 @@ private case class ItemsPartitionReader
     config, containerTargetConfig, clientCacheItem.client)
   SparkUtils.safeOpenConnectionInitCaches(cosmosAsyncContainer, log)
 
-  private val queryOptions = ImplementationBridgeHelpers
-    .CosmosQueryRequestOptionsHelper
-    .getCosmosQueryRequestOptionsAccessor
-    .disallowQueryPlanRetrieval(new CosmosQueryRequestOptions())
-
-
   private val cosmosSerializationConfig = CosmosSerializationConfig.parseSerializationConfig(config)
   private val cosmosRowConverter = CosmosRowConverter.get(cosmosSerializationConfig)
 
-  private var operationContextAndListenerTuple: Option[OperationContextAndListenerTuple] = None
-
-  initializeDiagnosticsIfConfigured()
-
-  private def initializeDiagnosticsIfConfigured(): Unit = {
-    if (diagnosticsConfig.mode.isDefined) {
-      val taskContext = TaskContext.get
-      assert(taskContext != null)
+  private def initializeOperationContext(): SparkTaskContext = {
+    val taskContext = TaskContext.get
 
-      val taskDiagnosticsContext = SparkTaskContext(
-        diagnosticsContext.correlationActivityId,
+    if (taskContext != null) {
+      val taskDiagnosticsContext = SparkTaskContext(diagnosticsContext.correlationActivityId,
        taskContext.stageId(),
        taskContext.partitionId(),
        taskContext.taskAttemptId(),
        feedRange.toString + " " + cosmosQuery.toString)
 
-      val listener =
+      val listener: OperationListener =
        DiagnosticsLoader.getDiagnosticsProvider(diagnosticsConfig).getLogger(this.getClass)
 
-      operationContextAndListenerTuple =
-        Some(new OperationContextAndListenerTuple(taskDiagnosticsContext, listener))
+      val operationContextAndListenerTuple = new OperationContextAndListenerTuple(taskDiagnosticsContext, listener)
       ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper
-        .getCosmosQueryRequestOptionsAccessor.setOperationContext(queryOptions, operationContextAndListenerTuple.get)
+        .getCosmosQueryRequestOptionsAccessor
+        .setOperationContext(queryOptions, operationContextAndListenerTuple)
+
+      taskDiagnosticsContext
+    } else{
+      SparkTaskContext(diagnosticsContext.correlationActivityId,
+        -1,
+        -1,
+        -1,
+        "")
+    }
  }
 
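The last hunk above replaces the diagnostics-only initialization with an unconditional initializeOperationContext(), so every log line gets tagged even when no diagnostics listener is configured. A simplified, self-contained sketch of that shape (the real SparkTaskContext lives in com.azure.cosmos.spark.diagnostics; the case class and method here are illustrative stand-ins whose fields mirror the constructor calls above):

    import org.apache.spark.TaskContext

    case class TaskContextSketch(
      correlationActivityId: String,
      stageId: Int,
      partitionId: Int,
      taskAttemptId: Long,
      details: String)

    def initializeOperationContextSketch(correlationActivityId: String, details: String): TaskContextSketch = {
      val taskContext = TaskContext.get
      if (taskContext != null) {
        // executor side: tag logs with the real Spark task coordinates
        TaskContextSketch(correlationActivityId, taskContext.stageId(),
          taskContext.partitionId(), taskContext.taskAttemptId(), details)
      } else {
        // driver side (or plain unit tests): -1 sentinels keep the log tag well-formed
        TaskContextSketch(correlationActivityId, -1, -1, -1, "")
      }
    }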
File: TransientIOErrorsRetryingIterator.scala

@@ -3,6 +3,7 @@
 package com.azure.cosmos.spark
 
 import com.azure.cosmos.CosmosException
+import com.azure.cosmos.implementation.guava25.base.Throwables
 import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple
 import com.azure.cosmos.models.FeedResponse
 import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
@@ -48,6 +49,14 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
   private val lastContinuationToken = new AtomicReference[String](null)
   // scalastyle:on null
   private val retryCount = new AtomicLong(0)
+  private lazy val operationContextString = operationContextAndListener match {
+    case Some(o) => if (o.getOperationContext != null) {
+      o.getOperationContext.toString
+    } else {
+      "n/a"
+    }
+    case None => "n/a"
+  }
 
   private[spark] var currentFeedResponseIterator: Option[BufferedIterator[FeedResponse[TSparkRow]]] = None
   private[spark] var currentItemIterator: Option[BufferedIterator[TSparkRow]] = None
@@ -83,7 +92,10 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
       case None =>
        val newPagedFlux = Some(cosmosPagedFluxFactory.apply(lastContinuationToken.get))
        lastPagedFlux.getAndSet(newPagedFlux) match {
-         case Some(oldPagedFlux) => oldPagedFlux.cancelOn(Schedulers.boundedElastic())
+         case Some(oldPagedFlux) => {
+           logInfo(s"Attempting to cancel oldPagedFlux, Context: $operationContextString")
+           oldPagedFlux.cancelOn(Schedulers.boundedElastic())
+         }
          case None =>
        }
        currentFeedResponseIterator = Some(
@@ -119,7 +131,6 @@ private class TransientIOErrorsRetryingIterator[TSparkRow]
        // need to get attempt to get next FeedResponse to determine whether more records exist
        None
      }
-
    } else {
      Some(false)
    }
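Worth noting in the iterator change above: operationContextString is a lazy val, so the Option/null checks run at most once instead of on every retry-loop iteration that logs. A standalone sketch of the idiom (names simplified, not the SDK's API):

    class RetryingIteratorSketch(operationContext: Option[AnyRef]) {
      // resolved on first use, then cached for all later log lines
      private lazy val operationContextString: String =
        operationContext.map(_.toString).getOrElse("n/a")

      def logCancel(): Unit =
        println(s"Attempting to cancel oldPagedFlux, Context: $operationContextString")
    }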
File: ChangeFeedQueryImpl.java

@@ -105,7 +105,12 @@ public Flux<FeedResponse<T>> executeAsync() {
             INITIAL_TOP_VALUE,
             this.options.getMaxItemCount(),
             this.options.getMaxPrefetchPageCount(),
-            ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(this.options));
+            ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(this.options),
+            ImplementationBridgeHelpers
+                .CosmosChangeFeedRequestOptionsHelper
+                .getCosmosChangeFeedRequestOptionsAccessor()
+                .getOperationContext(this.options)
+            );
     }
 
     private RxDocumentServiceRequest createDocumentServiceRequest() {
File: ChangeFeedFetcher.java

@@ -20,6 +20,7 @@
 import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
 import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
 import com.azure.cosmos.implementation.routing.Range;
+import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
 import com.azure.cosmos.models.FeedResponse;
 import com.azure.cosmos.models.ModelBridgeInternal;
 import org.slf4j.Logger;
@@ -45,9 +46,10 @@ public ChangeFeedFetcher(
         Map<String, Object> requestOptionProperties,
         int top,
         int maxItemCount,
-        boolean isSplitHandlingDisabled) {
+        boolean isSplitHandlingDisabled,
+        OperationContextAndListenerTuple operationContext) {
 
-        super(executeFunc, true, top, maxItemCount);
+        super(executeFunc, true, top, maxItemCount, operationContext);
 
         checkNotNull(client, "Argument 'client' must not be null.");
         checkNotNull(createRequestFunc, "Argument 'createRequestFunc' must not be null.");
@@ -80,7 +82,8 @@ public ChangeFeedFetcher(
                 this.changeFeedState,
                 retryPolicyInstance,
                 requestOptionProperties,
-                retryPolicyInstance.getRetryContext());
+                retryPolicyInstance.getRetryContext(),
+                () -> this.getOperationContextText());
             this.createRequestFunc = () -> {
                 RxDocumentServiceRequest request = createRequestFunc.get();
                 this.feedRangeContinuationSplitRetryPolicy.onBeforeSendRequest(request);
@@ -177,20 +180,26 @@ private static final class FeedRangeContinuationSplitRetryPolicy extends DocumentClientRetryPolicy
         private final Map<String, Object> requestOptionProperties;
         private MetadataDiagnosticsContext diagnosticsContext;
         private final RetryContext retryContext;
+        private final Supplier<String> operationContextTextProvider;
 
         public FeedRangeContinuationSplitRetryPolicy(
             RxDocumentClientImpl client,
             ChangeFeedState state,
             DocumentClientRetryPolicy nextRetryPolicy,
             Map<String, Object> requestOptionProperties,
-            RetryContext retryContext) {
+            RetryContext retryContext,
+            Supplier<String> operationContextTextProvider) {
 
+            checkNotNull(
+                operationContextTextProvider,
+                "Argument 'operationContextTextProvider' must not be null.");
             this.client = client;
             this.state = state;
             this.nextRetryPolicy = nextRetryPolicy;
             this.requestOptionProperties = requestOptionProperties;
             this.diagnosticsContext = null;
             this.retryContext = retryContext;
+            this.operationContextTextProvider = operationContextTextProvider;
         }
 
         @Override
@@ -205,7 +214,10 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
             return this.nextRetryPolicy.shouldRetry(e).flatMap(shouldRetryResult -> {
                 if (!shouldRetryResult.shouldRetry) {
                     if (!(e instanceof GoneException)) {
-                        LOGGER.warn("Exception not applicable - will fail the request.", e);
+                        LOGGER.warn(
+                            "Exception not applicable - will fail the request. Context: {}",
+                            this.operationContextTextProvider.get(),
+                            e);
                         return Mono.just(ShouldRetryResult.noRetry());
                     }
 
@@ -235,9 +247,15 @@
                     .handleSplit(client, (GoneException)e)
                     .flatMap(splitShouldRetryResult -> {
                         if (!splitShouldRetryResult.shouldRetry) {
-                            LOGGER.warn("No partition split error - will fail the request.", e);
+                            LOGGER.warn(
+                                "No partition split error - will fail the request. Context: {}",
+                                this.operationContextTextProvider.get(),
+                                e);
                         } else {
-                            LOGGER.debug("HandleSplit will retry.", e);
+                            LOGGER.debug(
+                                "HandleSplit will retry. Context: {}",
+                                this.operationContextTextProvider.get(),
+                                e);
                         }
 
                         return Mono.just(shouldRetryResult);
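The retry policy above receives a Supplier<String> (() -> this.getOperationContextText()) rather than a pre-rendered string: the fetcher owns the context text, and it is resolved only at the moment a retry decision is logged. A hedged Scala sketch of the same shape (simplified stand-ins, not the SDK's classes):

    object SupplierPatternSketch {
      final class SplitRetryPolicySketch(operationContextText: () => String) {
        def onNonRetriableFailure(e: Throwable): Unit =
          println(s"Exception not applicable - will fail the request. Context: ${operationContextText()}")
      }

      def main(args: Array[String]): Unit = {
        var contextText = "n/a"
        val policy = new SplitRetryPolicySketch(() => contextText)
        contextText = "correlationId=abc, stage=3, partition=12" // visible to the thunk at log time
        policy.onNonRetriableFailure(new RuntimeException("boom"))
      }
    }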
File: DocumentProducer.java

@@ -7,6 +7,7 @@
 import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
 import com.azure.cosmos.implementation.Exceptions;
 import com.azure.cosmos.implementation.HttpConstants;
+import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
 import com.azure.cosmos.implementation.ObservableHelper;
 import com.azure.cosmos.implementation.PartitionKeyRange;
 import com.azure.cosmos.implementation.QueryMetrics;
@@ -38,6 +39,7 @@
 import java.util.concurrent.Callable;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /**
@@ -89,6 +91,7 @@ void populatePartitionedQueryMetrics() {
     }
 
     protected final IDocumentQueryClient client;
+    protected final Supplier<String> operationContextTextProvider;
     protected final String collectionRid;
     protected final CosmosQueryRequestOptions cosmosQueryRequestOptions;
     protected final Class<T> resourceType;
@@ -119,7 +122,8 @@ public DocumentProducer(
         int initialPageSize, // = -1,
         String initialContinuationToken,
         int top,
-        FeedRangeEpkImpl feedRange) {
+        FeedRangeEpkImpl feedRange,
+        Supplier<String> operationContextTextProvider) {
 
         this.client = client;
         this.collectionRid = collectionResourceId;
@@ -129,7 +133,7 @@ public DocumentProducer(
         this.fetchSchedulingMetrics = new SchedulingStopwatch();
         this.fetchSchedulingMetrics.ready();
         this.fetchExecutionRangeAccumulator = new FetchExecutionRangeAccumulator(feedRange.getRange().toString());
-
+        this.operationContextTextProvider = operationContextTextProvider;
         this.executeRequestFuncWithRetries = request -> {
             retries = -1;
             this.fetchSchedulingMetrics.start();
@@ -181,7 +185,11 @@ public Flux<DocumentProducerFeedResponse> produceAsync() {
                 executeRequestFuncWithRetries,
                 top,
                 pageSize,
-                Paginator.getPreFetchCount(cosmosQueryRequestOptions, top, pageSize)
+                Paginator.getPreFetchCount(cosmosQueryRequestOptions, top, pageSize),
+                ImplementationBridgeHelpers
+                    .CosmosQueryRequestOptionsHelper
+                    .getCosmosQueryRequestOptionsAccessor()
+                    .getOperationContext(cosmosQueryRequestOptions)
             )
             .map(rsp -> {
                 lastResponseContinuationToken = rsp.getContinuationToken();
@@ -198,27 +206,35 @@ private Flux<DocumentProducerFeedResponse> splitProof(Flux<DocumentProducerFeedResponse> sourceFeedResponseObservable) {
         return sourceFeedResponseObservable.onErrorResume( t -> {
             CosmosException dce = Utils.as(t, CosmosException.class);
             if (dce == null || !isSplit(dce)) {
-                logger.error("Unexpected failure", t);
+                logger.error(
+                    "Unexpected failure, Context: {}",
+                    this.operationContextTextProvider.get(),
+                    t);
                 return Flux.error(t);
             }
 
             // we are dealing with Split
-            logger.info("DocumentProducer handling a partition split in [{}], detail:[{}]", feedRange, dce);
+            logger.info(
+                "DocumentProducer handling a partition split in [{}], detail:[{}], Context: {}",
+                feedRange,
+                dce,
+                this.operationContextTextProvider.get());
             Mono<Utils.ValueHolder<List<PartitionKeyRange>>> replacementRangesObs = getReplacementRanges(feedRange.getRange());
 
             // Since new DocumentProducers are instantiated for the new replacement ranges, if for the new
             // replacement partitions split happens the corresponding DocumentProducer can recursively handle splits.
             // so this is resilient to split on splits.
             Flux<DocumentProducer<T>> replacementProducers = replacementRangesObs.flux().flatMap(
                 partitionKeyRangesValueHolder -> {
-                    if (logger.isDebugEnabled()) {
+                    if (logger.isInfoEnabled()) {
                         logger.info("Cross Partition Query Execution detected partition [{}] split into [{}] partitions,"
-                            + " last continuation token is [{}].",
-                            feedRange,
-                            partitionKeyRangesValueHolder.v.stream()
-                                .map(ModelBridgeInternal::toJsonFromJsonSerializable)
-                                .collect(Collectors.joining(", ")),
-                            lastResponseContinuationToken);
+                            + " last continuation token is [{}]. - Context: {}",
+                            feedRange,
+                            partitionKeyRangesValueHolder.v.stream()
+                                .map(ModelBridgeInternal::toJsonFromJsonSerializable)
+                                .collect(Collectors.joining(", ")),
+                            lastResponseContinuationToken,
+                            this.operationContextTextProvider.get());
                     }
                     return Flux.fromIterable(createReplacingDocumentProducersOnSplit(partitionKeyRangesValueHolder.v));
                 });
@@ -257,7 +273,8 @@ protected DocumentProducer<T> createChildDocumentProducerOnSplit(
             pageSize,
             initialContinuationToken,
             top,
-            new FeedRangeEpkImpl(targetRange.toRange()));
+            new FeedRangeEpkImpl(targetRange.toRange()),
+            this.operationContextTextProvider);
     }
 
     private Mono<Utils.ValueHolder<List<PartitionKeyRange>>> getReplacementRanges(Range<String> range) {
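One behavioral fix hides in the splitProof hunk: the info-level split message was previously guarded by logger.isDebugEnabled(), so it was skipped under INFO-only log configurations; the change aligns the guard with the call via logger.isInfoEnabled(). The guard matters because the joined range list is expensive to build. A minimal SLF4J-style sketch of the guarded-logging idiom (logger name and values are placeholders):

    import org.slf4j.LoggerFactory

    object GuardedLoggingSketch {
      private val logger = LoggerFactory.getLogger("DocumentProducerSketch")

      def main(args: Array[String]): Unit = {
        val replacementRanges = Seq("range-0", "range-1")
        if (logger.isInfoEnabled) {
          // the mkString join only runs when INFO is actually enabled
          logger.info(
            "partition split into [{}], Context: {}",
            replacementRanges.mkString(", "),
            "correlationId=abc")
        }
      }
    }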