Skip to content

Commit

Permalink
[Transaction] Fix MLTransactionLog open manageLedger name problem. (a…
Browse files Browse the repository at this point in the history
…pache#10334)

## Motivation
now `MLTransactionLog` generate `managedLedger` name is illegal.
## implement
generate correct name and can generate metrics.
  • Loading branch information
congbobo184 authored Apr 23, 2021
1 parent 9279b5b commit b67a4b2
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.utils.StatsOutputStream;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -299,10 +300,11 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
}

checkReplicatedSubscriptionControllerState();

TopicName topicName = TopicName.get(topic);
if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
&& !checkTopicIsEventsNames(topic)
&& !topic.contains(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName())) {
&& !topicName.getEncodedLocalName().startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName())
&& !topicName.getEncodedLocalName().startsWith(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX)) {
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this, transactionCompletableFuture);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,20 @@
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -87,4 +94,18 @@ public void testManagedLedgerMetrics() throws Exception {

}

@Test
public void testTransactionTopic() throws Exception {
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
pulsar.getManagedLedgerFactory(), managedLedgerConfig);
ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
metrics.generate();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionLog;
import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
Expand All @@ -49,7 +51,7 @@ public class MLTransactionLogImpl implements TransactionLog {

private final ManagedLedger managedLedger;

public final static String TRANSACTION_LOG_PREFIX = NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-";
public final static String TRANSACTION_LOG_PREFIX = "__transaction_log_";

private final ManagedCursor cursor;

Expand All @@ -59,14 +61,15 @@ public class MLTransactionLogImpl implements TransactionLog {

private final long tcId;

private final String topicName;
private final TopicName topicName;

public MLTransactionLogImpl(TransactionCoordinatorID tcID,
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig) throws Exception {
this.topicName = TRANSACTION_LOG_PREFIX + tcID;
this.topicName = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId());
this.tcId = tcID.getId();
this.managedLedger = managedLedgerFactory.open(topicName, managedLedgerConfig);
this.managedLedger = managedLedgerFactory.open(topicName.getPersistenceNamingEncoding(), managedLedgerConfig);
this.cursor = managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME,
CommandSubscribe.InitialPosition.Earliest);
this.entryQueue = new SpscArrayQueue<>(2000);
Expand Down

0 comments on commit b67a4b2

Please sign in to comment.