From fe952a4a3cad696fb41d37ed315c250223348fe4 Mon Sep 17 00:00:00 2001 From: Andy Walker Date: Wed, 18 Nov 2020 21:53:05 -0500 Subject: [PATCH] [pulsar-io] Add NSQ Source (#8250) ### Motivation This PR adds a source representing an [NSQ](https://nsq.io/) topic to be mirrored to a pulsar topic. ### Modifications With the exception of adding it as a module to `pulsar-io/pom.xml`, there are no modifications to existing code. --- distribution/io/src/assemble/io.xml | 1 + pom.xml | 1 + pulsar-io/nsq/pom.xml | 74 ++++++++++++ .../org/apache/pulsar/io/nsq/NSQSource.java | 109 +++++++++++++++++ .../apache/pulsar/io/nsq/NSQSourceConfig.java | 112 ++++++++++++++++++ .../resources/META-INF/services/pulsar-io.yml | 4 + .../apache/pulsar/io/nsq/NSQConfigTests.java | 107 +++++++++++++++++ .../nsq/src/test/resources/sourceConfig.yaml | 22 ++++ pulsar-io/pom.xml | 1 + site2/docs/io-connectors.md | 6 + site2/docs/io-nsq-source.md | 20 ++++ site2/docs/io-nsq.md | 5 + site2/website/data/connectors.js | 6 + 13 files changed, 468 insertions(+) create mode 100644 pulsar-io/nsq/pom.xml create mode 100644 pulsar-io/nsq/src/main/java/org/apache/pulsar/io/nsq/NSQSource.java create mode 100644 pulsar-io/nsq/src/main/java/org/apache/pulsar/io/nsq/NSQSourceConfig.java create mode 100644 pulsar-io/nsq/src/main/resources/META-INF/services/pulsar-io.yml create mode 100644 pulsar-io/nsq/src/test/java/org/apache/pulsar/io/nsq/NSQConfigTests.java create mode 100644 pulsar-io/nsq/src/test/resources/sourceConfig.yaml create mode 100644 site2/docs/io-nsq-source.md create mode 100644 site2/docs/io-nsq.md diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index 3598786129f4a..1ae97bfc272a8 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -50,6 +50,7 @@ ${basedir}/../../pulsar-io/kafka/target/pulsar-io-kafka-${project.version}.nar ${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar 
${basedir}/../../pulsar-io/rabbitmq/target/pulsar-io-rabbitmq-${project.version}.nar + ${basedir}/../../pulsar-io/nsq/target/pulsar-io-nsq-${project.version}.nar ${basedir}/../../pulsar-io/jdbc/sqlite/target/pulsar-io-jdbc-sqlite-${project.version}.nar ${basedir}/../../pulsar-io/jdbc/mariadb/target/pulsar-io-jdbc-mariadb-${project.version}.nar ${basedir}/../../pulsar-io/jdbc/clickhouse/target/pulsar-io-jdbc-clickhouse-${project.version}.nar diff --git a/pom.xml b/pom.xml index 010746585b8f6..04ae478865f0b 100644 --- a/pom.xml +++ b/pom.xml @@ -180,6 +180,7 @@ flexible messaging model and an intuitive client API. 1.2.0 4.2.0 9.0.2 + 1.0 3.6.0 diff --git a/pulsar-io/nsq/pom.xml b/pulsar-io/nsq/pom.xml new file mode 100644 index 0000000000000..cad0d4d4ad490 --- /dev/null +++ b/pulsar-io/nsq/pom.xml @@ -0,0 +1,74 @@ + + + 4.0.0 + + org.apache.pulsar + pulsar-io + 2.7.0-SNAPSHOT + + + pulsar-io-nsq + Pulsar IO :: NSQ + + + + ${project.groupId} + pulsar-io-core + ${project.version} + + + + com.sproutsocial + nsq-j + ${nsq-client.version} + + + + commons-collections + commons-collections + ${commons.collections.version} + + + + org.apache.commons + commons-lang3 + 3.4 + + + + ${project.groupId} + pulsar-io-common + ${project.version} + + + + + + + + org.apache.nifi + nifi-nar-maven-plugin + + + + diff --git a/pulsar-io/nsq/src/main/java/org/apache/pulsar/io/nsq/NSQSource.java b/pulsar-io/nsq/src/main/java/org/apache/pulsar/io/nsq/NSQSource.java new file mode 100644 index 0000000000000..1b7cd87cfda5e --- /dev/null +++ b/pulsar-io/nsq/src/main/java/org/apache/pulsar/io/nsq/NSQSource.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.nsq; + +import java.io.IOException; +import java.util.Map; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.PushSource; +import org.apache.pulsar.io.core.SourceContext; +import org.apache.pulsar.io.core.annotations.Connector; +import org.apache.pulsar.io.core.annotations.IOType; + +import com.sproutsocial.nsq.Client; +import com.sproutsocial.nsq.Subscriber; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Connector( + name = "nsq", + type = IOType.SOURCE, + help = "A Simple connector moving messages from an NSQ topic to a Pulsar Topic", + configClass = NSQSourceConfig.class +) +@Slf4j +public class NSQSource extends PushSource { + + private static final Logger LOG = LoggerFactory.getLogger(NSQSource.class); + + private Subscriber subscriber; + + private Object waitObject; + + @Override + public void open(Map config, SourceContext sourceContext) throws IOException { + NSQSourceConfig nsqSourceConfig = IOConfigUtils.loadWithSecrets(config, NSQSourceConfig.class, sourceContext); + nsqSourceConfig.validate(); + + waitObject = new Object(); + startThread(nsqSourceConfig); + } + + @Override + public void close() throws Exception{ + stopThread(); + } + + private void startThread(NSQSourceConfig 
config) { + String[] lookupds = new String[config.getLookupds().size()]; + config.getLookupds().toArray(lookupds); + subscriber = new Subscriber(lookupds); + + Thread runnerThread = new Thread(() -> { + subscriber.subscribe(config.getTopic(), config.getChannel(), (byte[]data) ->{ + consume(new NSQRecord(data)); + }); + LOG.info("NSQ Consumer started for topic {} with channel {}", config.getTopic(), config.getChannel()); + //wait + try { + synchronized (waitObject) { + waitObject.wait(); + } + } catch (Exception e) { + LOG.info("Got an exception in waitObject"); + } + LOG.debug("Closing the NSQ connection"); + subscriber.stop(); + Client.getDefaultClient().stop(); + LOG.info("NSQ subscriber stopped"); + LOG.info("NSQ Runner Thread ending"); + }); + runnerThread.setName("NSQSubscriberRunner"); + runnerThread.start(); + } + + private void stopThread() { + LOG.info("Source closed"); + synchronized (waitObject) { + waitObject.notify(); + } + } + + @Data + static private class NSQRecord implements Record { + private final byte[] value; + } +} + diff --git a/pulsar-io/nsq/src/main/java/org/apache/pulsar/io/nsq/NSQSourceConfig.java b/pulsar-io/nsq/src/main/java/org/apache/pulsar/io/nsq/NSQSourceConfig.java new file mode 100644 index 0000000000000..b4a20d766999a --- /dev/null +++ b/pulsar-io/nsq/src/main/java/org/apache/pulsar/io/nsq/NSQSourceConfig.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.nsq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + + +import lombok.Data; +import lombok.experimental.Accessors; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.io.core.annotations.FieldDoc; + +/** + * Configuration object for the NSQ Connector. 
+ */ +@Data +@Accessors(chain=true) +public class NSQSourceConfig implements Serializable { + private static final long serialVersionUID = 1L; + + + @FieldDoc( + required= true, + defaultValue = "", + help = "The topic you wish to transport into pulsar" + ) + private String topic; + + @FieldDoc( + required= false, + defaultValue = "pulsar-transport-", + help = "The channel to use on the topic you want to transport" + ) + private String channel; + + @FieldDoc( + required= true, + defaultValue = "", + help = "A comma-separated list of nsqlookupd hosts to contact" + ) + private String lookupds; + + public static NSQSourceConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return applyDefaults(mapper.readValue(new File(yamlFile), NSQSourceConfig.class)); + } + + public static NSQSourceConfig load(Map map) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return applyDefaults(mapper.readValue(new ObjectMapper().writeValueAsString(map), NSQSourceConfig.class)); + } + + private static NSQSourceConfig applyDefaults(NSQSourceConfig config) { + if (config.channel == null) { + config.channel=String.format("pulsar-transport-%s", config.topic); + } + return config; + } + + + public void validate() throws IllegalArgumentException { + if (getChannel() == null) { + setChannel(String.format("pulsar-transport-%s", getTopic())); + } + if (getTopic() == null || getLookupds() == null || getChannel() == null){ + throw new IllegalArgumentException("Required property not set."); + } + } + + public List getLookupds(){ + if (StringUtils.isBlank(lookupds)){ + return Collections.emptyList(); + } + + List out = new ArrayList (); + for (String s: StringUtils.split(lookupds, ",")) { + out.add(StringUtils.trim(s)); + } + + if (CollectionUtils.isEmpty(out)){ + return Collections.emptyList(); + } + return out; + } +} diff --git a/pulsar-io/nsq/src/main/resources/META-INF/services/pulsar-io.yml 
b/pulsar-io/nsq/src/main/resources/META-INF/services/pulsar-io.yml new file mode 100644 index 0000000000000..14fb48e608ca5 --- /dev/null +++ b/pulsar-io/nsq/src/main/resources/META-INF/services/pulsar-io.yml @@ -0,0 +1,4 @@ +name: nsq +description: Ingest data from an NSQ topic +sourceClass: org.apache.pulsar.io.nsq.NSQSource +sourceConfigClass: org.apache.pulsar.io.nsq.NSQSourceConfig diff --git a/pulsar-io/nsq/src/test/java/org/apache/pulsar/io/nsq/NSQConfigTests.java b/pulsar-io/nsq/src/test/java/org/apache/pulsar/io/nsq/NSQConfigTests.java new file mode 100644 index 0000000000000..6d1c868bc6447 --- /dev/null +++ b/pulsar-io/nsq/src/test/java/org/apache/pulsar/io/nsq/NSQConfigTests.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.io.nsq; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.annotations.Test; + +public class NSQConfigTests { + + private NSQSourceConfig config; + + @Test + public final void loadFromYamlFileTest() throws IOException { + File yamlFile = getFile("sourceConfig.yaml"); + config = NSQSourceConfig.load(yamlFile.getAbsolutePath()); + assertNotNull(config); + } + + @Test + public final void loadFromMapTest() throws IOException { + Map map = new HashMap (); + map.put("topic", "xxx"); + map.put("channel", "xxx"); + map.put("lookupds", "xxx"); + + config = NSQSourceConfig.load(map); + + assertNotNull(config); + } + + @Test + public final void defaultValuesTest() throws IOException { + Map map = new HashMap (); + map.put("topic", "xxx"); + map.put("lookupds", "xxx"); + + config = NSQSourceConfig.load(map); + + assertNotNull(config); + assertEquals(config.getChannel(), "pulsar-transport-xxx"); + } + + @Test + public final void validValidateTest() throws IOException { + Map map = new HashMap (); + map.put("topic", "xxx"); + map.put("channel", "xxx"); + map.put("lookupds", "xxx"); + + config = NSQSourceConfig.load(map); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Required property not set.") + public final void missingConsumerKeyValidateTest() throws IOException { + Map map = new HashMap (); + + config = NSQSourceConfig.load(map); + config.validate(); + } + + @Test + public final void getlookupdsTest() throws IOException { + Map map = new HashMap (); + map.put("lookupds", "one,two, three"); + config = NSQSourceConfig.load(map); + + List lookupds = config.getLookupds(); + assertNotNull(lookupds); + assertEquals(lookupds.size(), 3); + 
assertTrue(lookupds.contains("one")); + assertTrue(lookupds.contains("two")); + assertTrue(lookupds.contains("three")); + } + + private File getFile(String name) { + ClassLoader classLoader = getClass().getClassLoader(); + return new File(classLoader.getResource(name).getFile()); + } + +} diff --git a/pulsar-io/nsq/src/test/resources/sourceConfig.yaml b/pulsar-io/nsq/src/test/resources/sourceConfig.yaml new file mode 100644 index 0000000000000..6c8bc730880d8 --- /dev/null +++ b/pulsar-io/nsq/src/test/resources/sourceConfig.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +topic: "testTopic" +lookupds: "test1,test2" +channel: "testChannel" \ No newline at end of file diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml index fd566debddb17..a22fa151afbf0 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/pom.xml @@ -67,6 +67,7 @@ solr influxdb dynamodb + nsq diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md index 9f6aa3bef3577..e5b26e548faaf 100644 --- a/site2/docs/io-connectors.md +++ b/site2/docs/io-connectors.md @@ -95,6 +95,12 @@ Pulsar has various source connectors, which are sorted alphabetically as below. 
* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java) +### NSQ + +* [Configuration](io-nsq-source.md#configuration) + +* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/nsq/src/main/java/org/apache/pulsar/io/nsq/NSQSource.java) + ### RabbitMQ * [Configuration](io-rabbitmq-source.md#configuration) diff --git a/site2/docs/io-nsq-source.md b/site2/docs/io-nsq-source.md new file mode 100644 index 0000000000000..c9ee8e0eedf84 --- /dev/null +++ b/site2/docs/io-nsq-source.md @@ -0,0 +1,20 @@ +--- +id: io-nsq-source +title: NSQ source connector +sidebar_label: NSQ source connector +--- + +The NSQ source connector receives messages from NSQ topics +and writes messages to Pulsar topics. + +## Configuration + +The configuration of the NSQ source connector has the following properties. + +### Property + +| Name | Type|Required | Default | Description +|------|----------|----------|---------|-------------| +| `lookupds` |String| true | " " (empty string) | A comma-separated list of nsqlookupds to connect to. | +| `topic` | String|true | " " (empty string) | The NSQ topic to transport. | +| `channel` | String |false | pulsar-transport-{$topic} | The channel to consume from on the provided NSQ topic. 
| \ No newline at end of file diff --git a/site2/docs/io-nsq.md b/site2/docs/io-nsq.md new file mode 100644 index 0000000000000..7d08f9eef0655 --- /dev/null +++ b/site2/docs/io-nsq.md @@ -0,0 +1,5 @@ +--- +id: io-nsq +title: NSQ Connector +sidebar_label: NSQ Connector +--- \ No newline at end of file diff --git a/site2/website/data/connectors.js b/site2/website/data/connectors.js index 504f3aaf1d947..9933d53a08dde 100644 --- a/site2/website/data/connectors.js +++ b/site2/website/data/connectors.js @@ -131,6 +131,12 @@ module.exports = [ type: 'Source', link: 'https://netty.io/' }, + { + name: 'nsq', + longName: 'NSQ source', + type: 'Source', + link: 'https://nsq.io/', + }, { name: 'rabbitmq', longName: 'RabbitMQ source and sink',