diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml
index 30da3d527a88a..003e6d861141b 100644
--- a/pulsar-io/core/pom.xml
+++ b/pulsar-io/core/pom.xml
@@ -47,6 +47,19 @@
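+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>protobuf-shaded</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+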
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 5a433122b3c29..eaa2b9184529d 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -35,6 +35,7 @@
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.pulsar.io.core.RecordContext;
import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,9 +67,17 @@
* which accepts json-map of credentials in awsCredentialPluginParam
* eg: awsCredentialPluginParam = {"accessKey":"my-access-key","secretKey":"my-secret-key"}
* 5. awsCredentialPluginParam: json-parameters to initialize {@link AwsCredentialProviderPlugin}
- *
+ * 6. messageFormat: enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON"]
+ * a. ONLY_RAW_PAYLOAD: publishes raw payload to stream
+ * b. FULL_MESSAGE_IN_JSON: publishes the full message (encryptionCtx + properties + payload) in JSON format
+ * json-schema:
+ * {"type":"object","properties":{"encryptionCtx":{"type":"object","properties":{"metadata":{"type":"object","additionalProperties":{"type":"string"}},"uncompressedMessageSize":{"type":"integer"},"keysMetadataMap":{"type":"object","additionalProperties":{"type":"object","additionalProperties":{"type":"string"}}},"keysMapBase64":{"type":"object","additionalProperties":{"type":"string"}},"encParamBase64":{"type":"string"},"compressionType":{"type":"string","enum":["NONE","LZ4","ZLIB"]},"batchSize":{"type":"integer"},"algorithm":{"type":"string"}}},"payloadBase64":{"type":"string"},"properties":{"type":"object","additionalProperties":{"type":"string"}}}}
+ * Example:
+ * {"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
*
*
+ *
+ *
*/
public class KinesisSink implements Sink {
@@ -92,7 +101,8 @@ public void write(RecordContext inputRecordContext, byte[] value) throws Excepti
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
: partitionedKey; // partitionedKey Length must be at least one, and at most 256
ListenableFuture addRecordResult = kinesisProducer.addUserRecord(this.streamName,
- partitionedKey, ByteBuffer.wrap(value));
+ partitionedKey,
+ ByteBuffer.wrap(createKinesisMessage(kinesisSinkConfig.getMessageFormat(), inputRecordContext, value)));
addCallback(addRecordResult,
ProducerSendCallback.create(this.streamName, inputRecordContext, System.nanoTime()), directExecutor());
if (LOG.isDebugEnabled()) {
@@ -263,4 +273,13 @@ public void refresh() {
};
}
+    /**
+     * Builds the byte payload that is published to the kinesis stream for one pulsar record.
+     *
+     * @param msgFormat format selected in the sink configuration; only
+     *            {@link MessageFormat#FULL_MESSAGE_IN_JSON} triggers the json conversion, every other value
+     *            (including {@code null}) publishes the raw payload
+     * @param recordCtx context of the record; must be non-null when {@code msgFormat} is
+     *            {@code FULL_MESSAGE_IN_JSON}, because {@link Utils#serializeRecordToJson} returns
+     *            {@code null} for a null context
+     * @param data raw payload of the pulsar message
+     * @return UTF-8 encoded json document for {@code FULL_MESSAGE_IN_JSON}, otherwise {@code data} itself
+     */
+    public static byte[] createKinesisMessage(MessageFormat msgFormat, RecordContext recordCtx, byte[] data) {
+        if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) {
+            // Encode explicitly as UTF-8: the json may carry non-ASCII property values, and the bytes
+            // published to kinesis must not depend on the JVM's default charset.
+            return Utils.serializeRecordToJson(recordCtx, data)
+                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        }
+        // send raw-message
+        return data;
+    }
+
}
\ No newline at end of file
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index b7dbad441b665..e3c8fde2637bf 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -44,6 +44,7 @@ public class KinesisSinkConfig implements Serializable {
private String awsKinesisStreamName;
private String awsCredentialPluginName;
private String awsCredentialPluginParam;
+ private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // default : ONLY_RAW_PAYLOAD
public static KinesisSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
@@ -53,5 +54,28 @@ public static KinesisSinkConfig load(String yamlFile) throws IOException {
public static KinesisSinkConfig load(Map map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(new ObjectMapper().writeValueAsString(map), KinesisSinkConfig.class);
- }
+ }
+
+ /**
+ * Format in which the kinesis sink converts a pulsar message before publishing it to the kinesis stream.
+ * The sink configuration defaults to {@link #ONLY_RAW_PAYLOAD}.
+ */
+ public static enum MessageFormat {
+ /**
+ * Kinesis sink directly publishes the pulsar payload as a message into the kinesis stream.
+ */
+ ONLY_RAW_PAYLOAD,
+ /**
+ * Kinesis sink creates a json document containing the message payload, properties and encryptionCtx and
+ * publishes that json document to the kinesis stream.
+ *
+ * schema:
+ * {"type":"object","properties":{"encryptionCtx":{"type":"object","properties":{"metadata":{"type":"object","additionalProperties":{"type":"string"}},"uncompressedMessageSize":{"type":"integer"},"keysMetadataMap":{"type":"object","additionalProperties":{"type":"object","additionalProperties":{"type":"string"}}},"keysMapBase64":{"type":"object","additionalProperties":{"type":"string"}},"encParamBase64":{"type":"string"},"compressionType":{"type":"string","enum":["NONE","LZ4","ZLIB"]},"batchSize":{"type":"integer"},"algorithm":{"type":"string"}}},"payloadBase64":{"type":"string"},"properties":{"type":"object","additionalProperties":{"type":"string"}}}}
+ * Example:
+ * {"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
+ *
+ * Fields whose name ends in "Base64" carry binary values encoded as base64 strings.
+ */
+ FULL_MESSAGE_IN_JSON;
+ }
}
\ No newline at end of file
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
new file mode 100644
index 0000000000000..82360801db341
--- /dev/null
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kinesis;
+
+import static java.util.Base64.getEncoder;
+
+import java.util.Map;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.io.core.RecordContext;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Helpers used by the kinesis sink to convert pulsar records into the json message format.
+ */
+public class Utils {
+
+    private static final String PAYLOAD_FIELD = "payloadBase64";
+    private static final String PROPERTIES_FIELD = "properties";
+    private static final String KEY_MAP_FIELD = "keysMapBase64";
+    private static final String KEY_METADATA_MAP_FIELD = "keysMetadataMap";
+    private static final String METADATA_FIELD = "metadata";
+    private static final String ENCRYPTION_PARAM_FIELD = "encParamBase64";
+    private static final String ALGO_FIELD = "algorithm";
+    private static final String COMPRESSION_TYPE_FIELD = "compressionType";
+    private static final String UNCOMPRESSED_MSG_SIZE_FIELD = "uncompressedMessageSize";
+    private static final String BATCH_SIZE_FIELD = "batchSize";
+    private static final String ENCRYPTION_CTX_FIELD = "encryptionCtx";
+
+    /**
+     * Serializes a sink record into a json document. Binary values (payload, encryption keys and the
+     * encryption param) are base64 encoded so that they can be carried inside json.
+     *
+     * <p>The document always contains {@code payloadBase64}; {@code properties} is added when the record
+     * exposes a non-null property map, and {@code encryptionCtx} is added when the record carries an
+     * encryption context.
+     *
+     * @param inputRecordContext context of the record to serialize
+     * @param data raw payload of the record
+     * @return the json document as a string, or {@code null} when {@code inputRecordContext} is {@code null}
+     */
+    public static String serializeRecordToJson(RecordContext inputRecordContext, byte[] data) {
+        if (inputRecordContext == null) {
+            return null;
+        }
+        JsonObject result = new JsonObject();
+        result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(data));
+        Map<String, String> recordProperties = inputRecordContext.getProperties();
+        if (recordProperties != null) {
+            result.add(PROPERTIES_FIELD, toJsonObject(recordProperties));
+        }
+        inputRecordContext.getEncryptionCtx()
+                .ifPresent(ctx -> result.add(ENCRYPTION_CTX_FIELD, encryptionCtxToJson(ctx)));
+        return result.toString();
+    }
+
+    /** Converts an encryption context into its json representation. */
+    private static JsonObject encryptionCtxToJson(EncryptionContext encryptionCtx) {
+        JsonObject keyBase64Map = new JsonObject();
+        JsonObject keyMetadataMap = new JsonObject();
+        encryptionCtx.getKeys().forEach((keyName, key) -> {
+            keyBase64Map.addProperty(keyName, getEncoder().encodeToString(key.getKeyValue()));
+            Map<String, String> keyMetadata = key.getMetadata();
+            // keys without metadata get no entry in the metadata map
+            if (keyMetadata != null && !keyMetadata.isEmpty()) {
+                keyMetadataMap.add(keyName, toJsonObject(keyMetadata));
+            }
+        });
+
+        JsonObject encryptionCtxJson = new JsonObject();
+        encryptionCtxJson.add(KEY_MAP_FIELD, keyBase64Map);
+        encryptionCtxJson.add(KEY_METADATA_MAP_FIELD, keyMetadataMap);
+        Map<String, String> metadataMap = encryptionCtx.getMetadata();
+        if (metadataMap != null && !metadataMap.isEmpty()) {
+            encryptionCtxJson.add(METADATA_FIELD, toJsonObject(metadataMap));
+        }
+        encryptionCtxJson.addProperty(ENCRYPTION_PARAM_FIELD,
+                getEncoder().encodeToString(encryptionCtx.getParam()));
+        encryptionCtxJson.addProperty(ALGO_FIELD, encryptionCtx.getAlgorithm());
+        // the uncompressed size is only emitted together with a compression type
+        if (encryptionCtx.getCompressionType() != null) {
+            encryptionCtxJson.addProperty(COMPRESSION_TYPE_FIELD, encryptionCtx.getCompressionType().name());
+            encryptionCtxJson.addProperty(UNCOMPRESSED_MSG_SIZE_FIELD,
+                    encryptionCtx.getUncompressedMessageSize());
+        }
+        if (encryptionCtx.getBatchSize().isPresent()) {
+            encryptionCtxJson.addProperty(BATCH_SIZE_FIELD, encryptionCtx.getBatchSize().get());
+        }
+        return encryptionCtxJson;
+    }
+
+    /** Copies every entry of a string map into a new json object. */
+    private static JsonObject toJsonObject(Map<String, String> entries) {
+        JsonObject json = new JsonObject();
+        entries.forEach(json::addProperty);
+        return json;
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
new file mode 100644
index 0000000000000..e3f1160d322d3
--- /dev/null
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kinesis;
+
+import static java.util.Base64.getDecoder;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
+import org.apache.pulsar.io.core.RecordContext;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import com.google.gson.Gson;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * Unit test of {@link Utils}.
+ */
+public class UtilsTest {
+
+    /**
+     * Serializes a record with properties and a fully populated encryption context to json, reads the json
+     * back with Gson and verifies that every serialized field survives the round trip.
+     */
+    @Test
+    public void testJsonSerialization() throws Exception {
+
+        final String key1 = "key1";
+        final String key2 = "key2";
+        final String key1Value = "test1";
+        final String key2Value = "test2";
+        final String param = "param";
+        final String algo = "algo";
+        final int batchSize = 10;
+        final int uncompressedSize = 10;
+
+        // prepare encryption-ctx
+        EncryptionContext ctx = new EncryptionContext();
+        ctx.setAlgorithm(algo);
+        ctx.setBatchSize(Optional.of(batchSize));
+        ctx.setCompressionType(CompressionType.LZ4);
+        ctx.setUncompressedMessageSize(uncompressedSize);
+        Map<String, EncryptionKey> keys = Maps.newHashMap();
+        EncryptionKey encKeyVal = new EncryptionKey();
+        encKeyVal.setKeyValue(key1Value.getBytes());
+        Map<String, String> metadata1 = Maps.newHashMap();
+        metadata1.put("version", "v1");
+        metadata1.put("ckms", "cmks-1");
+        encKeyVal.setMetadata(metadata1);
+        EncryptionKey encKeyVal2 = new EncryptionKey();
+        encKeyVal2.setKeyValue(key2Value.getBytes());
+        Map<String, String> metadata2 = Maps.newHashMap();
+        metadata2.put("version", "v2");
+        metadata2.put("ckms", "cmks-2");
+        encKeyVal2.setMetadata(metadata2);
+        keys.put(key1, encKeyVal);
+        keys.put(key2, encKeyVal2);
+        ctx.setKeys(keys);
+        ctx.setMetadata(metadata1);
+        ctx.setParam(param.getBytes());
+
+        // serialize to json
+        byte[] data = "payload".getBytes();
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("prop1", "value");
+        RecordContext recordCtx = new RecordContextImpl(properties, ctx);
+        String json = Utils.serializeRecordToJson(recordCtx, data);
+
+        // deserialize from json and assert; TestNG's assertEquals takes (actual, expected)
+        KinesisMessageResponse kinesisJsonResponse = deSerializeRecordFromJson(json);
+        Assert.assertEquals(getDecoder().decode(kinesisJsonResponse.getPayloadBase64()), data);
+        Assert.assertEquals(kinesisJsonResponse.getProperties(), properties);
+        EncryptionCtx encryptionCtxDeser = kinesisJsonResponse.getEncryptionCtx();
+        Assert.assertEquals(getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key1)),
+                key1Value.getBytes());
+        Assert.assertEquals(getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key2)),
+                key2Value.getBytes());
+        Assert.assertEquals(getDecoder().decode(encryptionCtxDeser.getEncParamBase64()), param.getBytes());
+        Assert.assertEquals(encryptionCtxDeser.getAlgorithm(), algo);
+        Assert.assertEquals(encryptionCtxDeser.getKeysMetadataMap().get(key1), metadata1);
+        Assert.assertEquals(encryptionCtxDeser.getKeysMetadataMap().get(key2), metadata2);
+        Assert.assertEquals(encryptionCtxDeser.getMetadata(), metadata1);
+        Assert.assertEquals(encryptionCtxDeser.getCompressionType(), CompressionType.LZ4);
+        Assert.assertEquals(encryptionCtxDeser.getUncompressedMessageSize(), uncompressedSize);
+        Assert.assertEquals(encryptionCtxDeser.getBatchSize().intValue(), batchSize);
+    }
+
+    /**
+     * Minimal {@link RecordContext} that exposes fixed properties and an always-present encryption context.
+     */
+    static class RecordContextImpl implements RecordContext {
+        private final Map<String, String> properties;
+        private final Optional<EncryptionContext> ectx;
+
+        public RecordContextImpl(Map<String, String> properties, EncryptionContext ectx) {
+            this.properties = properties;
+            this.ectx = Optional.of(ectx);
+        }
+
+        @Override
+        public Map<String, String> getProperties() {
+            return properties;
+        }
+
+        @Override
+        public Optional<EncryptionContext> getEncryptionCtx() {
+            return ectx;
+        }
+    }
+
+    /**
+     * Parses a json document produced by {@link Utils#serializeRecordToJson}.
+     *
+     * @param jsonRecord json document to parse
+     * @return the parsed message, or {@code null} when {@code jsonRecord} is blank
+     */
+    public static KinesisMessageResponse deSerializeRecordFromJson(String jsonRecord) {
+        if (StringUtils.isNotBlank(jsonRecord)) {
+            return new Gson().fromJson(jsonRecord, KinesisMessageResponse.class);
+        }
+        return null;
+    }
+
+    /**
+     * Consumer-side view of the json message published by the kinesis sink.
+     */
+    @ToString
+    @Setter
+    @Getter
+    public static class KinesisMessageResponse {
+        // Encryption-context if message has been encrypted
+        private EncryptionCtx encryptionCtx;
+        // user-properties
+        private Map<String, String> properties;
+        // base64 encoded payload
+        private String payloadBase64;
+    }
+
+    /**
+     * Consumer-side view of the "encryptionCtx" object inside the json message.
+     */
+    @ToString
+    @Setter
+    @Getter
+    public static class EncryptionCtx {
+        // map of encryption-key value. (key-value is base64 encoded)
+        private Map<String, String> keysMapBase64;
+        // map of encryption-key metadata
+        private Map<String, Map<String, String>> keysMetadataMap;
+        // encryption-ctx metadata
+        private Map<String, String> metadata;
+        // encryption param which is base64 encoded
+        private String encParamBase64;
+        // encryption algorithm
+        private String algorithm;
+        // compression type if message is compressed
+        private CompressionType compressionType;
+        private int uncompressedMessageSize;
+        // number of messages in the batch if msg is batched message
+        private Integer batchSize;
+    }
+
+}