
Commit

Fixed an issue causing failures when using change feed in batch mode with a batch location and `ChangeFeedBatch.planInputPartitions` is called multiple times (Azure#39635)

* Fixing ADF issue

* Changelogs

* Fixing RowPoolSerializer regression

* Update SparkE2EChangeFeedITest.scala

* Reverting scala test version upgrade

* Excluding netty-all in Spark 3.1 and 3.2

* Revert "Reverting scala test version upgrade"

This reverts commit 220931f.

* Update pom.xml

* Adding CollectionRid validation when existing and new latestOffset differ
FabianMeiswinkel authored Apr 11, 2024
1 parent 73c9151 commit 6c57b99
Showing 17 changed files with 322 additions and 242 deletions.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue causing failures when using change feed in batch mode with a batch location, `ChangeFeedBatch.planInputPartitions` is called multiple times (for example because the physical query plan gets retrieved), and some changes have been made in the monitored container between those calls. - See [PR 39635](https://github.com/Azure/azure-sdk-for-java/pull/39635)

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)
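For context on the bug fix entry above, the failure could surface in an ordinary batch-mode change feed read against a Cosmos DB container. The sketch below is illustrative only — the endpoint, key, database, container, and checkpoint path are placeholder values, and the `spark.cosmos.*` option names are assumed to match the connector's documented configuration:

```scala
// Minimal sketch of a batch-mode change feed read (assumes an existing SparkSession `spark`).
val changeFeedConfig = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "<database>",
  "spark.cosmos.container" -> "<container>",
  "spark.cosmos.changeFeed.mode" -> "Incremental",
  "spark.cosmos.changeFeed.startFrom" -> "Beginning",
  // The "batch location" mentioned in the changelog entry - offsets for the batch read
  // are persisted here between planning and execution.
  "spark.cosmos.changeFeed.batchCheckpointLocation" -> "/tmp/cosmos-changefeed-checkpoint"
)

val changeFeedDf = spark.read
  .format("cosmos.oltp.changeFeed")
  .options(changeFeedConfig)
  .load()

// Retrieving the physical plan and then executing the query can each invoke
// ChangeFeedBatch.planInputPartitions; before this fix, new changes arriving in the
// monitored container between those calls could make the later call fail.
changeFeedDf.explain()
changeFeedDf.count()
```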
12 changes: 12 additions & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/pom.xml
@@ -147,12 +147,24 @@
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.1.1</version> <!-- {x-version-update;cosmos-spark_3-1_org.apache.spark:spark-sql_2.12;external_dependency} -->
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
        </exclusion>
      </exclusions>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>3.1.1</version> <!-- {x-version-update;cosmos-spark_3-1_org.apache.spark:spark-hive_2.12;external_dependency} -->
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
        </exclusion>
      </exclusions>
      <scope>test</scope>
    </dependency>
    <dependency>
@@ -6,8 +6,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.types.StructType
 
-import java.time.Instant
-
 /**
  * Spark serializers are not thread-safe - and expensive to create (dynamic code generation)
  * So we will use this object pool to allow reusing serializers based on the targeted schema.
@@ -17,33 +15,15 @@ import java.time.Instant
  * A clean-up task is used to purge serializers for schemas which weren't used anymore
  * For each schema we have an object pool that will use a soft-limit to limit the memory footprint
  */
-private object RowSerializerPool extends RowSerializerPoolBase[RowSerializerQueue] {
+private object RowSerializerPool {
+  private val serializerFactorySingletonInstance =
+    new RowSerializerPoolInstance((schema: StructType) => RowEncoder(schema).createSerializer())
+
   def getOrCreateSerializer(schema: StructType): ExpressionEncoder.Serializer[Row] = {
-    schemaScopedSerializerMap.get(schema) match {
-      case Some(objectPool) => objectPool.borrowSerializer(schema)
-      case None => RowEncoder(schema).createSerializer()
-    }
+    serializerFactorySingletonInstance.getOrCreateSerializer(schema)
   }
 
   def returnSerializerToPool(schema: StructType, serializer: ExpressionEncoder.Serializer[Row]): Boolean = {
-    schemaScopedSerializerMap.get(schema) match {
-      case Some(objectPool) => objectPool.returnSerializer(serializer)
-      case None =>
-        val newQueue = new RowSerializerQueue()
-        newQueue.returnSerializer(serializer)
-        schemaScopedSerializerMap.putIfAbsent(schema, newQueue).isEmpty
-    }
+    serializerFactorySingletonInstance.returnSerializerToPool(schema, serializer)
   }
 }
-
-private class RowSerializerQueue extends RowSerializerQueueBase() {
-  override def borrowSerializer(schema: StructType): ExpressionEncoder.Serializer[Row] = {
-    lastBorrowedAny.set(Instant.now.toEpochMilli)
-    Option.apply(objectPool.poll()) match {
-      case Some(serializer) =>
-        estimatedSize.decrementAndGet()
-        serializer
-      case None => RowEncoder(schema).createSerializer()
-    }
-  }
-}
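The comment block in the hunk above describes a borrow/return pattern for pooled serializers; a hedged caller-side sketch (the `schema`, `row`, and processing step are illustrative, not taken from this commit) looks like this:

```scala
// Borrow a serializer for the target schema, use it, and return it so other tasks
// working on the same schema can reuse it instead of re-generating serializer code.
val serializer = RowSerializerPool.getOrCreateSerializer(schema)
try {
  val internalRow = serializer(row) // convert a Row into Spark's internal representation
  // ... hand internalRow to the reader/writer that needs it ...
} finally {
  RowSerializerPool.returnSerializerToPool(schema, serializer)
}
```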
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue causing failures when using change feed in batch mode with a batch location, `ChangeFeedBatch.planInputPartitions` is called multiple times (for example because the physical query plan gets retrieved), and some changes have been made in the monitored container between those calls. - See [PR 39635](https://github.com/Azure/azure-sdk-for-java/pull/39635)

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)
12 changes: 12 additions & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/pom.xml
@@ -149,12 +149,24 @@
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.2.0</version> <!-- {x-version-update;cosmos-spark_3-2_org.apache.spark:spark-sql_2.12;external_dependency} -->
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
        </exclusion>
      </exclusions>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>3.2.0</version> <!-- {x-version-update;cosmos-spark_3-2_org.apache.spark:spark-hive_2.12;external_dependency} -->
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
        </exclusion>
      </exclusions>
      <scope>test</scope>
    </dependency>
    <dependency>
@@ -6,8 +6,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.types.StructType
 
-import java.time.Instant
-
 /**
  * Spark serializers are not thread-safe - and expensive to create (dynamic code generation)
  * So we will use this object pool to allow reusing serializers based on the targeted schema.
@@ -17,34 +15,15 @@ import java.time.Instant
  * A clean-up task is used to purge serializers for schemas which weren't used anymore
  * For each schema we have an object pool that will use a soft-limit to limit the memory footprint
  */
-private object RowSerializerPool extends RowSerializerPoolBase[RowSerializerQueue] {
+private object RowSerializerPool {
+  private val serializerFactorySingletonInstance =
+    new RowSerializerPoolInstance((schema: StructType) => RowEncoder(schema).createSerializer())
+
   def getOrCreateSerializer(schema: StructType): ExpressionEncoder.Serializer[Row] = {
-    schemaScopedSerializerMap.get(schema) match {
-      case Some(objectPool) => objectPool.borrowSerializer(schema)
-      case None => RowEncoder(schema).createSerializer()
-    }
+    serializerFactorySingletonInstance.getOrCreateSerializer(schema)
   }
 
   def returnSerializerToPool(schema: StructType, serializer: ExpressionEncoder.Serializer[Row]): Boolean = {
-    schemaScopedSerializerMap.get(schema) match {
-      case Some(objectPool) => objectPool.returnSerializer(serializer)
-      case None =>
-        val newQueue = new RowSerializerQueue()
-        newQueue.returnSerializer(serializer)
-        schemaScopedSerializerMap.putIfAbsent(schema, newQueue).isEmpty
-    }
+    serializerFactorySingletonInstance.returnSerializerToPool(schema, serializer)
  }
 }
-
-private class RowSerializerQueue extends RowSerializerQueueBase() {
-  override def borrowSerializer(schema: StructType): ExpressionEncoder.Serializer[Row] = {
-    lastBorrowedAny.set(Instant.now.toEpochMilli)
-    Option.apply(objectPool.poll()) match {
-      case Some(serializer) =>
-        estimatedSize.decrementAndGet()
-        serializer
-      case None => RowEncoder(schema).createSerializer()
-    }
-  }
-}
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue causing failures when using change feed in batch mode with a batch location, `ChangeFeedBatch.planInputPartitions` is called multiple times (for example because the physical query plan gets retrieved), and some changes have been made in the monitored container between those calls. - See [PR 39635](https://github.com/Azure/azure-sdk-for-java/pull/39635)

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)
@@ -6,8 +6,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.types.StructType
 
-import java.time.Instant
-
 /**
  * Spark serializers are not thread-safe - and expensive to create (dynamic code generation)
  * So we will use this object pool to allow reusing serializers based on the targeted schema.
@@ -17,34 +15,15 @@ import java.time.Instant
  * A clean-up task is used to purge serializers for schemas which weren't used anymore
  * For each schema we have an object pool that will use a soft-limit to limit the memory footprint
  */
-private object RowSerializerPool extends RowSerializerPoolBase[RowSerializerQueue] {
+private object RowSerializerPool {
+  private val serializerFactorySingletonInstance =
+    new RowSerializerPoolInstance((schema: StructType) => RowEncoder(schema).createSerializer())
+
   def getOrCreateSerializer(schema: StructType): ExpressionEncoder.Serializer[Row] = {
-    schemaScopedSerializerMap.get(schema) match {
-      case Some(objectPool) => objectPool.borrowSerializer(schema)
-      case None => RowEncoder(schema).createSerializer()
-    }
+    serializerFactorySingletonInstance.getOrCreateSerializer(schema)
   }
 
   def returnSerializerToPool(schema: StructType, serializer: ExpressionEncoder.Serializer[Row]): Boolean = {
-    schemaScopedSerializerMap.get(schema) match {
-      case Some(objectPool) => objectPool.returnSerializer(serializer)
-      case None =>
-        val newQueue = new RowSerializerQueue()
-        newQueue.returnSerializer(serializer)
-        schemaScopedSerializerMap.putIfAbsent(schema, newQueue).isEmpty
-    }
+    serializerFactorySingletonInstance.returnSerializerToPool(schema, serializer)
  }
 }
-
-private class RowSerializerQueue extends RowSerializerQueueBase() {
-  override def borrowSerializer(schema: StructType): ExpressionEncoder.Serializer[Row] = {
-    lastBorrowedAny.set(Instant.now.toEpochMilli)
-    Option.apply(objectPool.poll()) match {
-      case Some(serializer) =>
-        estimatedSize.decrementAndGet()
-        serializer
-      case None => RowEncoder(schema).createSerializer()
-    }
-  }
-}
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue causing failures when using change feed in batch mode with a batch location, `ChangeFeedBatch.planInputPartitions` is called multiple times (for example because the physical query plan gets retrieved), and some changes have been made in the monitored container between those calls. - See [PR 39635](https://github.com/Azure/azure-sdk-for-java/pull/39635)

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)
@@ -6,8 +6,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.types.StructType
 
-import java.time.Instant
-
 /**
  * Spark serializers are not thread-safe - and expensive to create (dynamic code generation)
  * So we will use this object pool to allow reusing serializers based on the targeted schema.
@@ -17,33 +15,15 @@ import java.time.Instant
  * A clean-up task is used to purge serializers for schemas which weren't used anymore
  * For each schema we have an object pool that will use a soft-limit to limit the memory footprint
  */
-private object RowSerializerPool extends RowSerializerPoolBase[RowSerializerQueue] {
+private object RowSerializerPool {
+  private val serializerFactorySingletonInstance =
+    new RowSerializerPoolInstance((schema: StructType) => RowEncoder(schema).createSerializer())
+
   def getOrCreateSerializer(schema: StructType): ExpressionEncoder.Serializer[Row] = {
-    schemaScopedSerializerMap.get(schema) match {
-      case Some(objectPool) => objectPool.borrowSerializer(schema)
-      case None => RowEncoder(schema).createSerializer()
-    }
+    serializerFactorySingletonInstance.getOrCreateSerializer(schema)
   }
 
   def returnSerializerToPool(schema: StructType, serializer: ExpressionEncoder.Serializer[Row]): Boolean = {
-    schemaScopedSerializerMap.get(schema) match {
-      case Some(objectPool) => objectPool.returnSerializer(serializer)
-      case None =>
-        val newQueue = new RowSerializerQueue()
-        newQueue.returnSerializer(serializer)
-        schemaScopedSerializerMap.putIfAbsent(schema, newQueue).isEmpty
-    }
+    serializerFactorySingletonInstance.returnSerializerToPool(schema, serializer)
  }
 }
-
-private class RowSerializerQueue extends RowSerializerQueueBase() {
-  override def borrowSerializer(schema: StructType): ExpressionEncoder.Serializer[Row] = {
-    lastBorrowedAny.set(Instant.now.toEpochMilli)
-    Option.apply(objectPool.poll()) match {
-      case Some(serializer) =>
-        estimatedSize.decrementAndGet()
-        serializer
-      case None => RowEncoder(schema).createSerializer()
-    }
-  }
-}
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue causing failures when using change feed in batch mode with a batch location, `ChangeFeedBatch.planInputPartitions` is called multiple times (for example because the physical query plan gets retrieved), and some changes have been made in the monitored container between those calls. - See [PR 39635](https://github.com/Azure/azure-sdk-for-java/pull/39635)

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)
@@ -6,8 +6,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.StructType
 
-import java.time.Instant
-
 /**
  * Spark serializers are not thread-safe - and expensive to create (dynamic code generation)
  * So we will use this object pool to allow reusing serializers based on the targeted schema.
@@ -17,37 +15,15 @@ import java.time.Instant
  * A clean-up task is used to purge serializers for schemas which weren't used anymore
  * For each schema we have an object pool that will use a soft-limit to limit the memory footprint
  */
-private object RowSerializerPool extends RowSerializerPoolBase[RowSerializerQueue] {
-
+private object RowSerializerPool {
+  private val serializerFactorySingletonInstance =
+    new RowSerializerPoolInstance((schema: StructType) => ExpressionEncoder.apply(schema).createSerializer())
+
   def getOrCreateSerializer(schema: StructType): ExpressionEncoder.Serializer[Row] = {
-    schemaScopedSerializerMap.get(schema) match {
-      case Some(objectPool) => objectPool.borrowSerializer(schema)
-      case None => ExpressionEncoder.apply(schema).createSerializer()
-    }
+    serializerFactorySingletonInstance.getOrCreateSerializer(schema)
   }
 
   def returnSerializerToPool(schema: StructType, serializer: ExpressionEncoder.Serializer[Row]): Boolean = {
-    schemaScopedSerializerMap.get(schema) match {
-      case Some(objectPool) => objectPool.returnSerializer(serializer)
-      case None =>
-        val newQueue = new RowSerializerQueue()
-        newQueue.returnSerializer(serializer)
-        schemaScopedSerializerMap.putIfAbsent(schema, newQueue).isEmpty
-    }
+    serializerFactorySingletonInstance.returnSerializerToPool(schema, serializer)
  }
 }
-
-private class RowSerializerQueue extends RowSerializerQueueBase() {
-  override def borrowSerializer(schema: StructType): ExpressionEncoder.Serializer[Row] = {
-    lastBorrowedAny.set(Instant.now.toEpochMilli)
-    Option.apply(objectPool.poll()) match {
-      case Some(serializer) =>
-        estimatedSize.decrementAndGet()
-        serializer
-      case None => ExpressionEncoder.apply(schema).createSerializer()
-    }
-  }
-}

@@ -98,6 +98,16 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra
     isOffsetValid
   }
 
+  def validateCollectionRidOfChangeFeedStates
+  (
+    continuationLeft: String,
+    continuationRight: String,
+  ): Boolean = {
+    val extractedRidLeft = extractCollectionRid(continuationLeft)
+    val extractedRidRight = extractCollectionRid(continuationRight)
+    extractedRidLeft.equalsIgnoreCase(extractedRidRight)
+  }
+
   def createChangeFeedStateJson
   (
     startOffsetContinuationState: String,
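The new helper compares the collection resource id embedded in two change feed continuation states. A hypothetical caller (the names below are illustrative, not part of this diff) could use it to check whether a previously persisted `latestOffset` still refers to the same physical collection before reusing it:

```scala
// Hypothetical guard before reusing a persisted offset from the batch checkpoint location.
val existingAndNewTargetSameCollection =
  SparkBridgeImplementationInternal.validateCollectionRidOfChangeFeedStates(
    existingLatestOffsetJson, // offset previously persisted for this batch location
    newLatestOffsetJson       // offset computed from the container's current state
  )

// If the collection rid differs, the monitored container was deleted and recreated,
// so the persisted offset must be discarded instead of being reused.
val offsetToUse =
  if (existingAndNewTargetSameCollection) existingLatestOffsetJson
  else newLatestOffsetJson
```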
