Skip to content

Commit

Permalink
Added Nested partition key support feature (Azure#14338)
Browse files Browse the repository at this point in the history
* Added Nested partition key support feature

* Fixed double serialization issue, code review comments

* Fixed code review comments

* Code review comments

* Code review comments

* Removed unused imports

* Fixed query metrics test
  • Loading branch information
kushagraThapar authored Aug 26, 2020
1 parent 45d0389 commit 03a7e29
Show file tree
Hide file tree
Showing 31 changed files with 921 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.implementation.query.Transformer;
import com.azure.cosmos.models.CosmosConflictProperties;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
Expand All @@ -33,7 +32,6 @@
import com.azure.cosmos.util.UtilBridgeInternal;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

import java.util.List;
import java.util.function.Function;
Expand Down Expand Up @@ -608,7 +606,27 @@ public Mono<CosmosItemResponse<Object>> deleteItem(
}
ModelBridgeInternal.setPartitionKey(options, partitionKey);
RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options);
return withContext(context -> deleteItemInternal(itemId, requestOptions, context));
return withContext(context -> deleteItemInternal(itemId, null, requestOptions, context));
}

/**
* Deletes the item.
* <p>
* After subscription the operation will be performed.
* The {@link Mono} upon successful completion will contain a single Cosmos item response for the deleted item.
*
* @param <T> the type parameter.
* @param item item to be deleted.
* @param options the request options.
* @return an {@link Mono} containing the Cosmos item resource response.
*/
public <T> Mono<CosmosItemResponse<Object>> deleteItem(T item, CosmosItemRequestOptions options) {
if (options == null) {
options = new CosmosItemRequestOptions();
}
RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options);
InternalObjectNode internalObjectNode = InternalObjectNode.fromObjectToInternalObjectNode(item);
return withContext(context -> deleteItemInternal(internalObjectNode.getId(), internalObjectNode, requestOptions, context));
}

