Skip to content

Commit

Permalink
Protocol Handlers and Proxy Extensions: Fix bootstrap and add tests (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Nov 29, 2021
1 parent 86fe7d2 commit d3d580f
Show file tree
Hide file tree
Showing 10 changed files with 512 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
Expand All @@ -38,6 +41,9 @@
@Slf4j
public class ProtocolHandlers implements AutoCloseable {

@Getter
private final Map<SocketAddress, String> endpoints = new ConcurrentHashMap<>();

/**
* Load the protocol handlers for the given <tt>protocol</tt> list.
*
Expand Down Expand Up @@ -132,6 +138,7 @@ public Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> ne
+ " already occupied by other messaging protocols");
}
channelInitializers.put(handler.getKey(), initializers);
endpoints.put(address, handler.getKey());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,16 +385,25 @@ public void startProtocolHandlers(
private void startProtocolHandler(String protocol,
SocketAddress address,
ChannelInitializer<SocketChannel> initializer) throws IOException {
ServerBootstrap bootstrap = defaultServerBootstrap.clone();

ServiceConfiguration configuration = pulsar.getConfiguration();
boolean useSeparateThreadPool = configuration.isUseSeparateThreadPoolForProtocolHandlers();
ServerBootstrap bootstrap;
if (useSeparateThreadPool) {
bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
EventLoopUtil.enableTriggeredMode(bootstrap);
DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ph-" + protocol);
EventLoopGroup dedicatedWorkerGroup =
EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false, defaultThreadFactory);
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
protocolHandlersWorkerGroups.add(dedicatedWorkerGroup);
bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);
} else {
bootstrap = defaultServerBootstrap.clone();
}
bootstrap.childHandler(initializer);
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.protocol;

public class SimpleProtocolHandlerSeparateThreadPoolTest extends SimpleProtocolHandlerTestsBase {
public SimpleProtocolHandlerSeparateThreadPoolTest() {
super(true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.protocol;

public class SimpleProtocolHandlerSharedThreadPoolTest extends SimpleProtocolHandlerTestsBase {
public SimpleProtocolHandlerSharedThreadPoolTest() {
super(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.NonStickyEventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.PortManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.testng.collections.Sets;

import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import static org.testng.Assert.assertEquals;

@Slf4j
@Test(groups = "broker")
public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase {

public static final class MyProtocolHandler implements ProtocolHandler {

private ServiceConfiguration conf;

@Override
public String protocolName() {
return "test";
}

@Override
public boolean accept(String protocol) {
return "test".equals(protocol);
}

@Override
public void initialize(ServiceConfiguration conf) throws Exception {
this.conf = conf;
}

@Override
public String getProtocolDataToAdvertise() {
return "test";
}

@Override
public void start(BrokerService service) {

}

@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()),
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(final ChannelHandlerContext ctx) {
final ByteBuf resp = ctx.alloc().buffer();
resp.writeBytes("ok".getBytes(StandardCharsets.UTF_8));

final ChannelFuture f = ctx.writeAndFlush(resp);
f.addListener((ChannelFutureListener) future -> ctx.close());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("error", cause);
ctx.close();
}
});
}
});
}

@Override
public void close() {

}
}

private File tempDirectory;
private boolean useSeparateThreadPool;

public SimpleProtocolHandlerTestsBase(boolean useSeparateThreadPool) {
this.useSeparateThreadPool = useSeparateThreadPool;
}

@BeforeClass
@Override
protected void setup() throws Exception {
tempDirectory = Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
conf.setUseSeparateThreadPoolForProtocolHandlers(useSeparateThreadPool);
conf.setProtocolHandlerDirectory(tempDirectory.getAbsolutePath());
conf.setMessagingProtocols(Collections.singleton("test"));
buildMockNarFile(tempDirectory);
super.baseSetup();
}

@Test
public void testBootstrapProtocolHandler() throws Exception {
SocketAddress address =
pulsar.getProtocolHandlers()
.getEndpoints()
.entrySet()
.stream()
.filter(e -> e.getValue().equals("test"))
.map(Map.Entry::getKey)
.findAny()
.get();
try (Socket socket = new Socket();) {
socket.connect(address);
String res = IOUtils.toString(socket.getInputStream(), StandardCharsets.UTF_8);
assertEquals(res, "ok");
}
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();

if (tempDirectory != null) {
FileUtils.deleteDirectory(tempDirectory);
}
}

private static void buildMockNarFile(File tempDirectory) throws Exception {
File file = new File(tempDirectory, "temp.nar");
try (ZipOutputStream zipfile = new ZipOutputStream(new FileOutputStream(file))) {

zipfile.putNextEntry(new ZipEntry("META-INF/"));
zipfile.putNextEntry(new ZipEntry("META-INF/services/"));
zipfile.putNextEntry(new ZipEntry("META-INF/bundled-dependencies/"));

ZipEntry manifest = new ZipEntry("META-INF/services/"
+ ProtocolHandlerUtils.PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE);
zipfile.putNextEntry(manifest);
String yaml = "name: test\n" +
"description: this is a test\n" +
"handlerClass: " + MyProtocolHandler.class.getName() + "\n";
zipfile.write(yaml.getBytes(StandardCharsets.UTF_8));
zipfile.closeEntry();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
Expand All @@ -31,13 +35,16 @@
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* A collection of loaded extensions.
*/
@Slf4j
public class ProxyExtensions implements AutoCloseable {

@Getter
private final Map<SocketAddress, String> endpoints = new ConcurrentHashMap<>();
/**
* Load the extensions for the given <tt>extensions</tt> list.
*
Expand Down Expand Up @@ -123,6 +130,7 @@ public Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> ne
+ "` attempts to use " + address + " for its listening port. But it is"
+ " already occupied by other messaging extensions");
}
endpoints.put(address, extension.getKey());
channelInitializers.put(extension.getKey(), initializers);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class ProxyService implements Closeable {
private MetadataStoreExtended localMetadataStore;
private MetadataStoreExtended configMetadataStore;
private PulsarResources pulsarResources;
@Getter
private ProxyExtensions proxyExtensions = null;

private final EventLoopGroup acceptorGroup;
Expand Down Expand Up @@ -265,15 +266,24 @@ private void startProxyExtension(String extensionName,
SocketAddress address,
ChannelInitializer<SocketChannel> initializer,
ServerBootstrap serverBootstrap) throws IOException {
ServerBootstrap bootstrap = serverBootstrap.clone();
ServerBootstrap bootstrap;
boolean useSeparateThreadPool = proxyConfig.isUseSeparateThreadPoolForProxyExtensions();
if (useSeparateThreadPool) {
bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));

EventLoopUtil.enableTriggeredMode(bootstrap);
DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ext-" + extensionName);
EventLoopGroup dedicatedWorkerGroup =
EventLoopUtil.newEventLoopGroup(numThreads, false, defaultThreadFactory);
extensionsWorkerGroups.add(dedicatedWorkerGroup);
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);
} else {
bootstrap = serverBootstrap.clone();
}
bootstrap.childHandler(initializer);
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.extensions;

public class SimpleProxyExtensionSeparateThreadPoolTest extends SimpleProxyExtensionTestBase {
public SimpleProxyExtensionSeparateThreadPoolTest() {
super(true);
}
}
Loading

0 comments on commit d3d580f

Please sign in to comment.