diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java new file mode 100644 index 0000000000000..215bd344a6186 --- /dev/null +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.netty; + +import org.apache.pulsar.io.core.PushSource; +import org.apache.pulsar.io.core.SourceContext; +import org.apache.pulsar.io.core.annotations.Connector; +import org.apache.pulsar.io.core.annotations.IOType; +import org.apache.pulsar.io.netty.server.NettyServer; +import java.util.Map; + +/** + * A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and write to user-defined Pulsar topic + */ +@Connector( + name = "netty", + type = IOType.SOURCE, + help = "A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and write to user-defined Pulsar topic", + configClass = NettySourceConfig.class) +public class NettySource extends PushSource { + + private NettyServer nettyServer; + private Thread thread; + + @Override + public void open(Map config, SourceContext sourceContext) throws Exception { + NettySourceConfig nettySourceConfig = NettySourceConfig.load(config); + if (nettySourceConfig.getType() == null + || nettySourceConfig.getHost() == null + || nettySourceConfig.getPort() <= 0) { + throw new IllegalArgumentException("Required property not set."); + } + + thread = new Thread(new PulsarServerRunnable(nettySourceConfig, this)); + thread.start(); + } + + @Override + public void close() throws Exception { + nettyServer.shutdownGracefully(); + } + + private class PulsarServerRunnable implements Runnable { + + private NettySourceConfig nettySourceConfig; + private NettySource nettySource; + + public PulsarServerRunnable(NettySourceConfig nettySourceConfig, NettySource nettySource) { + this.nettySourceConfig = nettySourceConfig; + this.nettySource = nettySource; + } + + @Override + public void run() { + nettyServer = new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(nettySourceConfig.getType().toUpperCase())) + .setHost(nettySourceConfig.getHost()) + .setPort(nettySourceConfig.getPort()) + .setNumberOfThreads(nettySourceConfig.getNumberOfThreads()) + .setNettySource(nettySource) + .build(); + + nettyServer.run(); + } + } + +} diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java similarity index 79% rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java index 07b3cf8b8ff19..f5d40e930ad59 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java @@ -30,7 +30,7 @@ import java.util.Map; /** - * Netty Tcp Source Connector Config. + * Netty Tcp or Udp Source Connector Config. */ @Data @Setter @@ -38,10 +38,16 @@ @EqualsAndHashCode @ToString @Accessors(chain = true) -public class NettyTcpSourceConfig implements Serializable { +public class NettySourceConfig implements Serializable { private static final long serialVersionUID = -7116130435021510496L; + @FieldDoc( + required = true, + defaultValue = "tcp", + help = "The tcp or udp network protocols") + private String type = "tcp"; + @FieldDoc( required = true, defaultValue = "127.0.0.1", @@ -61,14 +67,14 @@ public class NettyTcpSourceConfig implements Serializable { "handle the traffic of the accepted connections") private int numberOfThreads = 1; - public static NettyTcpSourceConfig load(Map map) throws IOException { + public static NettySourceConfig load(Map map) throws IOException { ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(new ObjectMapper().writeValueAsString(map), NettyTcpSourceConfig.class); + return mapper.readValue(new ObjectMapper().writeValueAsString(map), NettySourceConfig.class); } - public static NettyTcpSourceConfig load(String yamlFile) throws IOException { + public static NettySourceConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - return mapper.readValue(new File(yamlFile), NettyTcpSourceConfig.class); + return mapper.readValue(new File(yamlFile), NettySourceConfig.class); } } diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java deleted file mode 100644 index 3cef730dc37ad..0000000000000 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.netty; - -import org.apache.pulsar.io.core.PushSource; -import org.apache.pulsar.io.core.SourceContext; -import org.apache.pulsar.io.core.annotations.Connector; -import org.apache.pulsar.io.core.annotations.IOType; -import org.apache.pulsar.io.netty.tcp.server.NettyTcpServer; -import java.util.Map; - -/** - * A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic - */ -@Connector( - name = "tcp", - type = IOType.SOURCE, - help = "A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic", - configClass = NettyTcpSourceConfig.class) -public class NettyTcpSource extends PushSource { - - private NettyTcpServer nettyTcpServer; - private Thread thread; - - @Override - public void open(Map config, SourceContext sourceContext) throws Exception { - NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(config); - - thread = new Thread(new PulsarTcpServerRunnable(nettyTcpSourceConfig, this)); - thread.start(); - } - - @Override - public void close() throws Exception { - nettyTcpServer.shutdownGracefully(); - } - - private class PulsarTcpServerRunnable implements Runnable { - - private NettyTcpSourceConfig nettyTcpSourceConfig; - private NettyTcpSource nettyTcpSource; - - public PulsarTcpServerRunnable(NettyTcpSourceConfig nettyTcpSourceConfig, NettyTcpSource nettyTcpSource) { - this.nettyTcpSourceConfig = nettyTcpSourceConfig; - this.nettyTcpSource = nettyTcpSource; - } - - @Override - public void run() { - nettyTcpServer = new NettyTcpServer.Builder() - .setHost(nettyTcpSourceConfig.getHost()) - .setPort(nettyTcpSourceConfig.getPort()) - .setNumberOfThreads(nettyTcpSourceConfig.getNumberOfThreads()) - .setNettyTcpSource(nettyTcpSource) - .build(); - - nettyTcpServer.run(); - } - } - -} diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java similarity index 97% rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java index f88f514a4ae0d..b5fa8209ff381 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.tcp.server; +package org.apache.pulsar.io.netty.server; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java new file mode 100644 index 0000000000000..2cb803153ac06 --- /dev/null +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.netty.server; + +import com.google.common.base.Preconditions; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.io.netty.NettySource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Netty Tcp or Udp Server to accept any incoming data through Tcp. + */ +public class NettyServer { + + private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); + + private Type type; + private String host; + private int port; + private NettySource nettySource; + private int numberOfThreads; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + private NettyServer(Builder builder) { + this.type = builder.type; + this.host = builder.host; + this.port = builder.port; + this.nettySource = builder.nettySource; + this.numberOfThreads = builder.numberOfThreads; + } + + public void run() { + try { + switch (type) { + case TCP: + runTcp(); + break; + case UDP: + runUdp(); + break; + default: + runTcp(); + break; + } + } catch(Exception ex) { + logger.error("Error occurred when Netty Tcp or Udp Server is running", ex); + } finally { + shutdownGracefully(); + } + } + + public void shutdownGracefully() { + if (workerGroup != null) + workerGroup.shutdownGracefully(); + if (bossGroup != null) + bossGroup.shutdownGracefully(); + } + + private void runUdp() throws InterruptedException { + workerGroup = new NioEventLoopGroup(this.numberOfThreads); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(workerGroup); + bootstrap.channel(NioDatagramChannel.class); + bootstrap.handler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource))) + .option(ChannelOption.SO_BACKLOG, 1024); + + ChannelFuture channelFuture = bootstrap.bind(this.host, this.port).sync(); + channelFuture.channel().closeFuture().sync(); + } + + private void runTcp() throws InterruptedException { + bossGroup = new NioEventLoopGroup(this.numberOfThreads); + workerGroup = new NioEventLoopGroup(this.numberOfThreads); + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(bossGroup, workerGroup); + serverBootstrap.channel(NioServerSocketChannel.class); + serverBootstrap.childHandler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource))) + .option(ChannelOption.SO_BACKLOG, 1024) + .childOption(ChannelOption.SO_KEEPALIVE, true); + + ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync(); + channelFuture.channel().closeFuture().sync(); + } + + /** + * Pulsar Netty Server Builder. + */ + public static class Builder { + + private Type type; + private String host; + private int port; + private NettySource nettySource; + private int numberOfThreads; + + public Builder setType(Type type) { + this.type = type; + return this; + } + + public Builder setHost(String host) { + this.host = host; + return this; + } + + public Builder setPort(int port) { + this.port = port; + return this; + } + + public Builder setNettySource(NettySource nettySource) { + this.nettySource = nettySource; + return this; + } + + public Builder setNumberOfThreads(int numberOfThreads) { + this.numberOfThreads = numberOfThreads; + return this; + } + + public NettyServer build() { + Preconditions.checkNotNull(this.type, "type cannot be blank/null"); + Preconditions.checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null"); + Preconditions.checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024"); + Preconditions.checkNotNull(this.nettySource, "nettySource must be set"); + Preconditions.checkArgument(this.numberOfThreads > 0, + "numberOfThreads must be set as positive"); + + return new NettyServer(this); + } + } + + /** + * tcp or udp network protocol + */ + public enum Type { + + TCP, + + UDP + } + +} diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java similarity index 74% rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java index 2fd2cf3c690d5..81fd2037ac484 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java @@ -16,12 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.tcp.server; +package org.apache.pulsar.io.netty.server; import io.netty.channel.*; import lombok.Data; import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.netty.NettyTcpSource; +import org.apache.pulsar.io.netty.NettySource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,19 +32,19 @@ * Handles a server-side channel */ @ChannelHandler.Sharable -public class NettyTcpServerHandler extends SimpleChannelInboundHandler { +public class NettyServerHandler extends SimpleChannelInboundHandler { - private static final Logger logger = LoggerFactory.getLogger(NettyTcpServerHandler.class); + private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); - private NettyTcpSource nettyTcpSource; + private NettySource nettySource; - public NettyTcpServerHandler(NettyTcpSource nettyTcpSource) { - this.nettyTcpSource = nettyTcpSource; + public NettyServerHandler(NettySource nettySource) { + this.nettySource = nettySource; } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception { - nettyTcpSource.consume(new NettyTcpRecord(Optional.ofNullable(""), bytes)); + nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes)); } @Override @@ -54,7 +54,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } @Data - static private class NettyTcpRecord implements Record, Serializable { + static private class NettyRecord implements Record, Serializable { private final Optional key; private final byte[] value; } diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java deleted file mode 100644 index b471b0369705c..0000000000000 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.netty.tcp.server; - -import com.google.common.base.Preconditions; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.io.netty.NettyTcpSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Netty Tcp Server to accept any incoming data through Tcp. - */ -public class NettyTcpServer { - - private static final Logger logger = LoggerFactory.getLogger(NettyTcpServer.class); - - private String host; - private int port; - private NettyTcpSource nettyTcpSource; - private int numberOfThreads; - private EventLoopGroup bossGroup; - private EventLoopGroup workerGroup; - - private NettyTcpServer(Builder builder) { - this.host = builder.host; - this.port = builder.port; - this.nettyTcpSource = builder.nettyTcpSource; - this.numberOfThreads = builder.numberOfThreads; - } - - public void run() { - try { - bossGroup = new NioEventLoopGroup(this.numberOfThreads); - workerGroup = new NioEventLoopGroup(this.numberOfThreads); - - ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new NettyChannelInitializer(new NettyTcpServerHandler(this.nettyTcpSource))) - .option(ChannelOption.SO_BACKLOG, 1024) - .childOption(ChannelOption.SO_KEEPALIVE, true); - - ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync(); - channelFuture.channel().closeFuture().sync(); - } catch(Exception ex) { - logger.error("Error occurred when Netty Tcp Server is running", ex); - } finally { - shutdownGracefully(); - } - } - - public void shutdownGracefully() { - if (workerGroup != null) - workerGroup.shutdownGracefully(); - if (bossGroup != null) - bossGroup.shutdownGracefully(); - } - - /** - * Pulsar Tcp Server Builder. - */ - public static class Builder { - - private String host; - private int port; - private NettyTcpSource nettyTcpSource; - private int numberOfThreads; - - public Builder setHost(String host) { - this.host = host; - return this; - } - - public Builder setPort(int port) { - this.port = port; - return this; - } - - public Builder setNettyTcpSource(NettyTcpSource nettyTcpSource) { - this.nettyTcpSource = nettyTcpSource; - return this; - } - - public Builder setNumberOfThreads(int numberOfThreads) { - this.numberOfThreads = numberOfThreads; - return this; - } - - public NettyTcpServer build() { - Preconditions.checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null"); - Preconditions.checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024"); - Preconditions.checkNotNull(this.nettyTcpSource, "nettyTcpSource must be set"); - Preconditions.checkArgument(this.numberOfThreads > 0, - "numberOfThreads must be set as positive"); - - return new NettyTcpServer(this); - } - } - -} \ No newline at end of file diff --git a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml index 58744ce79462c..22c4af459910a 100644 --- a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml @@ -17,6 +17,6 @@ # under the License. # -name: tcp -description: Netty Tcp Source Connector -sourceClass: org.apache.pulsar.io.netty.NettyTcpSource +name: netty +description: Netty Tcp or Udp Source Connector +sourceClass: org.apache.pulsar.io.netty.NettySource diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java similarity index 65% rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java index efc763a90741d..034fa7979b36c 100644 --- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java +++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java @@ -30,24 +30,27 @@ import static org.junit.Assert.assertNotNull; /** - * Tests for Netty Tcp Source Config + * Tests for Netty Tcp or Udp Source Config */ -public class NettyTcpSourceConfigTest { +public class NettySourceConfigTest { private static final String LOCALHOST = "127.0.0.1"; + private static final String TCP = "tcp"; @Test public void testNettyTcpConfigLoadWithMap() throws IOException { Map map = new HashMap<>(); + map.put("type", TCP); map.put("host", LOCALHOST); map.put("port", 10999); map.put("numberOfThreads", 1); - NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(map); - assertNotNull(nettyTcpSourceConfig); - assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost()); - assertEquals(10999, nettyTcpSourceConfig.getPort()); - assertEquals(1, nettyTcpSourceConfig.getNumberOfThreads()); + NettySourceConfig nettySourceConfig = NettySourceConfig.load(map); + assertNotNull(nettySourceConfig); + assertEquals(TCP, nettySourceConfig.getType()); + assertEquals(LOCALHOST, nettySourceConfig.getHost()); + assertEquals(10999, nettySourceConfig.getPort()); + assertEquals(1, nettySourceConfig.getNumberOfThreads()); } @Test(expected = UnrecognizedPropertyException.class) @@ -55,23 +58,24 @@ public void testNettyTcpConfigLoadWithMapWhenInvalidPropertyIsSet() throws IOExc Map map = new HashMap<>(); map.put("invalidProperty", 1); - NettyTcpSourceConfig.load(map); + NettySourceConfig.load(map); } @Test public void testNettyTcpConfigLoadWithYamlFile() throws IOException { - File yamlFile = getFile("nettyTcpSourceConfig.yaml"); - NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(yamlFile.getAbsolutePath()); - assertNotNull(nettyTcpSourceConfig); - assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost()); - assertEquals(10911, nettyTcpSourceConfig.getPort()); - assertEquals(5, nettyTcpSourceConfig.getNumberOfThreads()); + File yamlFile = getFile("nettySourceConfig.yaml"); + NettySourceConfig nettySourceConfig = NettySourceConfig.load(yamlFile.getAbsolutePath()); + assertNotNull(nettySourceConfig); + assertEquals(TCP, nettySourceConfig.getType()); + assertEquals(LOCALHOST, nettySourceConfig.getHost()); + assertEquals(10911, nettySourceConfig.getPort()); + assertEquals(5, nettySourceConfig.getNumberOfThreads()); } @Test(expected = UnrecognizedPropertyException.class) public void testNettyTcpConfigLoadWithYamlFileWhenInvalidPropertyIsSet() throws IOException { - File yamlFile = getFile("nettyTcpSourceConfigWithInvalidProperty.yaml"); - NettyTcpSourceConfig.load(yamlFile.getAbsolutePath()); + File yamlFile = getFile("nettySourceConfigWithInvalidProperty.yaml"); + NettySourceConfig.load(yamlFile.getAbsolutePath()); } private File getFile(String name) { diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java similarity index 89% rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java index a5243e6640b92..b8d6dd45ea9f3 100644 --- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java +++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java @@ -16,10 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.tcp.server; +package org.apache.pulsar.io.netty.server; import io.netty.channel.socket.nio.NioSocketChannel; -import org.apache.pulsar.io.netty.NettyTcpSource; +import org.apache.pulsar.io.netty.NettySource; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -35,7 +35,7 @@ public void testChannelInitializer() throws Exception { NioSocketChannel channel = new NioSocketChannel(); NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer( - new NettyTcpServerHandler(new NettyTcpSource())); + new NettyServerHandler(new NettySource())); nettyChannelInitializer.initChannel(channel); assertNotNull(channel.pipeline().toMap()); diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java similarity index 51% rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java index 0c2f56b083f3f..80415707bcac7 100644 --- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java +++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java @@ -16,27 +16,71 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.tcp.server; +package org.apache.pulsar.io.netty.server; -import org.apache.pulsar.io.netty.NettyTcpSource; +import org.apache.pulsar.io.netty.NettySource; +import org.apache.pulsar.io.netty.NettySourceConfig; import org.junit.Test; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; /** - * Tests for Netty Tcp Server + * Tests for Netty Tcp or Udp Server */ -public class NettyTcpServerTest { +public class NettyServerTest { private static final String LOCALHOST = "127.0.0.1"; + private static final String TCP = "TCP"; + private static final String UDP = "UDP"; @Test public void testNettyTcpServerConstructor() { - NettyTcpServer nettyTcpServer = new NettyTcpServer.Builder() + NettyServer nettyTcpServer = new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(TCP)) .setHost(LOCALHOST) .setPort(10999) .setNumberOfThreads(2) - .setNettyTcpSource(new NettyTcpSource()) + .setNettySource(new NettySource()) + .build(); + + assertNotNull(nettyTcpServer); + } + + @Test + public void testNettyUdpServerConstructor() { + NettyServer nettyUdpServer = new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(UDP)) + .setHost(LOCALHOST) + .setPort(10999) + .setNumberOfThreads(2) + .setNettySource(new NettySource()) + .build(); + + assertNotNull(nettyUdpServer); + } + + @Test + public void testNettyTcpServerByNettySourceConfig() throws IOException { + Map map = new HashMap<>(); + map.put("type", "tcp"); + map.put("host", LOCALHOST); + map.put("port", 10999); + map.put("numberOfThreads", 1); + + NettySourceConfig nettySourceConfig = NettySourceConfig.load(map); + + // test NettySource run function NettyServer Builder + NettyServer nettyTcpServer = new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(nettySourceConfig.getType().toUpperCase())) + .setHost(nettySourceConfig.getHost()) + .setPort(nettySourceConfig.getPort()) + .setNumberOfThreads(nettySourceConfig.getNumberOfThreads()) + .setNettySource(new NettySource()) .build(); assertNotNull(nettyTcpServer); @@ -44,36 +88,38 @@ public void testNettyTcpServerConstructor() { @Test(expected = IllegalArgumentException.class) public void testNettyTcpServerConstructorWhenHostIsNotSet() { - new NettyTcpServer.Builder() + new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(TCP)) .setPort(10999) .setNumberOfThreads(2) - .setNettyTcpSource(new NettyTcpSource()) + .setNettySource(new NettySource()) .build(); } @Test(expected = IllegalArgumentException.class) public void testNettyTcpServerConstructorWhenPortIsNotSet() { - new NettyTcpServer.Builder() + new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(TCP)) .setHost(LOCALHOST) .setNumberOfThreads(2) - .setNettyTcpSource(new NettyTcpSource()) + .setNettySource(new NettySource()) .build(); } - @Test(expected = IllegalArgumentException.class) public void testNettyTcpServerConstructorWhenNumberOfThreadsIsNotSet() { - new NettyTcpServer.Builder() + new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(TCP)) .setHost(LOCALHOST) .setPort(10999) - .setNettyTcpSource(new NettyTcpSource()) + .setNettySource(new NettySource()) .build(); } - @Test(expected = NullPointerException.class) public void testNettyTcpServerConstructorWhenNettyTcpSourceIsNotSet() { - new NettyTcpServer.Builder() + new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(TCP)) .setHost(LOCALHOST) .setPort(10999) .setNumberOfThreads(2) @@ -82,41 +128,45 @@ public void testNettyTcpServerConstructorWhenNettyTcpSourceIsNotSet() { @Test(expected = IllegalArgumentException.class) public void testNettyTcpServerWhenHostIsSetAsBlank() { - new NettyTcpServer.Builder() + new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(TCP)) .setHost(" ") .setPort(10999) .setNumberOfThreads(2) - .setNettyTcpSource(new NettyTcpSource()) + .setNettySource(new NettySource()) .build(); } @Test(expected = IllegalArgumentException.class) public void testNettyTcpServerWhenPortIsSetAsZero() { - new NettyTcpServer.Builder() + new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(TCP)) .setHost(LOCALHOST) .setPort(0) .setNumberOfThreads(2) - .setNettyTcpSource(new NettyTcpSource()) + .setNettySource(new NettySource()) .build(); } @Test(expected = IllegalArgumentException.class) public void testNettyTcpServerWhenPortIsSetLowerThan1024() { - new NettyTcpServer.Builder() + new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(TCP)) .setHost(LOCALHOST) .setPort(1022) .setNumberOfThreads(2) - .setNettyTcpSource(new NettyTcpSource()) + .setNettySource(new NettySource()) .build(); } @Test(expected = IllegalArgumentException.class) public void testNettyTcpServerWhenNumberOfThreadsIsSetAsZero() { - new NettyTcpServer.Builder() + new NettyServer.Builder() + .setType(NettyServer.Type.valueOf(TCP)) .setHost(LOCALHOST) .setPort(10999) .setNumberOfThreads(0) - .setNettyTcpSource(new NettyTcpSource()) + .setNettySource(new NettySource()) .build(); } diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml b/pulsar-io/netty/src/test/resources/nettySourceConfig.yaml similarity index 98% rename from pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml rename to pulsar-io/netty/src/test/resources/nettySourceConfig.yaml index ca748cc4ad67f..eff1a9ce1f104 100644 --- a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml +++ b/pulsar-io/netty/src/test/resources/nettySourceConfig.yaml @@ -18,7 +18,8 @@ # { +"type": "tcp", "host": "127.0.0.1", "port": "10911", "numberOfThreads": "5" -} \ No newline at end of file +} diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml b/pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml similarity index 98% rename from pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml rename to pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml index 8a2bb92d190a3..b1f90c2d1e8aa 100644 --- a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml +++ b/pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml @@ -18,7 +18,8 @@ # { +"type": "tcp", "host": "127.0.0.1", "port": "10911", "invalidProperty": "5" -} \ No newline at end of file +} diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md index 0250ebc5441cf..05a43dc40bf83 100644 --- a/site2/docs/io-connectors.md +++ b/site2/docs/io-connectors.md @@ -17,4 +17,4 @@ Pulsar Functions cluster. - [RabbitMQ Source Connector](io-rabbitmq.md#source) - [Twitter Firehose Source Connector](io-twitter.md) - [CDC Source Connector based on Debezium](io-cdc.md) -- [Netty Tcp Source Connector](io-tcp.md#source) +- [Netty Tcp or Udp Source Connector](io-netty.md#source) diff --git a/site2/docs/io-tcp.md b/site2/docs/io-netty.md similarity index 69% rename from site2/docs/io-tcp.md rename to site2/docs/io-netty.md index 8bf3a8959ccfb..479752f9adb23 100644 --- a/site2/docs/io-tcp.md +++ b/site2/docs/io-netty.md @@ -1,12 +1,12 @@ --- -id: io-tcp -title: Netty Tcp Connector -sidebar_label: Netty Tcp Connector +id: io-netty +title: Netty Tcp or Udp Connector +sidebar_label: Netty Tcp or Udp Connector --- ## Source -The Netty Tcp Source connector is used to listen Tcp messages from Tcp Client and write them to user-defined Pulsar topic. +The Netty Tcp or Udp Source connector is used to listen Tcp/Udp messages from Tcp/Udp Client and write them to user-defined Pulsar topic. Also, this connector is suggested to be used in a containerized (e.g. k8s) deployment. Otherwise, if the connector is running in process or thread mode, the instances may be conflicting on listening to ports. @@ -14,6 +14,7 @@ Otherwise, if the connector is running in process or thread mode, the instances | Name | Required | Default | Description | |------|----------|---------|-------------| +| `type` | `false` | `tcp` | The tcp or udp network protocol required by netty. | | `host` | `false` | `127.0.0.1` | The host name or address that the source instance to listen on. | | `port` | `false` | `10999` | The port that the source instance to listen on. | | `numberOfThreads` | `false` | `1` | The number of threads of Netty Tcp Server to accept incoming connections and handle the traffic of the accepted connections. |