Skip to content

Commit

Permalink
Fix bug related to managedLedger properties (apache#7357)
Browse files Browse the repository at this point in the history
* Remove re-read from zk, and use the same mutex when update metadata.

* Add setProperty(), deleteProperty() API and test ledger changed when add metadata.

* Add AsyncSetProperty(), asyncDeleteProperty() API and add related unit tests.

* Fix unit test.

* Fix exception propagation.
  • Loading branch information
zhanghaou authored Jun 28, 2020
1 parent 8ab644c commit a3a63a3
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ interface OffloadCallback {
void offloadFailed(ManagedLedgerException exception, Object ctx);
}

interface SetPropertiesCallback {
void setPropertiesComplete(Map<String, String> properties, Object ctx);
interface UpdatePropertiesCallback {
void updatePropertiesComplete(Map<String, String> properties, Object ctx);

void setPropertiesFailed(ManagedLedgerException exception, Object ctx);
void updatePropertiesFailed(ManagedLedgerException exception, Object ctx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,52 @@ public interface ManagedLedger {
*/
Map<String, String> getProperties();

/**
* Add key-value to propertiesMap.
*
* @param key key of property to add
* @param value value of property to add
* @throws InterruptedException
* @throws ManagedLedgerException
*/
void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException;

/**
* Async add key-value to propertiesMap.
*
* @param key key of property to add
* @param value value of property to add
* @param callback a callback which will be supplied with the newest properties in managedLedger.
* @param ctx a context object which will be passed to the callback on completion.
**/
void asyncSetProperty(String key, String value, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);

/**
* Delete the property by key.
*
* @param key key of property to delete
* @throws InterruptedException
* @throws ManagedLedgerException
*/
void deleteProperty(String key) throws InterruptedException, ManagedLedgerException;

/**
* Async delete the property by key.
*
* @param key key of property to delete
* @param callback a callback which will be supplied with the newest properties in managedLedger.
* @param ctx a context object which will be passed to the callback on completion.
*/
void asyncDeleteProperty(String key, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);

/**
* Update managed-ledger's properties.
*
* @param properties key-values of properties
* @throws InterruptedException
* @throws ManagedLedgerException
*/
void setProperties(Map<String, String> properties) throws InterruptedException;
void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException;

/**
* Async update managed-ledger's properties.
Expand All @@ -464,9 +504,9 @@ public interface ManagedLedger {
* @param callback a callback which will be supplied with the newest properties in managedLedger.
* @param ctx a context object which will be passed to the callback on completion.
*/
void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.SetPropertiesCallback callback,
void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.UpdatePropertiesCallback callback,
Object ctx);

/**
* Trim consumed ledgers in background
* @param promise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,17 @@
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -67,7 +64,6 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
Expand All @@ -90,8 +86,8 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SetPropertiesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -176,10 +172,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private ScheduledFuture<?> timeoutTask;

/**
* This lock is held while the ledgers list is updated asynchronously on the metadata store. Since we use the store
* This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store
* version, we cannot have multiple concurrent updates.
*/
private final CallbackMutex ledgersListMutex = new CallbackMutex();
private final CallbackMutex metadataMutex = new CallbackMutex();
private final CallbackMutex trimmerMutex = new CallbackMutex();

private final CallbackMutex offloadMutex = new CallbackMutex();
Expand Down Expand Up @@ -1230,7 +1226,7 @@ public void operationComplete(Void v, Stat stat) {
log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
}
ledgersStat = stat;
ledgersListMutex.unlock();
metadataMutex.unlock();
updateLedgersIdsComplete(stat);
synchronized (ManagedLedgerImpl.this) {
mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp,
Expand Down Expand Up @@ -1267,7 +1263,7 @@ public void operationFailed(MetaStoreException e) {
}
}, null);

ledgersListMutex.unlock();
metadataMutex.unlock();

synchronized (ManagedLedgerImpl.this) {
lastLedgerCreationFailureTimestamp = clock.millis();
Expand All @@ -1282,7 +1278,7 @@ public void operationFailed(MetaStoreException e) {
}

private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {
if (!ledgersListMutex.tryLock()) {
if (!metadataMutex.tryLock()) {
// Defer update for later
scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS);
return;
Expand Down Expand Up @@ -2062,7 +2058,7 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
}

if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming
|| !ledgersListMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
|| !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
scheduleDeferredTrimming(promise);
trimmerMutex.unlock();
return;
Expand Down Expand Up @@ -2101,7 +2097,7 @@ public void operationComplete(Void result, Stat stat) {
log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
ledgersStat = stat;
ledgersListMutex.unlock();
metadataMutex.unlock();
trimmerMutex.unlock();

for (LedgerInfo ls : ledgersToDelete) {
Expand All @@ -2119,7 +2115,7 @@ public void operationComplete(Void result, Stat stat) {
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
ledgersListMutex.unlock();
metadataMutex.unlock();
trimmerMutex.unlock();

promise.completeExceptionally(e);
Expand Down Expand Up @@ -2531,15 +2527,15 @@ private CompletableFuture<Void> transformLedgerInfo(long ledgerId, LedgerInfoTra
private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation transformation,
CompletableFuture<Void> finalPromise) {
synchronized (this) {
if (!ledgersListMutex.tryLock()) {
if (!metadataMutex.tryLock()) {
// retry in 100 milliseconds
scheduledExecutor.schedule(
safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise)), 100,
TimeUnit.MILLISECONDS);
} else { // lock acquired
CompletableFuture<Void> unlockingPromise = new CompletableFuture<>();
unlockingPromise.whenComplete((res, ex) -> {
ledgersListMutex.unlock();
metadataMutex.unlock();
if (ex != null) {
finalPromise.completeExceptionally(ex);
} else {
Expand Down Expand Up @@ -3020,6 +3016,10 @@ private ManagedLedgerInfo getManagedLedgerInfo() {
mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
.setEntryId(lastConfirmedEntry.getEntryId()));
}
for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder()
.setKey(property.getKey()).setValue(property.getValue()));
}

return mlInfo.build();
}
Expand Down Expand Up @@ -3271,57 +3271,96 @@ public Map<String, String> getProperties() {
}

@Override
public void setProperties(Map<String, String> properties) throws InterruptedException {
public void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException {
Map<String, String> map = new HashMap<>();
map.put(key, value);
updateProperties(map, false, null);
}

@Override
public void asyncSetProperty(String key, String value, final UpdatePropertiesCallback callback, Object ctx) {
Map<String, String> map = new HashMap<>();
map.put(key, value);
asyncUpdateProperties(map, false, null, callback, ctx);
}

@Override
public void deleteProperty(String key) throws InterruptedException, ManagedLedgerException {
updateProperties(null, true, key);
}

@Override
public void asyncDeleteProperty(String key, final UpdatePropertiesCallback callback, Object ctx) {
asyncUpdateProperties(null, true, key, callback, ctx);
}

@Override
public void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException {
updateProperties(properties, false, null);
}

@Override
public void asyncSetProperties(Map<String, String> properties, final UpdatePropertiesCallback callback,
Object ctx) {
asyncUpdateProperties(properties, false, null, callback, ctx);
}

private void updateProperties(Map<String, String> properties, boolean isDelete,
String deleteKey) throws InterruptedException, ManagedLedgerException {
final CountDownLatch latch = new CountDownLatch(1);
this.asyncSetProperties(properties, new SetPropertiesCallback() {
class Result {
ManagedLedgerException exception = null;
}
final Result result = new Result();
this.asyncUpdateProperties(properties, isDelete, deleteKey, new UpdatePropertiesCallback() {
@Override
public void setPropertiesComplete(Map<String, String> properties, Object ctx) {
public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
latch.countDown();
}

@Override
public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Update manageLedger's info failed:{}", name, exception.getMessage());
public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
result.exception = exception;
latch.countDown();
}
}, null);

latch.await();
if (!latch.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
throw new ManagedLedgerException("Timeout during update managedLedger's properties");
}

if (result.exception != null) {
log.error("[{}] Update managedLedger's properties failed", name, result.exception);
throw result.exception;
}
}

@Override
public void asyncSetProperties(Map<String, String> properties, final SetPropertiesCallback callback, Object ctx) {
store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() {
private void asyncUpdateProperties(Map<String, String> properties, boolean isDelete,
String deleteKey, final UpdatePropertiesCallback callback, Object ctx) {
if (!metadataMutex.tryLock()) {
// Defer update for later
scheduledExecutor.schedule(() -> asyncUpdateProperties(properties, isDelete, deleteKey,
callback, ctx), 100, TimeUnit.MILLISECONDS);
return;
}
if (isDelete) {
propertiesMap.remove(deleteKey);
} else {
propertiesMap.putAll(properties);
}
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(ManagedLedgerInfo result, Stat version) {
public void operationComplete(Void result, Stat version) {
ledgersStat = version;
// Update manageLedger's properties.
ManagedLedgerInfo.Builder info = ManagedLedgerInfo.newBuilder(result);
info.clearProperties();
for (Map.Entry<String, String> property : properties.entrySet()) {
info.addProperties(MLDataFormats.KeyValue.newBuilder().setKey(property.getKey()).setValue(property.getValue()));
}
store.asyncUpdateLedgerIds(name, info.build(), version, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat version) {
ledgersStat = version;
propertiesMap.clear();
propertiesMap.putAll(properties);
callback.setPropertiesComplete(properties, ctx);
}

@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Update manageLedger's info failed:{}", name, e.getMessage());
callback.setPropertiesFailed(e, ctx);
}
});
callback.updatePropertiesComplete(propertiesMap, ctx);
metadataMutex.unlock();
}

@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Get manageLedger's info failed:{}", name, e.getMessage());
callback.setPropertiesFailed(e, ctx);
log.error("[{}] Update managedLedger's properties failed", name, e);
callback.updatePropertiesFailed(e, ctx);
metadataMutex.unlock();
}
});
}
Expand Down
Loading

0 comments on commit a3a63a3

Please sign in to comment.