Skip to content

Commit

Permalink
[Issue 5215,4601,5215][pulsar-io] Add jdbc sinks: postgres, mariadb, …
Browse files Browse the repository at this point in the history
…clickhouse (apache#6835)

Fixes apache#5215 apache#4601

### Motivation

Right now, the number of supported JDBC connectors is rather limited. There are a couple of open issues related to the missing connectors. 

### Modifications

As a base for this pull request, I took the [changes](apache#5624) from @huangdx0726. Unfortunately, I was not able to reach him to merge my changes into his branch, so I created a new one.

The main modification is a split of one JDBC module into a bunch of them. One module serves as a core and provides the main logic of the connector. Other modules are real connectors and add only an appropriate JDBC driver.
  • Loading branch information
vzhikserg authored May 18, 2020
1 parent adf920e commit 218057b
Show file tree
Hide file tree
Showing 33 changed files with 944 additions and 357 deletions.
5 changes: 4 additions & 1 deletion distribution/io/src/assemble/io.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
<file><source>${basedir}/../../pulsar-io/kafka/target/pulsar-io-kafka-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/rabbitmq/target/pulsar-io-rabbitmq-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/jdbc/target/pulsar-io-jdbc-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/jdbc/sqlite/target/pulsar-io-jdbc-sqlite-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/jdbc/mariadb/target/pulsar-io-jdbc-mariadb-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/jdbc/clickhouse/target/pulsar-io-jdbc-clickhouse-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/jdbc/postgres/target/pulsar-io-jdbc-postgres-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/aerospike/target/pulsar-io-aerospike-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/elastic-search/target/pulsar-io-elastic-search-${project.version}.nar</source></file>
Expand Down
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ flexible messaging model and an intuitive client API.</description>
<jclouds.version>2.2.0</jclouds.version>
<sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
<mysql-jdbc.version>8.0.11</mysql-jdbc.version>
<postgresql-jdbc.version>42.2.12</postgresql-jdbc.version>
<clickhouse-jdbc.version>0.2.4</clickhouse-jdbc.version>
<mariadb-jdbc.version>2.6.0</mariadb-jdbc.version>
<hdfs-offload-version3>3.2.0</hdfs-offload-version3>
<org.eclipse.jetty-hdfs-offload>9.3.24.v20180605</org.eclipse.jetty-hdfs-offload>
<test-hdfs-offload-jetty>9.3.24.v20180605</test-hdfs-offload-jetty>
Expand Down Expand Up @@ -1052,6 +1055,11 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>mysql</artifactId>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>org.arquillian.cube</groupId>
<artifactId>arquillian-cube-docker</artifactId>
Expand Down
17 changes: 16 additions & 1 deletion pulsar-io/docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,22 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-jdbc</artifactId>
<artifactId>pulsar-io-jdbc-clickhouse</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-jdbc-mariadb</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-jdbc-postgres</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-jdbc-sqlite</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down
56 changes: 56 additions & 0 deletions pulsar-io/jdbc/clickhouse/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Child of the pulsar-io-jdbc parent POM. -->
<parent>
<artifactId>pulsar-io-jdbc</artifactId>
<groupId>org.apache.pulsar</groupId>
<version>2.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<!-- ClickHouse flavour of the JDBC sink: the shared core module plus the ClickHouse JDBC driver. -->
<artifactId>pulsar-io-jdbc-clickhouse</artifactId>
<name>Pulsar IO :: Jdbc :: ClickHouse</name>

<dependencies>
<!-- Shared JDBC sink implementation, pinned to this project's own version. -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-jdbc-core</artifactId>
<version>${project.version}</version>
</dependency>
<!-- ClickHouse JDBC driver. Runtime scope keeps it off the compile classpath while still
     making it available when the connector runs. The version property is not defined in
     this file; presumably it is inherited from a parent POM. -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse-jdbc.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- NiFi NAR plugin; no version or configuration is given here, so both presumably
     come from plugin management in a parent POM. -->
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.jdbc;

import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;

/**
 * JDBC sink connector for ClickHouse.
 *
 * <p>The class body is empty: all behavior is inherited from {@link BaseJdbcAutoSchemaSink}.
 * This subclass only carries the {@code @Connector} metadata that declares the sink under
 * the name {@code jdbc-clickhouse} with {@link JdbcSinkConfig} as its configuration class.
 */
@Connector(
name = "jdbc-clickhouse",
type = IOType.SINK,
help = "A simple JDBC sink for ClickHouse that writes pulsar messages to a database table",
configClass = JdbcSinkConfig.class
)
public class ClickHouseJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Connector descriptor for the ClickHouse JDBC sink.
# sinkClass is the fully qualified name of the sink implementation class.
name: jdbc-clickhouse
description: JDBC sink for ClickHouse
sinkClass: org.apache.pulsar.io.jdbc.ClickHouseJdbcAutoSchemaSink
58 changes: 58 additions & 0 deletions pulsar-io/jdbc/core/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Child of the pulsar-io-jdbc parent POM. -->
<parent>
<artifactId>pulsar-io-jdbc</artifactId>
<groupId>org.apache.pulsar</groupId>
<version>2.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<!-- Core JDBC sink module. It declares no JDBC driver dependency and no NAR plugin in this
     file; database-specific modules are expected to depend on it and add their driver. -->
<artifactId>pulsar-io-jdbc-core</artifactId>
<name>Pulsar IO :: Jdbc :: Core</name>

<dependencies>
<!-- Pulsar IO core API, pinned to this project's own version. -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Avro, with its version taken from the avro.version property (not defined in this file). -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>

<!-- Jackson artifacts carry no explicit version here; presumably the versions are supplied
     by dependency management in a parent POM. -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,19 @@
package org.apache.pulsar.io.jdbc;

import java.sql.PreparedStatement;
import java.sql.Types;
import java.util.List;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;

/**
* A Simple Jdbc sink, which interprets input Record in generic record.
* An abstract Jdbc sink, which interprets input Record in generic record.
*/
@Connector(
name = "jdbc",
type = IOType.SINK,
help = "A simple JDBC sink that writes pulsar messages to a database table",
configClass = JdbcSinkConfig.class
)
@Slf4j
public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {

@Override
public void bindValue(PreparedStatement statement,
Expand Down Expand Up @@ -83,13 +74,13 @@ public void bindValue(PreparedStatement statement,
}
setColumnNull(statement, index++, colType);
}

}
}

private static void setColumnNull(PreparedStatement statement, int index, int type) throws Exception {
if (log.isDebugEnabled()) {
log.debug("Setting column value to null, statement: {}, index: {}, value: {}", statement.toString(), index);
log.debug("Setting column value to null, statement: {}, index: {}", statement.toString(), index);
}
statement.setNull(index, type);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.List;
Expand All @@ -37,8 +38,7 @@
import org.apache.pulsar.io.core.SinkContext;

/**
* A Simple abstract class for Jdbc sink
* Users need to implement extractKeyValue function to use this sink
* A Simple abstract class for Jdbc sink.
*/
@Slf4j
public abstract class JdbcAbstractSink<T> implements Sink<T> {
Expand All @@ -51,8 +51,8 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {

private JdbcUtils.TableId tableId;
private PreparedStatement insertStatement;
private PreparedStatement updateStatment;
private PreparedStatement deleteStatment;
private PreparedStatement updateStatement;
private PreparedStatement deleteStatement;


protected static final String ACTION = "ACTION";
Expand All @@ -66,7 +66,6 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
private List<Record<T>> incomingList;
private List<Record<T>> swapList;
private AtomicBoolean isFlushing;
private int timeoutMs;
private int batchSize;
private ScheduledExecutorService flushExecutor;

Expand All @@ -89,7 +88,9 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
properties.setProperty("password", password);
}

connection = JdbcUtils.getConnection(jdbcUrl, properties);

Class.forName(JdbcUtils.getDriverClassName(jdbcSinkConfig.getJdbcUrl()));
connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties);
connection.setAutoCommit(false);
log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());

Expand All @@ -98,14 +99,14 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
// Init PreparedStatement include insert, delete, update
initStatement();

timeoutMs = jdbcSinkConfig.getTimeoutMs();
int timeoutMs = jdbcSinkConfig.getTimeoutMs();
batchSize = jdbcSinkConfig.getBatchSize();
incomingList = Lists.newArrayList();
swapList = Lists.newArrayList();
isFlushing = new AtomicBoolean(false);

flushExecutor = Executors.newScheduledThreadPool(1);
flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
}