private String getItemLink(String itemId) {
Expand Down Expand Up @@ -741,11 +759,12 @@ String getLink() {

private Mono<CosmosItemResponse<Object>> deleteItemInternal(
String itemId,
InternalObjectNode internalObjectNode,
RequestOptions requestOptions,
Context context) {
Mono<CosmosItemResponse<Object>> responseMono = this.getDatabase()
.getDocClientWrapper()
.deleteDocument(getItemLink(itemId), requestOptions)
.deleteDocument(getItemLink(itemId), internalObjectNode, requestOptions)
.map(response -> ModelBridgeInternal.createCosmosAsyncItemResponseWithObjectType(response))
.single();
return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,18 @@ public CosmosItemResponse<Object> deleteItem(String itemId, PartitionKey partiti
return this.blockDeleteItemResponse(asyncContainer.deleteItem(itemId, partitionKey, options));
}

/**
* Deletes an item in the current container.
*
* @param <T> the type parameter.
* @param item the item to be deleted.
* @param options the options.
* @return the Cosmos item response.
*/
public <T> CosmosItemResponse<Object> deleteItem(T item, CosmosItemRequestOptions options) {
return this.blockDeleteItemResponse(asyncContainer.deleteItem(item, options));
}

/**
* Gets the Cosmos scripts using the current container as context.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,19 @@ Mono<ResourceResponse<Document>> upsertDocument(String collectionLink, Object do
*/
Mono<ResourceResponse<Document>> deleteDocument(String documentLink, RequestOptions options);

/**
* Deletes a document
* <p>
* After subscription the operation will be performed.
* The {@link Mono} upon successful completion will contain a single resource response for the deleted document.
* In case of failure the {@link Mono} will error.
*
* @param internalObjectNode the internalObjectNode to delete (containing the id).
* @param options the request options.
* @return a {@link Mono} containing the single resource response for the deleted document or an error.
*/
Mono<ResourceResponse<Document>> deleteDocument(String documentLink, InternalObjectNode internalObjectNode, RequestOptions options);

/**
* Reads a document
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ public InternalObjectNode(ObjectNode propertyBag) {
super(propertyBag);
}

/**
* fromObjectToInternalObjectNode returns InternalObjectNode
*/
public static InternalObjectNode fromObjectToInternalObjectNode(Object cosmosItem) {
if (cosmosItem instanceof InternalObjectNode) {
return (InternalObjectNode) cosmosItem;
} else if (cosmosItem instanceof byte[]) {
return new InternalObjectNode((byte[]) cosmosItem);
} else {
try {
return new InternalObjectNode(InternalObjectNode.MAPPER.writeValueAsString(cosmosItem));
} catch (IOException e) {
throw new IllegalArgumentException("Can't serialize the object into the json string", e);
}
}
}

/**
* fromObject returns Document for compatibility with V2 sdk
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,13 +1055,17 @@ private void addPartitionKeyInformation(RxDocumentServiceRequest request,
// For backward compatibility, if collection doesn't have partition key defined, we assume all documents
// have empty value for it and user doesn't need to specify it explicitly.
partitionKeyInternal = PartitionKeyInternal.getEmpty();
} else if (contentAsByteBuffer != null) {
} else if (contentAsByteBuffer != null || objectDoc != null) {
InternalObjectNode internalObjectNode;
if (objectDoc instanceof InternalObjectNode) {
internalObjectNode = (InternalObjectNode) objectDoc;
} else {
} else if (contentAsByteBuffer != null) {
contentAsByteBuffer.rewind();
internalObjectNode = new InternalObjectNode(contentAsByteBuffer);
} else {
// This is a safety check, this should not happen ever.
// If it does, it is a SDK bug
throw new IllegalStateException("ContentAsByteBuffer and objectDoc are null");
}

Instant serializationStartTime = Instant.now();
Expand Down Expand Up @@ -1424,10 +1428,17 @@ private Mono<ResourceResponse<Document>> replaceDocumentInternal(String document
@Override
public Mono<ResourceResponse<Document>> deleteDocument(String documentLink, RequestOptions options) {
DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
return ObservableHelper.inlineIfPossibleAsObs(() -> deleteDocumentInternal(documentLink, options, requestRetryPolicy), requestRetryPolicy);
return ObservableHelper.inlineIfPossibleAsObs(() -> deleteDocumentInternal(documentLink, null, options, requestRetryPolicy), requestRetryPolicy);
}

private Mono<ResourceResponse<Document>> deleteDocumentInternal(String documentLink, RequestOptions options,
@Override
public Mono<ResourceResponse<Document>> deleteDocument(String documentLink, InternalObjectNode internalObjectNode, RequestOptions options) {
DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
return ObservableHelper.inlineIfPossibleAsObs(() -> deleteDocumentInternal(documentLink, internalObjectNode, options, requestRetryPolicy),
requestRetryPolicy);
}

private Mono<ResourceResponse<Document>> deleteDocumentInternal(String documentLink, InternalObjectNode internalObjectNode, RequestOptions options,
DocumentClientRetryPolicy retryPolicyInstance) {
try {
if (StringUtils.isEmpty(documentLink)) {
Expand All @@ -1445,7 +1456,7 @@ private Mono<ResourceResponse<Document>> deleteDocumentInternal(String documentL

Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);

Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, null, options, collectionObs);
Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, internalObjectNode, options, collectionObs);

return requestObs.flatMap(req -> {
return this.delete(req, retryPolicyInstance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ private Object[][] query() {
new Object[] { "Select * from c where c.id = 'wrongId' order by c.id", false },
new Object[] { "Select count(1) from c where c.id = 'wrongId' group by c.pk", false },
new Object[] { "Select distinct c.pk from c where c.id = 'wrongId'", false },
new Object[] { "Select * from c where c.id = 'wrongId'", null },
new Object[] { "Select top 1 * from c where c.id = 'wrongId'", null },
new Object[] { "Select * from c where c.id = 'wrongId' order by c.id", null },
new Object[] { "Select count(1) from c where c.id = 'wrongId' group by c.pk", null },
new Object[] { "Select distinct c.pk from c where c.id = 'wrongId'", null },
new Object[] { "Select * from c where c.id = 'wrongId'", false },
new Object[] { "Select top 1 * from c where c.id = 'wrongId'", false },
new Object[] { "Select * from c where c.id = 'wrongId' order by c.id", false },
new Object[] { "Select count(1) from c where c.id = 'wrongId' group by c.pk", false },
new Object[] { "Select distinct c.pk from c where c.id = 'wrongId'", false },
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,19 @@ public void deleteItem() throws Exception {
assertThat(deleteResponse.getStatusCode()).isEqualTo(204);
}

@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void deleteItemUsingEntity() throws Exception {
InternalObjectNode properties = getDocumentDefinition(UUID.randomUUID().toString());
CosmosItemResponse<InternalObjectNode> itemResponse = container.createItem(properties);
CosmosItemRequestOptions options = new CosmosItemRequestOptions();

CosmosItemResponse<?> deleteResponse = container.deleteItem(itemResponse.getItem(), options);
assertThat(deleteResponse.getStatusCode()).isEqualTo(204);
}


@Test(groups = { "simple" }, timeOut = TIMEOUT)
public void readAllItems() throws Exception{
public void readAllItems() throws Exception {
InternalObjectNode properties = getDocumentDefinition(UUID.randomUUID().toString());
CosmosItemResponse<InternalObjectNode> itemResponse = container.createItem(properties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,35 @@ public void deleteDocument(String documentId) throws InterruptedException {
validateItemFailure(readObservable, notFoundValidator);
}

@Test(groups = { "simple" }, timeOut = TIMEOUT, dataProvider = "documentCrudArgProvider")
public void deleteDocumentUsingEntity(String documentId) throws InterruptedException {
InternalObjectNode docDefinition = getDocumentDefinition(documentId);

CosmosItemResponse<InternalObjectNode> documentResponse = container.createItem(docDefinition,
new CosmosItemRequestOptions()).block();

CosmosItemRequestOptions options = new CosmosItemRequestOptions();
Mono<CosmosItemResponse<Object>> deleteObservable = container.deleteItem(documentResponse.getItem(), options);

CosmosItemResponseValidator validator =
new CosmosItemResponseValidator.Builder<CosmosItemResponse<InternalObjectNode>>()
.nullResource()
.build();
this.validateItemSuccess(deleteObservable, validator);

// attempt to read document which was deleted
waitIfNeededForReplicasToCatchUp(getClientBuilder());

Mono<CosmosItemResponse<InternalObjectNode>> readObservable = container.readItem(documentId,
new PartitionKey(ModelBridgeInternal.getObjectFromJsonSerializable(docDefinition, "mypk")),
options, InternalObjectNode.class);
FailureValidator notFoundValidator = new FailureValidator.Builder()
.resourceNotFound()
.documentClientExceptionToStringExcludesHeader(HttpConstants.HttpHeaders.AUTHORIZATION)
.build();
validateItemFailure(readObservable, notFoundValidator);
}

@Test(groups = { "simple" }, timeOut = TIMEOUT, dataProvider = "documentCrudArgProvider")
public void deleteDocument_undefinedPK(String documentId) throws InterruptedException {
InternalObjectNode docDefinition = new InternalObjectNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@

import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.spring.data.cosmos.core.ResponseDiagnostics;
import com.azure.spring.data.cosmos.core.ResponseDiagnosticsProcessor;
import com.azure.spring.data.cosmos.exception.IllegalQueryException;
import com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -72,42 +69,4 @@ public static String getStringIDValue(Object idValue) {
throw new IllegalQueryException("Type of id field must be String or Integer or Long");
}
}

/**
* Creates partition key from json node by converting the jsonNode type to appropriate Java type.
* @param jsonNode jsonNode domain object
* @param entityInfo entityInfo for the domainType
* @return cosmos partitionKey object
*/
public static PartitionKey createPartitionKey(JsonNode jsonNode, CosmosEntityInformation<?, ?> entityInfo) {
String partitionKeyName = entityInfo.getPartitionKeyFieldName();
if (partitionKeyName == null) {
return PartitionKey.NONE;
}
return new PartitionKey(getValue(jsonNode.get(partitionKeyName)));
}

private static Object getValue(JsonNode value) {
if (value.isValueNode()) {
switch (value.getNodeType()) {
case BOOLEAN:
return value.asBoolean();
case NUMBER:
if (value.isInt()) {
return value.asInt();
} else if (value.isLong()) {
return value.asLong();
} else if (value.isDouble()) {
return value.asDouble();
} else {
return value;
}
case STRING:
return value.asText();
default:
return value;
}
}
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ public interface CosmosOperations {
*/
<T> T insert(String containerName, T objectToSave, PartitionKey partitionKey);

/**
* Inserts item
* @param containerName must not be {@literal null}
* @param objectToSave must not be {@literal null}
* @param <T> type class of domain type
* @return the inserted item
*/
<T> T insert(String containerName, T objectToSave);

/**
* Upserts an item with partition key
*
Expand Down Expand Up @@ -153,12 +162,11 @@ public interface CosmosOperations {
/**
* Delete using entity
*
* @param <T> type class of domain type
* @param containerName the container name
* @param entity the entity object
* @param id the id
* @param partitionKey the partition key
*/
void deleteEntityById(String containerName, Object entity, Object id, PartitionKey partitionKey);
<T> void deleteEntity(String containerName, T entity);

/**
* Delete all items in a container
Expand Down
Loading

0 comments on commit 03a7e29

Please sign in to comment.