Skip to content

Commit

Permalink
Reuse ManagedLedgerFactory instances across SQL queries (apache#4813)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored and jerrypeng committed Jul 26, 2019
1 parent 88c8544 commit f88ea9d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ public final void shutdown() {
} catch (Exception e) {
log.error(e, "Failed to close pulsar connector");
}
try {
PulsarConnectorCache.shutdown();
} catch (Exception e) {
log.error("Failed to shutdown pulsar connector cache");
}
try {
lifeCycleManager.stop();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;

public class PulsarConnectorCache {

private static final Logger log = Logger.get(PulsarConnectorCache.class);

private static PulsarConnectorCache instance;
@VisibleForTesting
static PulsarConnectorCache instance;

private final ManagedLedgerFactory managedLedgerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,6 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand
return new FixedSplitSource(splits);
}

@VisibleForTesting
ManagedLedgerFactory getManagedLedgerFactory() throws Exception {
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
.setZkServers(this.pulsarConnectorConfig.getZookeeperUri())
.setClientTcpNoDelay(false)
.setStickyReadsEnabled(false)
.setUseV2WireProtocol(true);
return new ManagedLedgerFactoryImpl(bkClientConfiguration);
}

@VisibleForTesting
Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle
tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain) throws Exception {
Expand All @@ -165,59 +155,40 @@ Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName topic

int splitRemainder = actualNumSplits % numPartitions;

ManagedLedgerFactory managedLedgerFactory = getManagedLedgerFactory();

try {
List<PulsarSplit> splits = new LinkedList<>();
for (int i = 0; i < numPartitions; i++) {

int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition;
splits.addAll(
getSplitsForTopic(
topicName.getPartition(i).getPersistenceNamingEncoding(),
managedLedgerFactory,
splitsForThisPartition,
tableHandle,
schemaInfo,
topicName.getPartition(i).getLocalName(),
tupleDomain)
);
}
return splits;
} finally {
if (managedLedgerFactory != null) {
try {
managedLedgerFactory.shutdown();
} catch (Exception e) {
log.error(e);
}
}
ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
.getManagedLedgerFactory();

List<PulsarSplit> splits = new LinkedList<>();
for (int i = 0; i < numPartitions; i++) {

int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition;
splits.addAll(
getSplitsForTopic(
topicName.getPartition(i).getPersistenceNamingEncoding(),
managedLedgerFactory,
splitsForThisPartition,
tableHandle,
schemaInfo,
topicName.getPartition(i).getLocalName(),
tupleDomain));
}
return splits;
}

@VisibleForTesting
Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle
tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain) throws Exception {
ManagedLedgerFactory managedLedgerFactory = null;
try {
managedLedgerFactory = getManagedLedgerFactory();

return getSplitsForTopic(
topicName.getPersistenceNamingEncoding(),
managedLedgerFactory,
numSplits,
tableHandle,
schemaInfo,
tableHandle.getTableName(), tupleDomain);
} finally {
if (managedLedgerFactory != null) {
try {
managedLedgerFactory.shutdown();
} catch (Exception e) {
log.error(e);
}
}
}
Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, TopicName topicName,
PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain)
throws Exception {
ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
.getManagedLedgerFactory();

return getSplitsForTopic(
topicName.getPersistenceNamingEncoding(),
managedLedgerFactory,
numSplits,
tableHandle,
schemaInfo,
tableHandle.getTableName(), tupleDomain);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,8 @@ public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
}
});

doReturn(managedLedgerFactory).when(this.pulsarSplitManager).getManagedLedgerFactory();
PulsarConnectorCache.instance = mock(PulsarConnectorCache.class);
when(PulsarConnectorCache.instance.getManagedLedgerFactory()).thenReturn(managedLedgerFactory);

for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) {

Expand Down

0 comments on commit f88ea9d

Please sign in to comment.