diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java index af6ae3dccd675..5938608031200 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseStoreManagerImpl.java @@ -363,19 +363,23 @@ public Mono checkpoint(Lease lease, String continuationToken) { throw new IllegalArgumentException("continuationToken must be a non-empty string"); } - return this.leaseUpdater.updateLease( - lease, - this.createItemForLease(lease.getId()), - this.requestOptionsFactory.createRequestOptions(lease), - serverLease -> { - if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { - logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); - throw new LeaseLostException(lease); - } - serverLease.setContinuationToken(continuationToken); + CosmosItem itemForLease = this.createItemForLease(lease.getId()); - return serverLease; - }); + return this.leaseDocumentClient.readItem(itemForLease, this.requestOptionsFactory.createRequestOptions(lease)) + .map( documentResourceResponse -> ServiceItemLease.fromDocument(documentResourceResponse.properties())) + .flatMap( refreshedLease -> this.leaseUpdater.updateLease( + refreshedLease, + this.createItemForLease(lease.getId()), + this.requestOptionsFactory.createRequestOptions(lease), + serverLease -> { + if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { + logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); + throw new LeaseLostException(lease); + } + serverLease.setContinuationToken(continuationToken); + + return serverLease; + })); } @Override diff --git a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionProcessorImpl.java b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionProcessorImpl.java index a32f1f4e5d524..6a2960a59e7ed 100644 --- a/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionProcessorImpl.java +++ b/sdk/cosmos/microsoft-azure-cosmos/src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionProcessorImpl.java @@ -174,7 +174,6 @@ public RuntimeException getResultException() { private Mono dispatchChanges(FeedResponse response) { ChangeFeedObserverContext context = new ChangeFeedObserverContextImpl(this.settings.getPartitionKeyRangeId(), response, this.checkpointer); - this.observer.processChanges(context, response.results()); - return Mono.empty(); + return this.observer.processChanges(context, response.results()); } }