Skip to content

Commit

Permalink
add read-timeout option to async managed-ledger read (apache#3019)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Jan 10, 2019
1 parent 06c819e commit 99f05b5
Show file tree
Hide file tree
Showing 12 changed files with 455 additions and 161 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,9 @@ autoSkipNonRecoverableData=false
# operation timeout while updating managed-ledger metadata.
managedLedgerMetadataOperationsTimeoutSeconds=60

# Read entries timeout when broker tries to read messages from bookkeeper.
managedLedgerReadEntryTimeoutSeconds=120

### --- Load balancer --- ###

# Enable load balancer
Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ autoSkipNonRecoverableData=false
# operation timeout while updating managed-ledger metadata.
managedLedgerMetadataOperationsTimeoutSeconds=60

# Read entries timeout when broker tries to read messages from bookkeeper.
managedLedgerReadEntryTimeoutSeconds=120

### --- Load balancer --- ###

loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class ManagedLedgerConfig {
private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
private long offloadAutoTriggerSizeThresholdBytes = -1;
private long metadataOperationsTimeoutSeconds = 60;
private long readEntryTimeoutSeconds = 120;

private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);
Expand Down Expand Up @@ -532,4 +533,25 @@ public ManagedLedgerConfig setMetadataOperationsTimeoutSeconds(long metadataOper
this.metadataOperationsTimeoutSeconds = metadataOperationsTimeoutSeconds;
return this;
}

/**
* Ledger read-entry timeout
*
* @return
*/
public long getReadEntryTimeoutSeconds() {
return readEntryTimeoutSeconds;
}

/**
* Ledger read entry timeout after which callback will be completed with failure. (disable timeout by setting
* readTimeoutSeconds <= 0)
*
* @param readTimeoutSeconds
* @return
*/
public ManagedLedgerConfig setReadEntryTimeoutSeconds(long readEntryTimeoutSeconds) {
this.readEntryTimeoutSeconds = readEntryTimeoutSeconds;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public class ManagedCursorImpl implements ManagedCursor {
@SuppressWarnings("unused")
private volatile OpReadEntry waitingReadOp = null;

private static final int FALSE = 0;
private static final int TRUE = 1;
public static final int FALSE = 0;
public static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> RESET_CURSOR_IN_PROGRESS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "resetCursorInProgress");
@SuppressWarnings("unused")
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand All @@ -32,6 +34,8 @@
import org.apache.bookkeeper.mledger.Position;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;

class OpReadEntry implements ReadEntriesCallback {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
@Override
void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
this.getLedgerHandle(position.getLedgerId()).thenAccept((ledger) -> {
this.entryCache.asyncReadEntry(ledger, position, callback, ctx);
asyncReadEntry(ledger, position, callback, ctx);
}).exceptionally((ex) -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", new Object[]{this.name, position, ex.getMessage()});
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -54,13 +55,16 @@
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
Expand Down Expand Up @@ -2248,4 +2252,76 @@ public void createComplete(int rc, LedgerHandle lh, Object ctx) {

ledger.close();
}

/**
* It verifies that asyncRead timesout if it doesn't receive response from bk-client in configured timeout
*
* @throws Exception
*/
@Test
public void testManagedLedgerWithReadEntryTimeOut() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig().setReadEntryTimeoutSeconds(1);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("timeout_ledger_test", config);

BookKeeper bk = mock(BookKeeper.class);
doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
AtomicReference<ManagedLedgerException> responseException1 = new AtomicReference<>();
CountDownLatch latch1 = new CountDownLatch(1);

CompletableFuture<LedgerEntries> entriesFuture = new CompletableFuture<>();
ReadHandle ledgerHandle = mock(ReadHandle.class);
doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionImpl.earliest.getLedgerId(),
PositionImpl.earliest.getEntryId());

// (1) test read-timeout for: ManagedLedger.asyncReadEntry(..)
ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
responseException1.set(null);
latch1.countDown();
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
responseException1.set(exception);
latch1.countDown();
}
}, null);
ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
@Override
public void createComplete(int rc, LedgerHandle lh, Object ctx) {

}
}, Collections.emptyMap());
latch1.await(config.getReadEntryTimeoutSeconds() + 2, TimeUnit.SECONDS);
assertNotNull(responseException1.get());
assertEquals(responseException1.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException));

// (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
CountDownLatch latch2 = new CountDownLatch(1);
AtomicReference<ManagedLedgerException> responseException2 = new AtomicReference<>();
PositionImpl readPositionRef = PositionImpl.earliest;
ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, "cursor1");
OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef, 1, new ReadEntriesCallback() {

@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
latch2.countDown();
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
responseException2.set(exception);
latch2.countDown();
}

}, null);
ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest.getEntryId(), PositionImpl.earliest.getEntryId(),
false, opReadEntry, null);
latch2.await(config.getReadEntryTimeoutSeconds() + 2, TimeUnit.SECONDS);
assertNotNull(responseException2.get());
assertEquals(responseException2.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException));

ledger.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "operation timeout while updating managed-ledger metadata."
)
private long managedLedgerMetadataOperationsTimeoutSeconds = 60;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Read entries timeout when broker tries to read messages from bookkeeper "
+ "(disable timeout by setting readTimeoutSeconds <= 0)"
)
private long managedLedgerReadEntryTimeoutSeconds = 60;



/*** --- Load balancer --- ****/
@FieldContext(
Expand Down Expand Up @@ -1134,4 +1142,4 @@ public Optional<Integer> getWebServicePort() {
public Optional<Integer> getWebServicePortTls() {
return Optional.ofNullable(webServicePortTls);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t

managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
Expand Down
6 changes: 6 additions & 0 deletions site/_data/config/broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ configs:
- name: autoSkipNonRecoverableData
default: 'false'
description: Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list.It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger.
- name: managedLedgerMetadataOperationsTimeoutSeconds
default: '60'
description: Operation timeout while updating managed-ledger metadata.
- name: managedLedgerReadEntryTimeoutSeconds
default: '120'
description: Read entries timeout when broker tries to read messages from bookkeeper.
- name: loadBalancerEnabled
default: 'true'
description: Enable load balancer
Expand Down
4 changes: 4 additions & 0 deletions site/_data/config/standalone.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ configs:
default: '14400'
- name: autoSkipNonRecoverableData
default: 'false'
- name: managedLedgerMetadataOperationsTimeoutSeconds
default: '60'
- name: managedLedgerReadEntryTimeoutSeconds
default: '120'
- name: loadBalancerEnabled
default: 'false'
- name: loadBalancerPlacementStrategy
Expand Down

0 comments on commit 99f05b5

Please sign in to comment.