Skip to content

Commit

Permalink
Fix an issue where the continuation token was not properly saved in t…
Browse files Browse the repository at this point in the history
…he lease collection documents. (Azure#5508)
  • Loading branch information
milismsft authored Sep 25, 2019
1 parent 114b0a4 commit b14012f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -363,19 +363,23 @@ public Mono<Lease> checkpoint(Lease lease, String continuationToken) {
throw new IllegalArgumentException("continuationToken must be a non-empty string");
}

return this.leaseUpdater.updateLease(
lease,
this.createItemForLease(lease.getId()),
this.requestOptionsFactory.createRequestOptions(lease),
serverLease -> {
if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
throw new LeaseLostException(lease);
}
serverLease.setContinuationToken(continuationToken);
CosmosItem itemForLease = this.createItemForLease(lease.getId());

return serverLease;
});
return this.leaseDocumentClient.readItem(itemForLease, this.requestOptionsFactory.createRequestOptions(lease))
.map( documentResourceResponse -> ServiceItemLease.fromDocument(documentResourceResponse.properties()))
.flatMap( refreshedLease -> this.leaseUpdater.updateLease(
refreshedLease,
this.createItemForLease(lease.getId()),
this.requestOptionsFactory.createRequestOptions(lease),
serverLease -> {
if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
throw new LeaseLostException(lease);
}
serverLease.setContinuationToken(continuationToken);

return serverLease;
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ public RuntimeException getResultException() {
private Mono<Void> dispatchChanges(FeedResponse<CosmosItemProperties> response) {
ChangeFeedObserverContext context = new ChangeFeedObserverContextImpl(this.settings.getPartitionKeyRangeId(), response, this.checkpointer);

this.observer.processChanges(context, response.results());
return Mono.empty();
return this.observer.processChanges(context, response.results());
}
}

0 comments on commit b14012f

Please sign in to comment.