Skip to content

Commit

Permalink
[Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger… (
Browse files Browse the repository at this point in the history
apache#9910)

* [Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger` when zk root directory changed (apache#2258)

Fixes apache#2258

*Motivation*

When zookeeper ledgers root path is changed, using pulsar-sql to query messages will cause `BKNoSuchLedgerExistsException`.

*Modifications*

To use new DefaultBkFactory(clientConfiguration),so that zk will be null in Bookeeper constructor;(Bookeeper.java row 113)
when metadataDriver will be initialized(Bookeeper.java row 167),zookeeper conection is null; we can jump to another  branch.If we have done the above steps,
finally, zkServers will be localhost:2181 rather than localhost:2181/pulsar in row 168(ZKMetadataDriverBase.java); the path that we use to get ledger is localhost:2181/pulsar/ledger/00/0000/L0001 rather than localhost:2181/pulsar/pulsar/ledger/00/0000/L0001;

* [Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger` when zk root directory changed (apache#2258)

Fixes apache#2258

*Motivation*

When zookeeper ledgers root path is changed, using pulsar-sql to query messages will cause `BKNoSuchLedgerExistsException`.

*Modifications*

To use new DefaultBkFactory(clientConfiguration),so that zk will be null in Bookeeper constructor;(Bookeeper.java row 113)
when metadataDriver will be initialized(Bookeeper.java row 167),zookeeper conection is null; we can jump to another  branch.If we have done the above steps,
finally, zkServers will be localhost:2181 rather than localhost:2181/pulsar in row 168(ZKMetadataDriverBase.java); the path that we use to get ledger is localhost:2181/pulsar/ledger/00/0000/L0001 rather than localhost:2181/pulsar/pulsar/ledger/00/0000/L0001;

* [Pulsar-sql]Using pulsar SQL query messages will appear `NoSuchLedger` when zk root directory changed (apache#2258)

Fixes apache#2258

*Motivation*

When zookeeper ledgers root path is changed, using pulsar-sql to query messages will cause `BKNoSuchLedgerExistsException`.

*Modifications*

To use new DefaultBkFactory(clientConfiguration),so that zk will be null in Bookeeper constructor;(Bookeeper.java row 113)
when metadataDriver will be initialized(Bookeeper.java row 167),zookeeper conection is null; we can jump to another  branch.If we have done the above steps,
finally, zkServers will be localhost:2181 rather than localhost:2181/pulsar in row 168(ZKMetadataDriverBase.java); the path that we use to get ledger is localhost:2181/pulsar/ledger/00/0000/L0001 rather than localhost:2181/pulsar/pulsar/ledger/00/0000/L0001;

Co-authored-by: [email protected] <1314520Ljq-->
  • Loading branch information
sakurafly123 authored Mar 24, 2021
1 parent 6704f12 commit b1ba8e5
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration bkClientConf
zkc, config, NullStatsLogger.INSTANCE);
}

private ManagedLedgerFactoryImpl(ClientConfiguration clientConfiguration, String zkConnection, ManagedLedgerFactoryConfig config) throws Exception {
public ManagedLedgerFactoryImpl(ClientConfiguration clientConfiguration, String zkConnection, ManagedLedgerFactoryConfig config) throws Exception {
this(new DefaultBkFactory(clientConfiguration),
true,
ZooKeeperClient.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.*;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand Down Expand Up @@ -69,6 +70,41 @@ public void testChangeZKPath() throws Exception {
List<Entry> entryList = cursor.readEntries(10);
Assert.assertEquals(10, entryList.size());

for (int i = 0; i < 10; i++) {
Entry entry = entryList.get(i);
Assert.assertEquals(("entry" + i).getBytes("UTF8"), entry.getData());
}
factory.shutdown();
}
@Test()
public void testChangeZKPath2() throws Exception {
ClientConfiguration configuration = new ClientConfiguration();
String zkConnectString = zkUtil.getZooKeeperConnectString() + "/test";
configuration.setMetadataServiceUri("zk://" + zkConnectString + "/ledgers");
configuration.setUseV2WireProtocol(true);
configuration.setEnableDigestTypeAutodetection(true);
configuration.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);

ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(configuration, zkConnectString,managedLedgerFactoryConfig);

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setEnsembleSize(1)
.setWriteQuorumSize(1)
.setAckQuorumSize(1)
.setMetadataAckQuorumSize(1)
.setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("test-ledger", config);
ManagedCursor cursor = ledger.openCursor("test-c1");

for (int i = 0; i < 10; i++) {
String entry = "entry" + i;
ledger.addEntry(entry.getBytes("UTF8"));
}

List<Entry> entryList = cursor.readEntries(10);
Assert.assertEquals(10, entryList.size());

for (int i = 0; i < 10; i++) {
Entry entry = entryList.get(i);
Assert.assertEquals(("entry" + i).getBytes("UTF8"), entry.getData());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConf
pulsarConnectorConfig.getManagedLedgerNumWorkerThreads());
managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(
pulsarConnectorConfig.getManagedLedgerNumSchedulerThreads());
return new ManagedLedgerFactoryImpl(bkClientConfiguration, managedLedgerFactoryConfig);
return new ManagedLedgerFactoryImpl(bkClientConfiguration, pulsarConnectorConfig.getZookeeperUri(),managedLedgerFactoryConfig);
}

public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPolicies offloadPolicies,
Expand Down

0 comments on commit b1ba8e5

Please sign in to comment.