Address translator support and configuration related documentation (#12)
* support for record to table mapping config

* some minor bug fixes

* error handling and ssl config support

* fixed some typos

* support for address translator

* documentation changes for new configs

* resolved review comments

* renamed ComposeAddressTranslator to ClusterAddressTranslator

* addressed review comments
patitapaban19 authored Apr 7, 2020
1 parent e177741 commit aa89618
Showing 6 changed files with 295 additions and 72 deletions.
26 changes: 24 additions & 2 deletions config/scylladb-sink-quickstart.properties
@@ -3,7 +3,15 @@ name=scylladb-sink-test
topics=<comma-separated-kafka-topics-name>
tasks.max=1
connector.class=io.connect.scylladb.ScyllaDbSinkConnector

scylladb.contact.points=<scylladb-hosts>
#Eg. scylladb.contact.points=10.0.24.69,10.0.24.70,10.0.24.71
# Configure this with the public hostnames of the Scylla nodes; the port is taken from the scylladb.port configuration

#scylladb.contact.points={\"private_host1:port1\",\"public_host1:port1\", \"private_host2:port2\",\"public_host2:port2\", ...}
#Eg. scylladb.contact.points={\"10.0.24.69:9042\": \"sl-eu-lon-2-portal.3.dblayer.com:15227\", \"10.0.24.71:9042\": \"sl-eu-lon-2-portal.2.dblayer.com:15229\", \"10.0.24.70:9042\": \"sl-eu-lon-2-portal.1.dblayer.com:15228\"}
# Configure this with a JSON string of key-value pairs mapping internal private network addresses to external network addresses.

scylladb.keyspace=<keyspace-name>

### Connection based configs:
@@ -20,15 +28,29 @@ scylladb.keyspace=<keyspace-name>
#scylladb.keyspace.replication.factor=3

### SSL based configs:
#scylladb.ssl.provider=JDK
#scylladb.ssl.truststore.path=<truststore-path>
#scylladb.ssl.truststore.password=<truststore-password>
#scylladb.ssl.keystore.path=<keystore-path>
#scylladb.ssl.keystore.password=<keystore-password>
#scylladb.ssl.cipherSuites=<cipher-suites-to-enable>
#scylladb.ssl.openssl.keyCertChain=<ssl-certificate-path>
#scylladb.ssl.openssl.privateKey=<privateKey-path>

### ScyllaDB related configs:
#behavior.on.error=FAIL

### Table related configs:
#scylladb.table.manage.enabled=true
#scylladb.table.create.compression.algorithm=NONE
#scylladb.offset.storage.table=kafka_connect_offsets

### Topic to table related configs:
#topic.my_topic.my_ks.my_table.mapping=column1=key.field1, column2=value.field1, __ttl=value.field2, __timestamp=value.field3, column3=header.field1
#topic.my_topic.my_ks.my_table.consistencyLevel=LOCAL_ONE
#topic.my_topic.my_ks.my_table.ttlSeconds=1
#topic.my_topic.my_ks.my_table.deletesEnabled=true

### Writer configs
#scylladb.consistency.level=LOCAL_QUORUM
#scylladb.deletes.enabled=true
88 changes: 82 additions & 6 deletions documentation/CONFIG.md
@@ -13,9 +13,15 @@ Connector-specific configuration properties are described below.

``scylladb.contact.points``

The ScyllaDB hosts to connect to. Scylla nodes use this list of hosts to find each other and learn the topology of the ring.
You must change this if you are running multiple nodes.
For a larger cluster it is essential to list at least two hosts: if the first host is down, the connector
contacts the second one and obtains the state of the cluster from it.
E.g., when using the Docker image, connect to the host it uses.
To connect to private Scylla nodes, provide a JSON string mapping each internal private network address:port to
an external network address:port as key-value pairs. Pass it as
{\"private_host1:port1\": \"public_host1:port1\", \"private_host2:port2\": \"public_host2:port2\", ...}
Eg. {\"10.0.24.69:9042\": \"sl-eu-lon-2-portal.3.dblayer.com:15227\", \"10.0.24.71:9042\": \"sl-eu-lon-2-portal.2.dblayer.com:15229\", \"10.0.24.70:9042\": \"sl-eu-lon-2-portal.1.dblayer.com:15228\"}

* Type: List
* Importance: High
@@ -32,7 +38,9 @@ Connector-specific configuration properties are described below.
* Valid Values: ValidPort{start=1, end=65535}

``scylladb.loadbalancing.localdc``

The case-sensitive Data Center name local to the machine on which the connector is running.
This configuration is recommended when there is more than one DC.

* Type: string
* Default: ""
Expand All @@ -44,7 +52,7 @@ Connector-specific configuration properties are described below.

* Type: Boolean
* Importance: High
* Default Value: false

``scylladb.username``

@@ -82,28 +90,38 @@ Connector-specific configuration properties are described below.
### SSL

``scylladb.ssl.truststore.path``

Path to the Java Truststore.

* Type: string
* Default: ""
* Importance: medium

``scylladb.ssl.truststore.password``

Password to open the Java Truststore with.

* Type: password
* Default: [hidden]
* Importance: medium

``scylladb.ssl.provider``

The SSL Provider to use when connecting to ScyllaDB.

* Type: string
* Default: JDK
* Valid Values: [JDK, OPENSSL, OPENSSL_REFCNT]
* Importance: low

### Keyspace

**Note**: Both keyspace and table names consist of only alphanumeric characters,
cannot be empty, and are limited in size to 48 characters (that limit exists
mostly to keep filenames, which may include the keyspace and table name,
from exceeding the limits of certain file systems). By default, keyspace and table names
are case-insensitive (myTable is equivalent to mytable), but case sensitivity
can be forced by using double quotes ("myTable" is different from mytable).

``scylladb.keyspace``

@@ -158,6 +176,39 @@ Connector-specific configuration properties are described below.
* Type: String
* Importance: Low
* Default: kafka_connect_offsets

### Topic to Table

These configurations can be specified for multiple Kafka topics from which records are being processed.
These topic-level configurations override the behavior of connector-level configurations such as
``scylladb.consistency.level``, ``scylladb.deletes.enabled`` and ``scylladb.ttl``.

``topic.my_topic.my_ks.my_table.mapping``

Maps fields from the Kafka record's key, value, and headers to the ScyllaDB table and its columns.

**Note**: Ensure that the data types of the Kafka record's fields are compatible with the data types of the ScyllaDB columns.
In the mapping, you can optionally specify which field should be used as the TTL (time-to-live) and which as the
timestamp of the record being inserted into the database table, using the special properties __ttl and __timestamp.
By default, the database internally tracks the write time (timestamp) of inserted records.
However, the __timestamp feature in the mapping supports the scenario where the Kafka records have an explicit
timestamp field that you want to use as the write time for the database record produced by the connector.
Eg. "topic.my_topic.my_ks.my_table.mapping":
"column1=key.field1, column2=value.field1, __ttl=value.field2, __timestamp=value.field3, column3=header.field1"

``topic.my_topic.my_ks.my_table.consistencyLevel``

Specifies a table-wide consistency level.

``topic.my_topic.my_ks.my_table.ttlSeconds``

Specifies a table-wide TTL (time-to-live), in seconds.

``topic.my_topic.my_ks.my_table.deletesEnabled``

Specifies whether tombstone records (records with a null Kafka value)
should be processed as delete requests.


### Write

@@ -171,6 +222,7 @@ Connector-specific configuration properties are described below.
* Valid Values: ``ANY``, ``ONE``, ``TWO``, ``THREE``, ``QUORUM``, ``ALL``, ``LOCAL_QUORUM``, ``EACH_QUORUM``, ``SERIAL``, ``LOCAL_SERIAL``, ``LOCAL_ONE``

``scylladb.deletes.enabled``

Flag to determine if the connector should process deletes.
Kafka records with a null value will result in the deletion of the ScyllaDB record
whose primary key is present in the Kafka record key.
@@ -206,6 +258,7 @@ Connector-specific configuration properties are described below.
* Default Value: true

``scylladb.max.batch.size.kb``

Maximum size (in kilobytes) of a single batch of ScyllaDB operations. This should be equal to
batch_size_warn_threshold_in_kb and one tenth of the batch_size_fail_threshold_in_kb configured in scylla.yaml.
The default value is 5 KB; any change to this configuration should be accompanied by a corresponding change in scylla.yaml.
@@ -224,6 +277,27 @@ Connector-specific configuration properties are described below.
* Importance: Low
* Valid Values: [0,...]
* Default Value: 0

### ScyllaDB

``behavior.on.error``

Error handling behavior setting. Must be configured to one of the following:

``fail``
The connector throws a ConnectException and stops processing records when an error occurs while processing or inserting records into ScyllaDB.

``ignore``
Continues to process the next set of records when an error occurs while processing or inserting records into ScyllaDB.

``log``
Logs the error via the connect-reporter when an error occurs while processing or inserting records into ScyllaDB, and continues to process the next set of records available in the Kafka topics.

* Type: string
* Default: FAIL
* Valid Values: [FAIL, LOG, IGNORE]
* Importance: medium


### Confluent Platform Configurations

@@ -232,6 +306,7 @@ Connector-specific configuration properties are described below.
The maximum number of tasks to use for the connector; more tasks allow for greater parallelism.

* Type: int
* Default: 1
* Importance: high

``topics``
@@ -246,6 +321,7 @@ The name of the topics to consume data from and write to ScyllaDB.
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster used for licensing. All servers in the cluster will be discovered from the initial connection. This list should be in the form <code>host1:port1,host2:port2,…</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

* Type: list
* Default: localhost:9092
* Importance: high

------------------------
6 changes: 6 additions & 0 deletions pom.xml
@@ -196,6 +196,12 @@
<version>${netty-tcnative.version}</version>
</dependency>

<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
93 changes: 93 additions & 0 deletions src/main/java/io/connect/scylladb/ClusterAddressTranslator.java
@@ -0,0 +1,93 @@
// Copyright (c) 2016 Compose, an IBM company
//
// MIT License
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to
// permit persons to whom the Software is furnished to do so, subject to
// the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

package io.connect.scylladb;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.policies.AddressTranslator;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ClusterAddressTranslator implements AddressTranslator {

public Map<InetSocketAddress, InetSocketAddress> addressMap = new HashMap<>();
private static final Logger log = LoggerFactory.getLogger(ClusterAddressTranslator.class);

@Override
public void init(Cluster cluster) {
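// No-op: the address map is supplied via setMap() rather than derived from cluster metadata.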
}

public void setMap(String addressMapString) {
// Two input shapes are accepted:
//   JSON array:  [{"private1:port1": "public1:port1"}, {"private2:port2": "public2:port2"}]
//   JSON object: {"private1:port1": "public1:port1", "private2:port2": "public2:port2"}
if (addressMapString.charAt(0) == '[') {
JSONArray jsonArray = new JSONArray(addressMapString);
log.trace("Address translation map: {}", jsonArray);
for (Object element : jsonArray) {
// Each array element is a single-entry object mapping one private address to one public address.
JSONObject entry = (JSONObject) element;
String internal = entry.keys().next();
String external = entry.getString(internal);
addAddresses(internal, external);
}
} else {
JSONObject jsonMap = new JSONObject(addressMapString);
Iterator<String> keys = jsonMap.keys();
while (keys.hasNext()) {
String internal = keys.next();
String external = jsonMap.getString(internal);
addAddresses(internal, external);
}
}
}

public void addAddresses(String internal, String external) {
// Both sides of the mapping must be "host:port" strings; the port is required.
String[] internalHostPort = internal.split(":");
String[] externalHostPort = external.split(":");
InetSocketAddress internalAddress = new InetSocketAddress(internalHostPort[0], Integer.parseInt(internalHostPort[1]));
InetSocketAddress externalAddress = new InetSocketAddress(externalHostPort[0], Integer.parseInt(externalHostPort[1]));
addressMap.put(internalAddress, externalAddress);
}

public Collection<InetSocketAddress> getContactPoints() {
return Collections.unmodifiableCollection(addressMap.values());
}

@Override
public InetSocketAddress translate(final InetSocketAddress inetSocketAddress) {
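// Return the mapped external address, or pass the original address through when no mapping exists.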
return addressMap.getOrDefault(inetSocketAddress, inetSocketAddress);
}

@Override
public void close() {
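// No-op: no resources to release.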
}
}
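
Below is a hedged usage sketch of the translator above. The harness class is hypothetical and not part of this commit; it assumes it runs in the io.connect.scylladb package, since ClusterAddressTranslator is package-private.

package io.connect.scylladb;

import java.net.InetSocketAddress;

// Hypothetical harness demonstrating ClusterAddressTranslator; not part of this commit.
public class ClusterAddressTranslatorExample {
public static void main(String[] args) {
ClusterAddressTranslator translator = new ClusterAddressTranslator();
translator.setMap("{\"10.0.24.69:9042\": \"sl-eu-lon-2-portal.3.dblayer.com:15227\"}");

// Contact points handed to Cluster.Builder are the external (public) addresses.
System.out.println(translator.getContactPoints());

// The driver dials the external address in place of the node's private address...
System.out.println(translator.translate(new InetSocketAddress("10.0.24.69", 9042)));

// ...and unmapped addresses pass through unchanged.
System.out.println(translator.translate(new InetSocketAddress("10.0.24.99", 9042)));
}
}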
30 changes: 28 additions & 2 deletions src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java
@@ -12,6 +12,8 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import org.json.JSONObject;
import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -43,10 +45,18 @@ public class ScyllaDbSessionFactory {

public ScyllaDbSession newSession(ScyllaDbSinkConnectorConfig config) {
Cluster.Builder clusterBuilder = Cluster.builder()
.withProtocolVersion(ProtocolVersion.NEWEST_SUPPORTED)
.withCodecRegistry(CODEC_REGISTRY);

try {
// contactPoints may be either a JSON address map or a plain comma-separated host list;
// try the address translator first and fall back to public contact points.
configureAddressTranslator(config, clusterBuilder);
} catch (JSONException e) {
log.info("Failed to configure address translator; provide a valid JSON string " +
"with internal private network address and port mapped to external network " +
"address and port.");
configurePublicContactPoints(config, clusterBuilder);
}

if (!config.loadBalancingLocalDc.isEmpty()) {
clusterBuilder.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy.builder()
@@ -131,6 +141,22 @@ public ScyllaDbSession newSession(ScyllaDbSinkConnectorConfig config) {
return new ScyllaDbSessionImpl(config, cluster, session);
}

private void configurePublicContactPoints(ScyllaDbSinkConnectorConfig config, Cluster.Builder clusterBuilder) {
log.info("Configuring public contact points={}", config.contactPoints);
String[] contactPointsArray = config.contactPoints.split(",");
clusterBuilder.withPort(config.port)
.addContactPoints(contactPointsArray);
}

private void configureAddressTranslator(ScyllaDbSinkConnectorConfig config, Cluster.Builder clusterBuilder) {
log.info("Trying to configure address translator for private network address and port.");
new JSONObject(config.contactPoints); // validation probe: throws JSONException unless contactPoints is a JSON map
ClusterAddressTranslator translator = new ClusterAddressTranslator();
translator.setMap(config.contactPoints);
clusterBuilder.addContactPointsWithPorts(translator.getContactPoints())
.withAddressTranslator(translator);
}
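
// Hedged sketch (hypothetical snippet, not part of this commit) of the branching
// in newSession() above: a plain comma-separated host list is not a valid JSON
// object, so new JSONObject(...) throws JSONException and configuration falls
// back to configurePublicContactPoints, while a JSON map string selects the
// address translator.
//
//   String contactPoints = "10.0.24.69,10.0.24.70";   // plain host list
//   try {
//     new JSONObject(contactPoints);                  // throws JSONException here
//     // configureAddressTranslator(config, clusterBuilder);
//   } catch (JSONException e) {
//     // configurePublicContactPoints(config, clusterBuilder);
//   }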

private KeyStore createKeyStore(File path, char[] password) {
KeyStore keyStore;
try {
