Skip to content

Commit

Permalink
[FLINK-27400][Connector/pulsar] Filter system topics for Pulsar conne…
Browse files Browse the repository at this point in the history
…ctor.
  • Loading branch information
syhily authored and tisonkun committed Sep 9, 2022
1 parent b614e94 commit 5a35485
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithoutPartition;
import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;

/**
Expand Down Expand Up @@ -120,7 +120,7 @@ public List<String> availableTopics() {
int partitionNums = entry.getValue();
// Get all topics from partitioned and non-partitioned topic names
if (partitionNums == NON_PARTITIONED) {
results.add(topicNameWithoutPartition(entry.getKey()));
results.add(topicName(entry.getKey()));
} else {
for (int i = 0; i < partitionNums; i++) {
results.add(topicNameWithPartition(entry.getKey(), i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl;

import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
Expand All @@ -35,6 +36,7 @@
import java.util.regex.Pattern;

import static java.util.stream.Collectors.toSet;
import static org.apache.flink.shaded.guava30.com.google.common.base.Predicates.not;

/** Subscribe to matching topics based on topic pattern. */
public class TopicPatternSubscriber extends BasePulsarSubscriber {
Expand Down Expand Up @@ -64,6 +66,7 @@ public Set<TopicPartition> getSubscribedTopicPartitions(
.getTopics(namespace)
.parallelStream()
.filter(this::matchesSubscriptionMode)
.filter(not(TopicNameUtils::isInternal))
.filter(topic -> topicPattern.matcher(topic).find())
.map(topic -> queryTopicMetadata(pulsarAdmin, topic))
.filter(Objects::nonNull)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;

import org.apache.pulsar.common.naming.TopicName;

Expand All @@ -30,13 +31,32 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicDomain.persistent;

/** Utility methods for working with topic names. */
@Internal
public final class TopicNameUtils {

// The constants below feed isInternal(String). The three namespace patterns are applied with
// Matcher#matches(), so the whole namespace string has to fit the pattern.

// Namespace of the form "pulsar/<segment>/<name>:<digits>".
// NOTE(review): presumably the legacy (v1) heartbeat namespace layout - confirm against Pulsar.
private static final Pattern HEARTBEAT_NAMESPACE_PATTERN =
Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
// Namespace of the form "pulsar/<name>:<digits>", i.e. without the middle segment.
private static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 =
Pattern.compile("pulsar/([^:]+:\\d+)");
// Namespace of the form "sla-monitor/<segment>/<name>:<digits>".
private static final Pattern SLA_NAMESPACE_PATTERN =
Pattern.compile("sla-monitor" + "/[^/]+/([^:]+:\\d+)");
// Local topic names that are treated as internal no matter which namespace they live in.
private static final Set<String> EVENTS_TOPIC_NAMES =
ImmutableSet.of("__change_events", "__transaction_buffer_snapshot");
// Full topic name (persistent domain, SYSTEM_NAMESPACE) used as a prefix check on the full name.
private static final String TRANSACTION_COORDINATOR_ASSIGN_PREFIX =
TopicName.get(persistent.value(), SYSTEM_NAMESPACE, "transaction_coordinator_assign")
.toString();
// Same construction as above for local names starting with "__transaction_log_".
private static final String TRANSACTION_COORDINATOR_LOG_PREFIX =
TopicName.get(persistent.value(), SYSTEM_NAMESPACE, "__transaction_log_").toString();
// Suffixes checked against the local topic name only (not the namespace).
private static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
private static final String PENDING_ACK_STORE_CURSOR_SUFFIX = "__pending_ack_state";

/** Private constructor: this final class cannot be instantiated from outside. */
private TopicNameUtils() {
// No public constructor.
}
Expand All @@ -52,11 +72,6 @@ public static String topicNameWithPartition(String topic, int partitionId) {
return TopicName.get(topic).getPartition(partitionId).toString();
}

/**
 * Parses the given name with {@link TopicName#get(String)} and returns the string form of the
 * result. Meant for a non-partitioned topic that does not belong to any partitioned topic.
 */
public static String topicNameWithoutPartition(String topic) {
    TopicName parsed = TopicName.get(topic);
    return parsed.toString();
}

/** Tells whether {@link TopicName} reports the given topic name as partitioned. */
public static boolean isPartition(String topic) {
    TopicName parsed = TopicName.get(topic);
    return parsed.isPartitioned();
}
Expand Down Expand Up @@ -92,4 +107,31 @@ public static List<String> distinctTopics(List<String> topics) {

return builder.build();
}

/**
 * Checks whether the given topic is one of Pulsar's internal (system) topics.
 *
 * <p>The logic is ported from {@code BrokerService} in pulsar-broker, which is not available in
 * the Pulsar client, so it has to be kept and maintained here. The existing topic names are not
 * expected to change, for backward compatibility; new internal topic names only need to be added
 * after a Pulsar version bump.
 *
 * @param topic the topic name to inspect
 * @return {@code true} if the namespace, the local name or the full name matches one of the
 *     known internal topic rules
 * @see <a
 * href="https://github.com/apache/pulsar/blob/7075a5ce0d4a70f52625ac8c3d0c48894442b72a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L3024">BrokerService#isSystemTopic</a>
 */
public static boolean isInternal(String topic) {
    // Work on the topic name without partition information.
    String fullName = topicName(topic);
    TopicName parsed = TopicName.get(fullName);
    String localName = parsed.getLocalName();
    String namespace = parsed.getNamespace();

    // Rule 1: the topic lives in a reserved namespace.
    boolean reservedNamespace =
            namespace.equals(SYSTEM_NAMESPACE.toString())
                    || SLA_NAMESPACE_PATTERN.matcher(namespace).matches()
                    || HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
                    || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
    if (reservedNamespace) {
        return true;
    }

    // Rule 2: the local name is one of the well-known event topics.
    if (EVENTS_TOPIC_NAMES.contains(localName)) {
        return true;
    }

    // Rule 3: the full name starts with a transaction coordinator prefix.
    if (fullName.startsWith(TRANSACTION_COORDINATOR_ASSIGN_PREFIX)
            || fullName.startsWith(TRANSACTION_COORDINATOR_LOG_PREFIX)) {
        return true;
    }

    // Rule 4: the local name ends with a pending-ack store suffix.
    return localName.endsWith(PENDING_ACK_STORE_SUFFIX)
            || localName.endsWith(PENDING_ACK_STORE_CURSOR_SUFFIX);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithoutPartition;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -130,8 +129,7 @@ void fetchTopicPartitionUpdate() throws Exception {
void fetchNonPartitionTopic() {
String topic = randomAlphabetic(10);
operator().createTopic(topic, 0);
List<String> nonPartitionTopic =
Collections.singletonList(topicNameWithoutPartition(topic));
List<String> nonPartitionTopic = singletonList(topicName(topic));

TopicMetadataListener listener = new TopicMetadataListener(nonPartitionTopic);
long interval = Duration.ofMinutes(15).toMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Unit tests for {@link TopicNameUtils}. */
class TopicNameUtilsTest {
Expand Down Expand Up @@ -84,4 +85,12 @@ void mergeTheTopicNamesIntoOneSet() {
"persistent://public/default/short-topic",
"persistent://public/default/long-topic-partition-1");
}

@Test
void internalTopicAssertion() {
    // A local name ending with the pending-ack suffix must be reported as internal.
    String pendingAckTopic = "persistent://public/default/topic__transaction_pending_ack";
    assertTrue(TopicNameUtils.isInternal(pendingAckTopic));
}
}

0 comments on commit 5a35485

Please sign in to comment.