Skip to content

Commit

Permalink
[doc] Generate connector yaml files for connectors (apache#3065)
Browse files Browse the repository at this point in the history
*Motivation*

Include the example yaml files in the io distribution package
  • Loading branch information
sijie authored and merlimat committed Nov 27, 2018
1 parent 78e8a2a commit 08ef448
Show file tree
Hide file tree
Showing 16 changed files with 334 additions and 5 deletions.
1 change: 1 addition & 0 deletions distribution/io/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
</goals>
<configuration>
<executable>${project.basedir}/../../src/pulsar-io-gen</executable>
<outputFile>${project.basedir}/target/pulsar-io-gen.output</outputFile>
<arguments>
<argument>conf</argument>
<argument>-o</argument>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.MDC;

import java.net.InetSocketAddress;
Expand All @@ -39,8 +41,13 @@
import java.util.Optional;

/**
* A Simple class for mysql binlog sync to pulsar
* A Simple class for mysql binlog sync to pulsar.
*/
@Connector(
name = "canal",
type = IOType.SOURCE,
help = "The CanalSource is used for syncing mysql binlog to Pulsar.",
configClass = CanalSourceConfig.class)
@Slf4j
public class CanalSource extends PushSource<byte[]> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import org.apache.pulsar.io.core.annotations.FieldDoc;


/**
* Canal source config.
*/
@Data
@Setter
@Getter
Expand All @@ -37,14 +41,47 @@
@Accessors(chain = true)
public class CanalSourceConfig implements Serializable{

@FieldDoc(
required = true,
defaultValue = "",
help = "Username to connect to mysql database")
private String username;
@FieldDoc(
required = true,
defaultValue = "",
help = "Password to connect to mysql database")
private String password;
@FieldDoc(
required = true,
defaultValue = "",
help = "Source destination that Canal source connector connects to")
private String destination;
@FieldDoc(
required = false,
defaultValue = "",
help = "The mysql database hostname")
private String singleHostname;
@FieldDoc(
required = false,
defaultValue = "",
help = "The mysql database port")
private int singlePort;
private Boolean cluster;
@FieldDoc(
required = true,
defaultValue = "false",
help = "If setting to true, it will be talking to `zkServers` to figure out the actual database hosts."
+ " If setting to false, it will connect to the database specified by `singleHostname` and `singlePort`.")
private Boolean cluster = false;
@FieldDoc(
required = true,
defaultValue = "",
help = "The zookeeper servers that canal source connector talks to figure out the actual database hosts")
private String zkServers;
private int batchSize;
@FieldDoc(
required = false,
defaultValue = "1000",
help = "The batch size to fetch from canal.")
private int batchSize = 1000;

public static CanalSourceConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import org.apache.pulsar.io.core.annotations.FieldDoc;

@Data
@Setter
Expand All @@ -39,10 +40,30 @@ public class CassandraSinkConfig implements Serializable {

private static final long serialVersionUID = 1L;

@FieldDoc(
required = true,
defaultValue = "",
help = "A comma-separated list of cassandra hosts to connect to")
private String roots;
@FieldDoc(
required = true,
defaultValue = "",
help = "The key space used for writing pulsar messages to")
private String keyspace;
@FieldDoc(
required = true,
defaultValue = "",
help = "The key name of the cassandra column family")
private String keyname;
@FieldDoc(
required = true,
defaultValue = "",
help = "The cassandra column family name")
private String columnFamily;
@FieldDoc(
required = true,
defaultValue = "",
help = "The column name of the cassandra column family")
private String columnName;

public static CassandraSinkConfig load(String yamlFile) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@

import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;

/**
* Cassandra sink that treats incoming messages on the input topic as Strings
* and write identical key/value pairs.
*/
@Connector(
name = "cassandra",
type = IOType.SINK,
help = "The CassandraStringSink is used for moving messages from Pulsar to Cassandra.",
configClass = CassandraSinkConfig.class)
public class CassandraStringSink extends CassandraAbstractSink<String, String> {
@Override
public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
Expand Down
45 changes: 45 additions & 0 deletions pulsar-io/docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,56 @@
</dependency>

<!-- include connectors -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-canal</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-cassandra</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-data-generator</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-elastic-search</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-hdfs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-kinesis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-rabbitmq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-twitter</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.io.core.annotations.FieldDoc;

/**
* Configuration class for the ElasticSearch Sink Connector.
Expand All @@ -45,16 +46,46 @@ public class ElasticSearchConfig implements Serializable {

private static final long serialVersionUID = 1L;

@FieldDoc(
required = true,
defaultValue = "",
help = "The url of elastic search cluster that the connector connects to"
)
private String elasticSearchUrl;

@FieldDoc(
required = true,
defaultValue = "",
help = "The index name that the connector writes messages to"
)
private String indexName;

@FieldDoc(
required = false,
defaultValue = "1",
help = "The number of shards of the index"
)
private int indexNumberOfShards = 1;

@FieldDoc(
required = false,
defaultValue = "1",
help = "The number of replicas of the index"
)
private int indexNumberOfReplicas = 1;

@FieldDoc(
required = false,
defaultValue = "",
help = "The username used by the connector to connect to the elastic search cluster. If username is set, a password should also be provided."
)
private String username;

@FieldDoc(
required = false,
defaultValue = "",
help = "The password used by the connector to connect to the elastic search cluster. If password is set, a username should also be provided"
)
private String password;

public static ElasticSearchConfig load(String yamlFile) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
Expand All @@ -51,6 +53,12 @@
* Users need to implement extractKeyValue function to use this sink.
* This class assumes that the input will be JSON documents
*/
@Connector(
name = "elastic_search",
type = IOType.SINK,
help = "A sink connector that sends pulsar messages to elastic search",
configClass = ElasticSearchConfig.class
)
public class ElasticSearchSink implements Sink<byte[]> {

protected static final String DOCUMENT = "doc";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,19 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;

/**
* A Simple Jdbc sink, which interprets input Record in generic record.
*/
@Connector(
name = "jdbc",
type = IOType.SINK,
help = "A simple JDBC sink that writes pulser messages to a database table",
configClass = JdbcSinkConfig.class
)
@Slf4j
public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import org.apache.pulsar.io.core.annotations.FieldDoc;

@Data
@Setter
Expand All @@ -39,13 +40,43 @@ public class JdbcSinkConfig implements Serializable {

private static final long serialVersionUID = 1L;

@FieldDoc(
required = false,
defaultValue = "",
help = "Username used to connect to the database specified by `jdbcUrl`"
)
private String userName;
@FieldDoc(
required = false,
defaultValue = "",
help = "Password used to connect to the database specified by `jdbcUrl`"
)
private String password;
@FieldDoc(
required = true,
defaultValue = "",
help = "The JDBC url of the database this connector connects to"
)
private String jdbcUrl;
@FieldDoc(
required = true,
defaultValue = "",
help = "The name of the table this connector writes messages to"
)
private String tableName;

// Optional
@FieldDoc(
required = false,
defaultValue = "500",
help = "The jdbc operation timeout in milliseconds"
)
private int timeoutMs = 500;
@FieldDoc(
required = false,
defaultValue = "200",
help = "The batch size of updates made to the database"
)
private int batchSize = 200;

public static JdbcSinkConfig load(String yamlFile) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -80,6 +82,12 @@
*
*
*/
@Connector(
name = "kinesis",
type = IOType.SINK,
help = "A sink connector that copies messages from Pulsar to Kinesis",
configClass = KinesisSinkConfig.class
)
public class KinesisSink implements Sink<byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(KinesisSink.class);
Expand Down
Loading

0 comments on commit 08ef448

Please sign in to comment.