Skip to content

Commit

Permalink
Fix for connection state listener (Azure#27242)
Browse files Browse the repository at this point in the history
* collect connection state listner metrics
* move connection state listener to RntbdRequestManager level, which improvements the handling when the connection is idle

Co-authored-by: annie-mac <[email protected]>
Co-authored-by: annie-mac <[email protected]>
  • Loading branch information
3 people authored Apr 18, 2022
1 parent 162b541 commit c1210cf
Show file tree
Hide file tree
Showing 18 changed files with 206 additions and 81 deletions.
5 changes: 1 addition & 4 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
#### Features Added
* Added Beta API `continueOnInitError` in `ThroughputControlGroupConfigBuilder` - See [PR 27702](https://github.com/Azure/azure-sdk-for-java/pull/27702)

#### Breaking Changes

#### Bugs Fixed

#### Other Changes
* Added improvement for handling idle connection close event when `connectionEndpointRediscoveryEnabled` is enabled - See [PR 27242](https://github.com/Azure/azure-sdk-for-java/pull/27242)

### 4.28.1 (2022-04-08)
#### Other Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public Mono<AddressInformation[]> resolveAsync(
}

@Override
public void updateAddresses(RxDocumentServiceRequest request, URI serverKey) {
public int updateAddresses(URI serverKey) {
throw new NotImplementedException("updateAddresses() is not supported in AddressResolver");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class GatewayAddressCache implements IAddressCache {
Expand Down Expand Up @@ -167,10 +168,12 @@ public GatewayAddressCache(
}

@Override
public void updateAddresses(final URI serverKey) {
public int updateAddresses(final URI serverKey) {

Objects.requireNonNull(serverKey, "expected non-null serverKey");

AtomicInteger updatedCacheEntryCount = new AtomicInteger(0);

if (this.tcpConnectionEndpointRediscoveryEnabled) {
this.serverPartitionAddressToPkRangeIdMap.computeIfPresent(serverKey, (uri, partitionKeyRangeIdentitySet) -> {

Expand All @@ -180,6 +183,8 @@ public void updateAddresses(final URI serverKey) {
} else {
this.serverPartitionAddressCache.remove(partitionKeyRangeIdentity);
}

updatedCacheEntryCount.incrementAndGet();
}

return null;
Expand All @@ -188,6 +193,7 @@ public void updateAddresses(final URI serverKey) {
logger.warn("tcpConnectionEndpointRediscovery is not enabled, should not reach here.");
}

return updatedCacheEntryCount.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class GlobalAddressResolver implements IAddressResolver {
Expand Down Expand Up @@ -110,23 +111,23 @@ Mono<Void> openAsync(DocumentCollection collection) {
}

@Override
public void updateAddresses(final RxDocumentServiceRequest request, final URI serverKey) {
public int updateAddresses(final URI serverKey) {

Objects.requireNonNull(request, "expected non-null request");
Objects.requireNonNull(serverKey, "expected non-null serverKey");

if (this.tcpConnectionEndpointRediscoveryEnabled) {
URI serviceEndpoint = this.endpointManager.resolveServiceEndpoint(request);
this.addressCacheByEndpoint.computeIfPresent(serviceEndpoint, (ignored, endpointCache) -> {
AtomicInteger updatedCount = new AtomicInteger(0);

if (this.tcpConnectionEndpointRediscoveryEnabled) {
for (EndpointCache endpointCache : this.addressCacheByEndpoint.values()) {
final GatewayAddressCache addressCache = endpointCache.addressCache;
addressCache.updateAddresses(serverKey);

return endpointCache;
});
updatedCount.accumulateAndGet(addressCache.updateAddresses(serverKey), (oldValue, newValue) -> oldValue + newValue);
}
} else {
logger.warn("tcpConnectionEndpointRediscovery is not enabled, should not reach here.");
}

return updatedCount.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public interface IAddressCache {
*
*
*/
void updateAddresses(URI serverKey);
int updateAddresses(URI serverKey);

/**
* Resolves physical addresses by either PartitionKeyRangeIdentity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ Mono<AddressInformation[]> resolveAsync(
RxDocumentServiceRequest request,
boolean forceRefreshPartitionAddresses);

void updateAddresses(RxDocumentServiceRequest request, URI serverKey);
int updateAddresses(URI serverKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ public class RntbdClientChannelHandler extends ChannelInitializer<Channel> imple
private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelHandler.class);
private final ChannelHealthChecker healthChecker;
private final Config config;
private final RntbdConnectionStateListener connectionStateListener;

RntbdClientChannelHandler(final Config config, final ChannelHealthChecker healthChecker) {
RntbdClientChannelHandler(
final Config config,
final ChannelHealthChecker healthChecker,
final RntbdConnectionStateListener connectionStateListener) {
checkNotNull(healthChecker, "expected non-null healthChecker");
checkNotNull(config, "expected non-null config");

this.healthChecker = healthChecker;
this.config = config;
this.connectionStateListener = connectionStateListener;
}

/**
Expand Down Expand Up @@ -98,7 +104,9 @@ protected void initChannel(final Channel channel) {

final RntbdRequestManager requestManager = new RntbdRequestManager(
this.healthChecker,
this.config.maxRequestsPerChannel());
this.config.maxRequestsPerChannel(),
this.connectionStateListener);

final long idleConnectionTimerResolutionInNanos = config.idleConnectionTimerResolutionInNanos();
final ChannelPipeline pipeline = channel.pipeline();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,30 +183,39 @@ public final class RntbdClientChannelPool implements ChannelPool {

private final ScheduledFuture<?> pendingAcquisitionExpirationFuture;
private final ClientTelemetry clientTelemetry;

/**
* Initializes a newly created {@link RntbdClientChannelPool} instance.
*
* @param bootstrap the {@link Bootstrap} that is used for connections.
* @param config the {@link Config} that is used for the channel pool instance created.
* @param clientTelemetry the {@link ClientTelemetry} that is used to track client telemetry related metrics.
* @param connectionStateListener the {@link RntbdConnectionStateListener}.
*/
RntbdClientChannelPool(final RntbdServiceEndpoint endpoint, final Bootstrap bootstrap, final Config config, final ClientTelemetry clientTelemetry) {
this(endpoint, bootstrap, config, new RntbdClientChannelHealthChecker(config), clientTelemetry);
RntbdClientChannelPool(
final RntbdServiceEndpoint endpoint,
final Bootstrap bootstrap,
final Config config,
final ClientTelemetry clientTelemetry,
final RntbdConnectionStateListener connectionStateListener) {
this(endpoint, bootstrap, config, new RntbdClientChannelHealthChecker(config), clientTelemetry, connectionStateListener);
}

private RntbdClientChannelPool(
final RntbdServiceEndpoint endpoint,
final Bootstrap bootstrap,
final Config config,
final RntbdClientChannelHealthChecker healthChecker,
final ClientTelemetry clientTelemetry) {
final ClientTelemetry clientTelemetry,
final RntbdConnectionStateListener connectionStateListener) {

checkNotNull(endpoint, "expected non-null endpoint");
checkNotNull(bootstrap, "expected non-null bootstrap");
checkNotNull(config, "expected non-null config");
checkNotNull(healthChecker, "expected non-null healthChecker");

this.endpoint = endpoint;
this.poolHandler = new RntbdClientChannelHandler(config, healthChecker);
this.poolHandler = new RntbdClientChannelHandler(config, healthChecker, connectionStateListener);
this.executor = bootstrap.config().group().next();
this.healthChecker = healthChecker;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.directconnectivity.IAddressResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -22,6 +20,7 @@ public class RntbdConnectionStateListener {

private final IAddressResolver addressResolver;
private final RntbdEndpoint endpoint;
private final RntbdConnectionStateListenerMetrics metrics;

// endregion

Expand All @@ -30,55 +29,45 @@ public class RntbdConnectionStateListener {
public RntbdConnectionStateListener(final IAddressResolver addressResolver, final RntbdEndpoint endpoint) {
this.addressResolver = checkNotNull(addressResolver, "expected non-null addressResolver");
this.endpoint = checkNotNull(endpoint, "expected non-null endpoint");
this.metrics = new RntbdConnectionStateListenerMetrics();
}

// endregion

// region Methods

public void onException(final RxDocumentServiceRequest request, Throwable exception) {
checkNotNull(request, "expect non-null request");
public void onException(Throwable exception) {
checkNotNull(exception, "expect non-null exception");

if (exception instanceof GoneException) {
final Throwable cause = exception.getCause();

if (cause != null) {

// GoneException was produced by the client, not the server
//
// This could occur for example:
//
// * an operation fails due to an IOException which indicates a connection reset by the server,
// * a channel closes unexpectedly because the server stopped taking requests, or
// * an error was detected by the transport client (e.g., IllegalStateException)
// * a request timed out in pending acquisition queue
// * a request failed fast in admission control layer due to high load
// * channel connect timed out
//
// Currently, only ClosedChannelException will raise onConnectionEvent since it is more sure of a signal the server is going down.

if (cause instanceof IOException) {

if (cause instanceof ClosedChannelException) {
this.onConnectionEvent(RntbdConnectionEvent.READ_EOF, request, exception);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Will not raise the connection state change event for error {}", cause);
}
}
this.metrics.record();

// * An operation could fail due to an IOException which indicates a connection reset by the server,
// * or a channel closes unexpectedly because the server stopped taking requests
//
// Currently, only ClosedChannelException will raise onConnectionEvent since it is more sure of a signal the server is going down.

if (exception instanceof IOException) {

if (exception instanceof ClosedChannelException) {
this.metrics.recordAddressUpdated(this.onConnectionEvent(RntbdConnectionEvent.READ_EOF, exception));
} else {
if (logger.isDebugEnabled()) {
logger.debug("Will not raise the connection state change event for error", exception);
}
}
}
}

public RntbdConnectionStateListenerMetrics getMetrics() {
return this.metrics;
}

// endregion

// region Privates

private void onConnectionEvent(final RntbdConnectionEvent event, final RxDocumentServiceRequest request, final Throwable exception) {
private int onConnectionEvent(final RntbdConnectionEvent event, final Throwable exception) {

checkNotNull(request, "expected non-null request");
checkNotNull(exception, "expected non-null exception");

if (event == RntbdConnectionEvent.READ_EOF) {
Expand All @@ -92,11 +81,13 @@ private void onConnectionEvent(final RntbdConnectionEvent event, final RxDocumen
RntbdObjectMapper.toJson(exception));
}

this.addressResolver.updateAddresses(request, this.endpoint.serverKey());
return this.addressResolver.updateAddresses(this.endpoint.serverKey());
} else {
logger.warn("Endpoint closed while onConnectionEvent: {}", this.endpoint);
}
}

return 0;
}
// endregion
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

@JsonSerialize(using = RntbdConnectionStateListenerMetrics.RntbdConnectionStateListenerMetricsJsonSerializer.class)
public final class RntbdConnectionStateListenerMetrics implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(RntbdConnectionStateListenerMetrics.class);

private final AtomicReference<Instant> lastCallTimestamp;
private final AtomicReference<Pair<Instant, Integer>> lastActionableContext;

public RntbdConnectionStateListenerMetrics() {

this.lastCallTimestamp = new AtomicReference<>();
this.lastActionableContext = new AtomicReference<>();
}

public void recordAddressUpdated(int addressEntryUpdatedCount) {
this.lastActionableContext.set(Pair.of(this.lastCallTimestamp.get(), addressEntryUpdatedCount));
}

public void record() {
this.lastCallTimestamp.set(Instant.now());
}

final static class RntbdConnectionStateListenerMetricsJsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer<RntbdConnectionStateListenerMetrics> {

public RntbdConnectionStateListenerMetricsJsonSerializer() {
}

@Override
public void serialize(RntbdConnectionStateListenerMetrics metrics, JsonGenerator writer, SerializerProvider serializers) throws IOException {
writer.writeStartObject();

writer.writeStringField(
"lastCallTimestamp",
metrics.lastCallTimestamp.get() == null ? "N/A" : metrics.lastCallTimestamp.toString());

if (metrics.lastActionableContext.get() != null) {
writer.writeStringField("lastActionableContext", metrics.lastActionableContext.get().toString());
}

writer.writeEndObject();
}
}
}
Loading

0 comments on commit c1210cf

Please sign in to comment.