Skip to content

Commit

Permalink
Avoid NPEs at ledger creation when DNS failures happen (apache#7403)
Browse files Browse the repository at this point in the history
* Avoid NPEs at ledger creation when DNS failures happen

* Removed unnecessary try/catch
  • Loading branch information
merlimat authored Jul 1, 2020
1 parent 86e2610 commit a230427
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 28 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,4 @@ docker.debug-info
examples/flink/src/main/java/org/apache/flink/avro/generated
pulsar-flink/src/test/java/org/apache/flink/avro/generated
pulsar-client/src/test/java/org/apache/pulsar/client/avro/generated
/build/
Original file line number Diff line number Diff line change
Expand Up @@ -2283,6 +2283,7 @@ void internalFlushPendingMarkDeletes() {

void createNewMetadataLedger(final VoidCallback callback) {
ledger.mbean.startCursorLedgerCreateOp();

ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {

if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
Expand Down Expand Up @@ -2349,7 +2350,6 @@ public void deleteComplete(int rc, Object ctx) {
});
}));
}, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));

}

private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
Expand Down Expand Up @@ -2818,7 +2818,7 @@ public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
return null;
}
}

void updateReadStats(int readEntriesCount, long readEntriesSize) {
this.entriesReadCount += readEntriesCount;
this.entriesReadSize += readEntriesSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -500,20 +501,24 @@ private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, SchemaStorag
private CompletableFuture<LedgerHandle> createLedger(String schemaId) {
Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForSchema(schemaId);
final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
bookKeeper.asyncCreateLedger(
config.getManagedLedgerDefaultEnsembleSize(),
config.getManagedLedgerDefaultWriteQuorum(),
config.getManagedLedgerDefaultAckQuorum(),
BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
} else {
future.complete(handle);
}
}, null, metadata
);
try {
bookKeeper.asyncCreateLedger(
config.getManagedLedgerDefaultEnsembleSize(),
config.getManagedLedgerDefaultWriteQuorum(),
config.getManagedLedgerDefaultAckQuorum(),
BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
} else {
future.complete(handle);
}
}, null, metadata);
} catch (Throwable t) {
log.error("[{}] Encountered unexpected error when creating schema ledger", schemaId, t);
return FutureUtil.failedFuture(t);
}
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -310,18 +311,24 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>

private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String,byte[]> metadata) {
CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
conf.getManagedLedgerDefaultWriteQuorum(),
conf.getManagedLedgerDefaultAckQuorum(),
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
(rc, ledger, ctx) -> {
if (rc != BKException.Code.OK) {
bkf.completeExceptionally(BKException.create(rc));
} else {
bkf.complete(ledger);
}
}, null, metadata);

try {
bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
conf.getManagedLedgerDefaultWriteQuorum(),
conf.getManagedLedgerDefaultAckQuorum(),
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
(rc, ledger, ctx) -> {
if (rc != BKException.Code.OK) {
bkf.completeExceptionally(BKException.create(rc));
} else {
bkf.complete(ledger);
}
}, null, metadata);
} catch (Throwable t) {
log.error("Encountered unexpected error when creating compaction ledger", t);
return FutureUtil.failedFuture(t);
}
return bkf;
}

Expand Down

0 comments on commit a230427

Please sign in to comment.