Skip to content

Commit

Permalink
[improve][broker] Tidy up the system topic. (apache#15252)
Browse files Browse the repository at this point in the history
### Motivation

For apache#13520, apache#14643, apache#14949, we fix some issues related to system topic but result in checking the system topic in different method. So it's better to tidy up the system topic.

So put these system topic names into a new class called SystemTopicNames.
  • Loading branch information
Technoboy- authored Apr 25, 2022
1 parent 188d4f4 commit aa4df1b
Show file tree
Hide file tree
Showing 44 changed files with 234 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -311,7 +312,7 @@ public static void main(String[] args) throws Exception {
createNamespaceIfAbsent(resources, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster);

// Create transaction coordinator assign partitioned topic
createPartitionedTopic(configStore, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
createPartitionedTopic(configStore, SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
arguments.numTransactionCoordinators);

localStore.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar;

import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
import com.beust.jcommander.Parameter;
import com.google.common.collect.Sets;
import java.io.File;
Expand All @@ -30,9 +30,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
Expand Down Expand Up @@ -311,10 +313,15 @@ public void start() throws Exception {
createNameSpace(cluster, TopicName.PUBLIC_TENANT, TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE);
//create pulsar system namespace
createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE.toString());
if (config.isTransactionCoordinatorEnabled() && !admin.namespaces()
.getTopics(SYSTEM_NAMESPACE.toString())
.contains(TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())) {
admin.topics().createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
if (config.isTransactionCoordinatorEnabled()) {
NamespaceResources.PartitionedTopicResources partitionedTopicResources =
broker.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
Optional<PartitionedTopicMetadata> getResult =
partitionedTopicResources.getPartitionedTopicMetadataAsync(TRANSACTION_COORDINATOR_ASSIGN).get();
if (!getResult.isPresent()) {
partitionedTopicResources.createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN,
new PartitionedTopicMetadata(1));
}
}

log.debug("--- setup completed ---");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.beust.jcommander.Parameter;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

Expand Down Expand Up @@ -103,7 +103,8 @@ public static void main(String[] args) throws Exception {
arguments.cluster);

// Create transaction coordinator assign partitioned topic
PulsarClusterMetadataSetup.createPartitionedTopic(configStore, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
PulsarClusterMetadataSetup.createPartitionedTopic(configStore,
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
arguments.numTransactionCoordinators);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -113,7 +113,6 @@
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.validator.MultipleListenerValidator;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet;
Expand Down Expand Up @@ -273,8 +272,6 @@ public enum State {
private volatile CompletableFuture<Void> closeFuture;
// key is listener name, value is pulsar address and pulsar ssl address
private Map<String, AdvertisedListener> advertisedListeners;
private NamespaceName heartbeatNamespaceV1;
private NamespaceName heartbeatNamespaceV2;

public PulsarService(ServiceConfiguration config) {
this(config, Optional.empty(), (exitCode) -> {
Expand Down Expand Up @@ -723,8 +720,6 @@ public void start() throws PulsarServerException {
createMetricsServlet();
this.addWebServerHandlers(webService, metricsServlet, this.config);
this.webService.start();
heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(this.advertisedAddress, this.config);
heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(this.advertisedAddress, this.config);

// Refresh addresses and update configuration, since the port might have been dynamically assigned
if (config.getBrokerServicePort().equals(Optional.of(0))) {
Expand Down Expand Up @@ -1129,7 +1124,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) {
.get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
try {
TopicName topicName = TopicName.get(topic);
if (bundle.includes(topicName) && !isTransactionSystemTopic(topicName)) {
if (bundle.includes(topicName) && !isTransactionInternalName(topicName)) {
CompletableFuture<Optional<Topic>> future = brokerService.getTopicIfExists(topic);
if (future != null) {
persistentTopics.add(future);
Expand Down Expand Up @@ -1709,20 +1704,6 @@ public void shutdownNow() {
processTerminator.accept(-1);
}


public static boolean isTransactionSystemTopic(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
|| topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

public static boolean isTransactionInternalName(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

@VisibleForTesting
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
Expand Down Expand Up @@ -124,7 +125,7 @@ public void onLoad(NamespaceBundle bundle) {
if (ex == null) {
for (String topic : topics) {
TopicName name = TopicName.get(topic);
if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
&& name.isPartitioned()) {
handleTcClientConnect(TransactionCoordinatorID.get(name.getPartitionIndex()));
Expand All @@ -144,7 +145,7 @@ public void unLoad(NamespaceBundle bundle) {
if (ex == null) {
for (String topic : topics) {
TopicName name = TopicName.get(topic);
if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
&& name.isPartitioned()) {
removeTransactionMetadataStore(
Expand All @@ -170,7 +171,7 @@ public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tc
if (stores.get(tcId) != null) {
completableFuture.complete(null);
} else {
pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
pulsarService.getBrokerService().checkTopicNsOwnership(SystemTopicNames
.TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString())
.thenRun(() -> internalPinnedExecutor.execute(() -> {
final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.pulsar.broker.admin.impl;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.zafarkhaja.semver.Version;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -723,7 +723,7 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit
future.thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
if (isTransactionCoordinatorAssign(topicName)) {
internalUnloadTransactionCoordinatorAsync(asyncResponse, authoritative);
} else {
internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
Expand All @@ -68,7 +69,7 @@ public abstract class TransactionsBase extends AdminResource {
protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
Integer coordinatorId) {
if (coordinatorId != null) {
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
authoritative);
TransactionMetadataStore transactionMetadataStore =
pulsar().getTransactionMetadataStoreService().getStores()
Expand All @@ -80,7 +81,7 @@ protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean
}
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
} else {
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
false, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
Expand Down Expand Up @@ -151,7 +152,7 @@ protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckSta
protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
boolean authoritative, int mostSigBits, long leastSigBits) {
try {
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
authoritative);
CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
Expand Down Expand Up @@ -260,7 +261,7 @@ protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
boolean authoritative, long timeout, Integer coordinatorId) {
try {
if (coordinatorId != null) {
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
authoritative);
TransactionMetadataStore transactionMetadataStore =
pulsar().getTransactionMetadataStoreService().getStores()
Expand Down Expand Up @@ -296,7 +297,7 @@ protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
asyncResponse.resume(transactionMetadata);
});
} else {
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
false, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
Expand Down Expand Up @@ -349,7 +350,7 @@ protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse, boolean authoritative,
boolean metadata, int coordinatorId) {
try {
TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
TopicName topicName = SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
validateTopicOwnership(topicName, authoritative);
TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import io.prometheus.client.Counter;
Expand Down Expand Up @@ -1372,9 +1373,16 @@ public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
}

public static boolean isSystemServiceNamespace(String namespace) {
return SYSTEM_NAMESPACE.toString().equals(namespace)
|| SLA_NAMESPACE_PATTERN.matcher(namespace).matches()
|| HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
|| HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
}

public static boolean isHeartbeatNamespace(ServiceUnitId ns) {
String namespace = ns.getNamespaceObject().toString();
return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
|| HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches()
|| SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
|| HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
}

public boolean registerSLANamespace() throws PulsarServerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
Expand All @@ -69,7 +70,7 @@ private Producer<ByteBuffer> createProducer() throws PulsarClientException {
final int sendTimeoutSecs = 10;

return pulsarClient.newProducer(Schema.BYTEBUFFER)
.topic(RESOURCE_USAGE_TOPIC_NAME)
.topic(SystemTopicNames.RESOURCE_USAGE_TOPIC.toString())
.batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
.sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
.blockIfQueueFull(false)
Expand Down Expand Up @@ -123,7 +124,7 @@ private class ResourceUsageReader implements ReaderListener<byte[]>, AutoCloseab

public ResourceUsageReader() throws PulsarClientException {
consumer = pulsarClient.newReader()
.topic(RESOURCE_USAGE_TOPIC_NAME)
.topic(SystemTopicNames.RESOURCE_USAGE_TOPIC.toString())
.startMessageId(MessageId.latest)
.readerListener(this)
.create();
Expand Down Expand Up @@ -165,7 +166,6 @@ public void received(Reader<byte[]> reader, Message<byte[]> msg) {
}

private static final Logger LOG = LoggerFactory.getLogger(ResourceUsageTopicTransportManager.class);
public static final String RESOURCE_USAGE_TOPIC_NAME = "non-persistent://pulsar/system/resource-usage";
private final PulsarService pulsarService;
private final PulsarClient pulsarClient;
private final ResourceUsageWriterTask pTask;
Expand All @@ -177,7 +177,7 @@ public void received(Reader<byte[]> reader, Message<byte[]> msg) {

private void createTenantAndNamespace() throws PulsarServerException, PulsarAdminException {
// Create a public tenant and default namespace
TopicName topicName = TopicName.get(RESOURCE_USAGE_TOPIC_NAME);
TopicName topicName = SystemTopicNames.RESOURCE_USAGE_TOPIC;

PulsarAdmin admin = pulsarService.getAdminClient();
ServiceConfiguration config = pulsarService.getConfig();
Expand Down
Loading

0 comments on commit aa4df1b

Please sign in to comment.