Skip to content

Commit

Permalink
Testing store response headers array performance (Azure#30596)
Browse files Browse the repository at this point in the history
  • Loading branch information
kushagraThapar authored Aug 23, 2022
1 parent af5eb8e commit 52bd48c
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static com.azure.cosmos.CosmosDiagnostics.USER_AGENT_KEY;
Expand Down Expand Up @@ -152,7 +152,7 @@ public class CosmosException extends AzureException {
protected CosmosException(int statusCode, String message, Map<String, String> responseHeaders, Throwable cause) {
super(message, cause);
this.statusCode = statusCode;
this.responseHeaders = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
this.responseHeaders = new ConcurrentHashMap<>();

// Since ConcurrentHashMap only takes non-null entries, so filtering them before putting them in.
if (responseHeaders != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/**
Expand All @@ -33,12 +33,19 @@ public class RxDocumentServiceResponse {
private CosmosDiagnostics cosmosDiagnostics;

public RxDocumentServiceResponse(DiagnosticsClientContext diagnosticsClientContext, StoreResponse response) {
this.headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
this.headersMap.putAll(response.getResponseHeaders());
String[] headerNames = response.getResponseHeaderNames();
String[] headerValues = response.getResponseHeaderValues();

this.headersMap = new HashMap<>(headerNames.length);

// Gets status code.
this.statusCode = response.getStatus();

// Extracts headers.
for (int i = 0; i < headerNames.length; i++) {
this.headersMap.put(headerNames[i], headerValues[i]);
}

this.storeResponse = response;
this.diagnosticsClientContext = diagnosticsClientContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,26 @@
import com.azure.cosmos.implementation.IRetryPolicy;
import com.azure.cosmos.implementation.ISessionToken;
import com.azure.cosmos.implementation.InternalServerErrorException;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RMResources;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.SessionContainer;
import com.azure.cosmos.implementation.SessionTokenHelper;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.apachecommons.lang.math.NumberUtils;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import com.azure.cosmos.implementation.apachecommons.lang.math.NumberUtils;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdOpenConnectionsHandler;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.function.Function;

Expand Down Expand Up @@ -151,11 +152,20 @@ private RxDocumentServiceResponse completeResponse(
StoreResponse storeResponse,
RxDocumentServiceRequest request) throws InternalServerErrorException {

Map<String, String> responseHeaders = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
responseHeaders.putAll(storeResponse.getResponseHeaders());
if (storeResponse.getResponseHeaderNames().length != storeResponse.getResponseHeaderValues().length) {
throw new InternalServerErrorException(RMResources.InvalidBackendResponse);
}

Map<String, String> headers = new HashMap<>(storeResponse.getResponseHeaderNames().length);
for (int idx = 0; idx < storeResponse.getResponseHeaderNames().length; idx++) {
String name = storeResponse.getResponseHeaderNames()[idx];
String value = storeResponse.getResponseHeaderValues()[idx];

headers.put(name, value);
}

this.updateResponseHeader(request, responseHeaders);
this.captureSessionToken(request, responseHeaders);
this.updateResponseHeader(request, headers);
this.captureSessionToken(request, headers);
BridgeInternal.recordRetryContextEndTime(request.requestContext.cosmosDiagnostics);
RxDocumentServiceResponse rxDocumentServiceResponse =
new RxDocumentServiceResponse(this.diagnosticsClientContext, storeResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Locale;
import java.util.Map;
import com.azure.cosmos.implementation.Utils;
import java.util.concurrent.ConcurrentSkipListMap;

/**
* Used internally to represents a response from the store.
*/
public class StoreResponse {
final static Logger LOGGER = LoggerFactory.getLogger(StoreResponse.class);
final private int status;
final private Map<String, String> responseHeaders;
final private String[] responseHeaderNames;
final private String[] responseHeaderValues;
final private byte[] content;

private int pendingRequestQueueSize;
Expand All @@ -40,8 +40,15 @@ public StoreResponse(
byte[] content) {

requestTimeline = RequestTimeline.empty();
responseHeaders = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
responseHeaders.putAll(headerMap);
responseHeaderNames = new String[headerMap.size()];
responseHeaderValues = new String[headerMap.size()];

int i = 0;
for (Map.Entry<String, String> headerEntry : headerMap.entrySet()) {
responseHeaderNames[i] = headerEntry.getKey();
responseHeaderValues[i] = headerEntry.getValue();
i++;
}

this.status = status;
this.content = content;
Expand All @@ -54,8 +61,12 @@ public int getStatus() {
return status;
}

public Map<String, String> getResponseHeaders() {
return responseHeaders;
public String[] getResponseHeaderNames() {
return responseHeaderNames;
}

public String[] getResponseHeaderValues() {
return responseHeaderValues;
}

public int getRntbdChannelTaskQueueSize() {
Expand Down Expand Up @@ -128,7 +139,17 @@ public String getCorrelatedActivityId() {
}

public String getHeaderValue(String attribute) {
return responseHeaders.get(attribute);
if (this.responseHeaderValues == null || this.responseHeaderNames.length != this.responseHeaderValues.length) {
return null;
}

for (int i = 0; i < responseHeaderNames.length; i++) {
if (responseHeaderNames[i].equalsIgnoreCase(attribute)) {
return responseHeaderValues[i];
}
}

return null;
}

public double getRequestCharge() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.cosmos.implementation.InternalServerErrorException;
import com.azure.cosmos.implementation.RMResources;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -122,12 +123,18 @@ public StoreResponse toResponse(RequestChargeTracker requestChargeTracker) {
}

private static void setRequestCharge(StoreResponse response, CosmosException cosmosException, double totalRequestCharge) {
String totalRequestChargeString = Double.toString(totalRequestCharge);
if (cosmosException != null) {
cosmosException.getResponseHeaders().put(HttpConstants.HttpHeaders.REQUEST_CHARGE, totalRequestChargeString);
} else {
// Set total charge as final charge for the response.
response.getResponseHeaders().put(HttpConstants.HttpHeaders.REQUEST_CHARGE, totalRequestChargeString);
cosmosException.getResponseHeaders().put(HttpConstants.HttpHeaders.REQUEST_CHARGE,
Double.toString(totalRequestCharge));
}
// Set total charge as final charge for the response.
else if (response.getResponseHeaderNames() != null) {
for (int i = 0; i < response.getResponseHeaderNames().length; ++i) {
if (Strings.areEqualIgnoreCase(response.getResponseHeaderNames()[i], HttpConstants.HttpHeaders.REQUEST_CHARGE)) {
response.getResponseHeaderValues()[i] = Double.toString(totalRequestCharge);
break;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ public void headerNamesAreCaseInsensitive() {
HashMap<String, String> headerMap = new HashMap<>();
headerMap.put("key1", "value1");
headerMap.put("key2", "value2");
headerMap.put("KEY1", "value3");
headerMap.put("KEY3", "value3");

StoreResponse sp = new StoreResponse(200, headerMap, getUTF8BytesOrNull(content));

assertThat(sp.getStatus()).isEqualTo(200);
assertThat(sp.getResponseBody()).isEqualTo(getUTF8BytesOrNull(content));
assertThat(sp.getHeaderValue("key1")).isEqualTo("value3");
assertThat(sp.getHeaderValue("kEy1")).isEqualTo("value3");
assertThat(sp.getHeaderValue("KEY2")).isEqualTo("value2");
assertThat(sp.getHeaderValue("keY1")).isEqualTo("value1");
assertThat(sp.getHeaderValue("kEy2")).isEqualTo("value2");
assertThat(sp.getHeaderValue("KEY3")).isEqualTo("value3");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import org.assertj.core.api.Condition;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -38,7 +38,7 @@ public Builder hasHeader(String headerKey) {
validators.add(new StoreResponseValidator() {
@Override
public void validate(StoreResponse resp) {
assertThat(resp.getResponseHeaders().containsKey(headerKey)).isTrue();
assertThat(Arrays.asList(resp.getResponseHeaderNames()).contains(headerKey)).isTrue();
}
});
return this;
Expand All @@ -48,9 +48,9 @@ public Builder withHeader(String headerKey, String headerValue) {
validators.add(new StoreResponseValidator() {
@Override
public void validate(StoreResponse resp) {
Map<String, String> responseHeaders = resp.getResponseHeaders();
assertThat(responseHeaders.containsKey(headerKey)).isTrue();
assertThat(responseHeaders.get(headerKey)).isEqualTo(headerValue);
assertThat(Arrays.asList(resp.getResponseHeaderNames())).asList().contains(headerKey);
int index = Arrays.asList(resp.getResponseHeaderNames()).indexOf(headerKey);
assertThat(resp.getResponseHeaderValues()[index]).isEqualTo(headerValue);
}
});
return this;
Expand All @@ -61,9 +61,10 @@ public Builder withHeaderValueCondition(String headerKey, Condition<String> cond
validators.add(new StoreResponseValidator() {
@Override
public void validate(StoreResponse resp) {
Map<String, String> responseHeaders = resp.getResponseHeaders();
assertThat(responseHeaders.containsKey(headerKey)).isTrue();
condition.matches(responseHeaders.get(headerKey));
assertThat(Arrays.asList(resp.getResponseHeaderNames())).asList().contains(headerKey);
int index = Arrays.asList(resp.getResponseHeaderNames()).indexOf(headerKey);
String value = resp.getResponseHeaderValues()[index];
condition.matches(value);
}
});
return this;
Expand Down

0 comments on commit 52bd48c

Please sign in to comment.