Skip to content

Commit

Permalink
[pulsar-broker] PIP-100 Support pluggable topic factory (apache#12235)
Browse files Browse the repository at this point in the history
add Closeable

fix test

close factory and handle exception
  • Loading branch information
rdhabalia authored Jul 20, 2022
1 parent 74bafe2 commit 6f38c5a
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 11 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ brokerShutdownTimeoutMs=60000
# Flag to skip broker shutdown when broker handles Out of memory error
skipBrokerShutdownOnOOM=false

# Factory class-name to create topic with custom workflow
topicFactoryClassName=

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String metadataStoreConfigPath = null;

@FieldContext(
dynamic = true,
doc = "Factory class-name to create topic with custom workflow"
)
private String topicFactoryClassName;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Enable backlog quota check. Enforces actions on topic when the quota is reached"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public class BrokerService implements Closeable {
private final LongAdder pausedConnections = new LongAdder();
private BrokerInterceptor interceptor;
private ImmutableMap<String, EntryFilterWithClassLoader> entryFilters;
private TopicFactory topicFactory;

private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
private Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors;
Expand Down Expand Up @@ -346,6 +347,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
this.blockedDispatchers =
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
this.topicFactory = createPersistentTopicFactory();
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
Expand Down Expand Up @@ -1127,7 +1129,13 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
new NotAllowedException("Broker is not unable to load non-persistent topic"));
}
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
NonPersistentTopic nonPersistentTopic;
try {
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
} catch (Exception e) {
log.warn("Failed to create topic {}", topic, e);
return FutureUtil.failedFuture(e);
}
CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
isOwner.thenRun(() -> {
nonPersistentTopic.initialize()
Expand Down Expand Up @@ -1412,7 +1420,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
try {
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
: new PersistentTopic(topic, ledger, BrokerService.this);
: newTopic(topic, ledger, BrokerService.this, PersistentTopic.class);
persistentTopic
.initialize()
.thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
Expand Down Expand Up @@ -3061,6 +3069,37 @@ public long getPausedConnections() {
return pausedConnections.longValue();
}

@SuppressWarnings("unchecked")
private <T extends Topic> T newTopic(String topic, ManagedLedger ledger, BrokerService brokerService,
Class<T> topicClazz) throws PulsarServerException {
if (topicFactory != null) {
try {
Topic newTopic = topicFactory.create(topic, ledger, brokerService, topicClazz);
if (newTopic != null) {
return (T) newTopic;
}
} catch (Throwable e) {
log.warn("Failed to create persistent topic using factory {}", topic, e);
throw new PulsarServerException("Topic factory failed to create topic ", e);
}
}
return topicClazz == NonPersistentTopic.class ? (T) new NonPersistentTopic(topic, BrokerService.this)
: (T) new PersistentTopic(topic, ledger, brokerService);
}

private TopicFactory createPersistentTopicFactory() throws Exception {
String topicFactoryClassName = pulsar.getConfig().getTopicFactoryClassName();
if (StringUtils.isNotBlank(topicFactoryClassName)) {
try {
return (TopicFactory) Class.forName(topicFactoryClassName).newInstance();
} catch (Exception e) {
log.warn("Failed to initialize topic factory class {}", topicFactoryClassName, e);
throw e;
}
}
return null;
}

@VisibleForTesting
public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
this.pulsarChannelInitFactory = factory;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.io.Closeable;
import org.apache.bookkeeper.mledger.ManagedLedger;

/**
* Pluggable TopicFactory to create topic with specific behavior in broker.
* Note: This API and feature is in experimental phase.
*/
public interface TopicFactory extends Closeable {

<T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class<T> topicClazz);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;

import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -43,16 +42,15 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand Down Expand Up @@ -96,6 +94,15 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@Test(groups = "flaky")
public class PersistentTopicE2ETest extends BrokerTestBase {
private final List<AutoCloseable> closeables = new ArrayList<>();
Expand All @@ -104,6 +111,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase {

private AtomicInteger inactiveCount;

@DataProvider(name = "topic")
public Object[][] isPersistent() {
return new Object[][] { { true }, { false } };
}

@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
Expand Down Expand Up @@ -1974,4 +1986,47 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
PersistentTopicE2ETest.this.inactiveCount.incrementAndGet();
}
}
}

@Test(dataProvider = "topic")
public void testPersistentTopicFactory(boolean isPersistent) throws Exception {
conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
restartBroker();

final String topicName = (isPersistent ? "persistent" : "non-persistent") + "://prop/ns-abc/factoryTopic"
+ isPersistent;
MyTopicFactory.count.set(0);

// 1. producer connect
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();

assertTrue(MyTopicFactory.count.get() > 0);
producer.close();
consumer.close();
}

public static class MyTopicFactory implements TopicFactory {
private static AtomicInteger count = new AtomicInteger(0);

@Override
public <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService,
Class<T> topicClazz) {
try {
count.incrementAndGet();
if(topicClazz == NonPersistentTopic.class) {
return (T) new NonPersistentTopic(topic, brokerService);
}else {
return (T) new PersistentTopic(topic, ledger, brokerService);
}
} catch (Exception e) {
throw new IllegalStateException(e);
}
}

@Override
public void close() throws IOException {
// No-op
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ public void testConfigFileDefaults() throws Exception {
+ key + "' conf/broker.conf default value doesn't match java default value\nConf: "+ fileValue + "\nJava: " + javaValue);
}
}

}

@Test
Expand Down
2 changes: 2 additions & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|metadataStoreSessionTimeoutMillis| Metadata store session timeout in milliseconds |30000|
|brokerShutdownTimeoutMs| Time to wait for broker graceful shutdown. After this time elapses, the process will be killed |60000|
|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
|topicFactoryClassName| Factory class-name to create topic with custom workflow. ||
|backlogQuotaCheckEnabled| Enable backlog quota check. Enforces action on topic when the quota is reached |true|
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the quota |60|
|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. | -1 |
Expand Down Expand Up @@ -515,6 +516,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
|metadataStoreOperationTimeoutSeconds|Metadata store operation timeout in seconds.|30|
|brokerShutdownTimeoutMs| The time to wait for graceful broker shutdown. After this time elapses, the process will be killed. |60000|
|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
|topicFactoryClassName| Factory class-name to create topic with custom workflow. ||
|backlogQuotaCheckEnabled| Enable the backlog quota check, which enforces a specified action when the quota is reached. |true|
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the backlog quota. |60|
|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. |-1|
Expand Down

0 comments on commit 6f38c5a

Please sign in to comment.