private void initStatement() throws Exception {
Expand All @@ -123,10 +124,10 @@ private void initStatement() throws Exception {
tableDefinition = JdbcUtils.getTableDefinition(connection, tableId, keyList, nonKeyList);
insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
if (!nonKeyList.isEmpty()) {
updateStatment = JdbcUtils.buildUpdateStatement(connection, JdbcUtils.buildUpdateSql(tableDefinition));
updateStatement = JdbcUtils.buildUpdateStatement(connection, JdbcUtils.buildUpdateSql(tableDefinition));
}
if (!keyList.isEmpty()) {
deleteStatment = JdbcUtils.buildDeleteStatement(connection, JdbcUtils.buildDeleteSql(tableDefinition));
deleteStatement = JdbcUtils.buildDeleteStatement(connection, JdbcUtils.buildDeleteSql(tableDefinition));
}
}

Expand All @@ -150,7 +151,7 @@ public void write(Record<T> record) throws Exception {
number = incomingList.size();
}
if (number == batchSize) {
flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
flushExecutor.schedule(this::flush, 0, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -187,14 +188,14 @@ private void flush() {
}
switch (action) {
case DELETE:
bindValue(deleteStatment, record, action);
bindValue(deleteStatement, record, action);
count += 1;
deleteStatment.execute();
deleteStatement.execute();
break;
case UPDATE:
bindValue(updateStatment, record, action);
bindValue(updateStatement, record, action);
count += 1;
updateStatment.execute();
updateStatement.execute();
break;
case INSERT:
bindValue(insertStatement, record, action);
Expand All @@ -208,10 +209,10 @@ private void flush() {
}
}
connection.commit();
swapList.forEach(tRecord -> tRecord.ack());
swapList.forEach(Record::ack);
} catch (Exception e) {
log.error("Got exception ", e);
swapList.forEach(tRecord -> tRecord.fail());
swapList.forEach(Record::fail);
}

if (swapList.size() != count) {
Expand Down
Loading

0 comments on commit 218057b

Please sign in to comment.