Skip to content

Commit

Permalink
Kinesis sink publish full json message (apache#2079)
Browse files Browse the repository at this point in the history
* Kinesis sink publish full json message

* fix pulsar typo
  • Loading branch information
rdhabalia authored Jul 6, 2018
1 parent 3e9bac6 commit da8981c
Show file tree
Hide file tree
Showing 5 changed files with 324 additions and 3 deletions.
13 changes: 13 additions & 0 deletions pulsar-io/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>protobuf-shaded</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.pulsar.io.core.RecordContext;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,9 +67,17 @@
* which accepts json-map of credentials in awsCredentialPluginParam
* eg: awsCredentialPluginParam = {"accessKey":"my-access-key","secretKey":"my-secret-key"}
* 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link AwsCredentialProviderPlugin}
*
* 6. <b>messageFormat:</b> enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON"]
* a. ONLY_RAW_PAYLOAD: publishes the raw payload to the stream
* b. FULL_MESSAGE_IN_JSON: publishes the full message (encryptionCtx + properties + payload) in json format
* json-schema:
* {"type":"object","properties":{"encryptionCtx":{"type":"object","properties":{"metadata":{"type":"object","additionalProperties":{"type":"string"}},"uncompressedMessageSize":{"type":"integer"},"keysMetadataMap":{"type":"object","additionalProperties":{"type":"object","additionalProperties":{"type":"string"}}},"keysMapBase64":{"type":"object","additionalProperties":{"type":"string"}},"encParamBase64":{"type":"string"},"compressionType":{"type":"string","enum":["NONE","LZ4","ZLIB"]},"batchSize":{"type":"integer"},"algorithm":{"type":"string"}}},"payloadBase64":{"type":"string"},"properties":{"type":"object","additionalProperties":{"type":"string"}}}}
* Example:
* {"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
* </pre>
*
*
*
*/
public class KinesisSink implements Sink<byte[]> {

Expand All @@ -92,7 +101,8 @@ public void write(RecordContext inputRecordContext, byte[] value) throws Excepti
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
: partitionedKey; // partitionedKey Length must be at least one, and at most 256
ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName,
partitionedKey, ByteBuffer.wrap(value));
partitionedKey,
ByteBuffer.wrap(createKinesisMessage(kinesisSinkConfig.getMessageFormat(), inputRecordContext, value)));
addCallback(addRecordResult,
ProducerSendCallback.create(this.streamName, inputRecordContext, System.nanoTime()), directExecutor());
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -263,4 +273,13 @@ public void refresh() {
};
}

/**
 * Builds the bytes that are published to the Kinesis stream for a single record.
 *
 * @param msgFormat
 *            format to publish in; any value other than {@code FULL_MESSAGE_IN_JSON} (including {@code null})
 *            results in the raw payload being returned unchanged
 * @param recordCtx
 *            context of the record; only used when the format is {@code FULL_MESSAGE_IN_JSON}
 * @param data
 *            raw payload of the record
 * @return the UTF-8 encoded json document for {@code FULL_MESSAGE_IN_JSON}, otherwise {@code data} itself
 * @throws NullPointerException
 *             if the format is {@code FULL_MESSAGE_IN_JSON} and the record could not be serialized to json
 */
public static byte[] createKinesisMessage(MessageFormat msgFormat, RecordContext recordCtx, byte[] data) {
    if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) {
        String json = java.util.Objects.requireNonNull(Utils.serializeRecordToJson(recordCtx, data),
                "record could not be serialized to json (is the record context null?)");
        // Encode explicitly as UTF-8: the no-arg getBytes() depends on the JVM's default charset, which would
        // make the published bytes differ between hosts for non-ASCII property values.
        return json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
    // send raw-message
    return data;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class KinesisSinkConfig implements Serializable {
private String awsKinesisStreamName;
private String awsCredentialPluginName;
private String awsCredentialPluginParam;
private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // default : ONLY_RAW_PAYLOAD

public static KinesisSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
Expand All @@ -53,5 +54,28 @@ public static KinesisSinkConfig load(String yamlFile) throws IOException {
/**
 * Creates a {@link KinesisSinkConfig} from a generic key/value map by writing the map out as json and reading
 * that json back as a config object.
 *
 * @param map
 *            configuration entries keyed by field name
 * @return the populated config
 * @throws IOException
 *             if the map cannot be written as json or the json cannot be bound to the config class
 */
public static KinesisSinkConfig load(Map<String, Object> map) throws IOException {
    // A single mapper handles both directions; there is no need to allocate a second one for serialization.
    ObjectMapper mapper = new ObjectMapper();
    return mapper.readValue(mapper.writeValueAsString(map), KinesisSinkConfig.class);
}
}

/**
 * Format in which the kinesis-sink converts a pulsar-message before publishing it to the kinesis stream.
 */
public enum MessageFormat {
    /**
     * Kinesis sink directly publishes the pulsar-payload as a message into the kinesis-stream.
     */
    ONLY_RAW_PAYLOAD,
    /**
     * Kinesis sink creates a json payload with message-payload, properties and encryptionCtx and publishes the
     * json payload to the kinesis stream.
     *
     * schema:
     * {"type":"object","properties":{"encryptionCtx":{"type":"object","properties":{"metadata":{"type":"object","additionalProperties":{"type":"string"}},"uncompressedMessageSize":{"type":"integer"},"keysMetadataMap":{"type":"object","additionalProperties":{"type":"object","additionalProperties":{"type":"string"}}},"keysMapBase64":{"type":"object","additionalProperties":{"type":"string"}},"encParamBase64":{"type":"string"},"compressionType":{"type":"string","enum":["NONE","LZ4","ZLIB"]},"batchSize":{"type":"integer"},"algorithm":{"type":"string"}}},"payloadBase64":{"type":"string"},"properties":{"type":"object","additionalProperties":{"type":"string"}}}}
     * Example:
     * {"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
     */
    FULL_MESSAGE_IN_JSON
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.io.kinesis;

import static java.util.Base64.getEncoder;

import java.util.Map;

import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.io.core.RecordContext;

import com.google.gson.JsonObject;

/**
 * Helpers for converting a sink record into the json document that the kinesis-sink publishes when the full
 * message (payload + properties + encryption-context) is requested.
 */
public class Utils {

    private static final String PAYLOAD_FIELD = "payloadBase64";
    private static final String PROPERTIES_FIELD = "properties";
    private static final String KEY_MAP_FIELD = "keysMapBase64";
    private static final String KEY_METADATA_MAP_FIELD = "keysMetadataMap";
    private static final String METADATA_FIELD = "metadata";
    private static final String ENCRYPTION_PARAM_FIELD = "encParamBase64";
    private static final String ALGO_FIELD = "algorithm";
    private static final String COMPRESSION_TYPE_FIELD = "compressionType";
    private static final String UNCOMPRESSED_MSG_SIZE_FIELD = "uncompressedMessageSize";
    private static final String BATCH_SIZE_FIELD = "batchSize";
    private static final String ENCRYPTION_CTX_FIELD = "encryptionCtx";

    /**
     * Serializes a sink-record into json format. Binary values (payload, encryption-keys and encryption-param)
     * are base64 encoded so that they can be carried inside the json document.
     *
     * @param inputRecordContext
     *            context of the record, supplying the properties and the optional encryption-context
     * @param data
     *            raw payload of the record; must not be {@code null}
     * @return the json document, or {@code null} if {@code inputRecordContext} is {@code null}
     */
    public static String serializeRecordToJson(RecordContext inputRecordContext, byte[] data) {
        if (inputRecordContext == null) {
            return null;
        }
        JsonObject result = new JsonObject();
        result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(data));
        Map<String, String> recordProperties = inputRecordContext.getProperties();
        if (recordProperties != null) {
            result.add(PROPERTIES_FIELD, toJsonObject(recordProperties));
        }
        if (inputRecordContext.getEncryptionCtx().isPresent()) {
            result.add(ENCRYPTION_CTX_FIELD, encryptionContextToJson(inputRecordContext.getEncryptionCtx().get()));
        }
        return result.toString();
    }

    /**
     * Converts an encryption-context into its json representation. Fields that have not been set on the context
     * (keys, param, batch-size) are left out instead of failing the whole record.
     */
    private static JsonObject encryptionContextToJson(EncryptionContext encryptionCtx) {
        JsonObject encryptionCtxJson = new JsonObject();
        JsonObject keyBase64Map = new JsonObject();
        JsonObject keyMetadataMap = new JsonObject();
        if (encryptionCtx.getKeys() != null) {
            encryptionCtx.getKeys().forEach((keyName, key) -> {
                keyBase64Map.addProperty(keyName, getEncoder().encodeToString(key.getKeyValue()));
                Map<String, String> keyMetadata = key.getMetadata();
                if (keyMetadata != null && !keyMetadata.isEmpty()) {
                    keyMetadataMap.add(keyName, toJsonObject(keyMetadata));
                }
            });
        }
        // both key objects are always emitted, even when empty
        encryptionCtxJson.add(KEY_MAP_FIELD, keyBase64Map);
        encryptionCtxJson.add(KEY_METADATA_MAP_FIELD, keyMetadataMap);
        Map<String, String> metadataMap = encryptionCtx.getMetadata();
        if (metadataMap != null && !metadataMap.isEmpty()) {
            encryptionCtxJson.add(METADATA_FIELD, toJsonObject(metadataMap));
        }
        if (encryptionCtx.getParam() != null) {
            // the base64 encoder rejects null input, so an unset param is simply omitted
            encryptionCtxJson.addProperty(ENCRYPTION_PARAM_FIELD,
                    getEncoder().encodeToString(encryptionCtx.getParam()));
        }
        encryptionCtxJson.addProperty(ALGO_FIELD, encryptionCtx.getAlgorithm());
        if (encryptionCtx.getCompressionType() != null) {
            // the uncompressed size is only reported together with a compression type
            encryptionCtxJson.addProperty(COMPRESSION_TYPE_FIELD, encryptionCtx.getCompressionType().name());
            encryptionCtxJson.addProperty(UNCOMPRESSED_MSG_SIZE_FIELD, encryptionCtx.getUncompressedMessageSize());
        }
        // guard against a batch-size that was never set on the context as well as an empty one
        if (encryptionCtx.getBatchSize() != null && encryptionCtx.getBatchSize().isPresent()) {
            encryptionCtxJson.addProperty(BATCH_SIZE_FIELD, encryptionCtx.getBatchSize().get());
        }
        return encryptionCtxJson;
    }

    /**
     * Copies every entry of a string map into a new json object as a string property.
     */
    private static JsonObject toJsonObject(Map<String, String> map) {
        JsonObject json = new JsonObject();
        map.forEach((name, value) -> json.addProperty(name, value));
        return json;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.kinesis;

import static java.util.Base64.getDecoder;

import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
import org.apache.pulsar.io.core.RecordContext;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.testng.collections.Maps;

import com.google.gson.Gson;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
 * Unit test of {@link Utils}.
 */
public class UtilsTest {

    /**
     * Serializes a record carrying properties and a fully populated encryption-context to json, reads the json
     * back into plain holder objects and verifies that every field survived the round trip.
     */
    @Test
    public void testJsonSerialization() throws Exception {

        final String key1 = "key1";
        final String key2 = "key2";
        final String key1Value = "test1";
        final String key2Value = "test2";
        final String param = "param";
        final String algo = "algo";
        final int batchSize = 10;
        final int uncompressedSize = 10;

        // prepare encryption-ctx
        EncryptionContext ctx = new EncryptionContext();
        ctx.setAlgorithm(algo);
        ctx.setBatchSize(Optional.of(batchSize));
        ctx.setCompressionType(CompressionType.LZ4);
        ctx.setUncompressedMessageSize(uncompressedSize);
        Map<String, EncryptionKey> keys = Maps.newHashMap();
        EncryptionKey encKeyVal = new EncryptionKey();
        encKeyVal.setKeyValue(key1Value.getBytes());
        Map<String, String> metadata1 = Maps.newHashMap();
        metadata1.put("version", "v1");
        metadata1.put("ckms", "cmks-1");
        encKeyVal.setMetadata(metadata1);
        EncryptionKey encKeyVal2 = new EncryptionKey();
        encKeyVal2.setKeyValue(key2Value.getBytes());
        Map<String, String> metadata2 = Maps.newHashMap();
        metadata2.put("version", "v2");
        metadata2.put("ckms", "cmks-2");
        encKeyVal2.setMetadata(metadata2);
        keys.put(key1, encKeyVal);
        keys.put(key2, encKeyVal2);
        ctx.setKeys(keys);
        ctx.setMetadata(metadata1);
        ctx.setParam(param.getBytes());

        // serialize to json
        byte[] data = "payload".getBytes();
        Map<String, String> properties = Maps.newHashMap();
        properties.put("prop1", "value");
        RecordContext recordCtx = new RecordContextImpl(properties, ctx);
        String json = Utils.serializeRecordToJson(recordCtx, data);

        // deserialize from json and assert (TestNG order: actual first, expected second)
        KinesisMessageResponse kinesisJsonResponse = deSerializeRecordFromJson(json);
        Assert.assertNotNull(kinesisJsonResponse);
        Assert.assertEquals(getDecoder().decode(kinesisJsonResponse.getPayloadBase64()), data);
        Assert.assertEquals(kinesisJsonResponse.getProperties(), properties);

        EncryptionCtx encryptionCtxDeser = kinesisJsonResponse.getEncryptionCtx();
        Assert.assertNotNull(encryptionCtxDeser);
        Assert.assertEquals(getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key1)),
                key1Value.getBytes());
        Assert.assertEquals(getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key2)),
                key2Value.getBytes());
        Assert.assertEquals(getDecoder().decode(encryptionCtxDeser.getEncParamBase64()), param.getBytes());
        Assert.assertEquals(encryptionCtxDeser.getAlgorithm(), algo);
        Assert.assertEquals(encryptionCtxDeser.getKeysMetadataMap().get(key1), metadata1);
        Assert.assertEquals(encryptionCtxDeser.getKeysMetadataMap().get(key2), metadata2);
        Assert.assertEquals(encryptionCtxDeser.getMetadata(), metadata1);
        Assert.assertEquals(encryptionCtxDeser.getCompressionType(), CompressionType.LZ4);
        Assert.assertEquals(encryptionCtxDeser.getUncompressedMessageSize(), uncompressedSize);
        Assert.assertEquals(encryptionCtxDeser.getBatchSize(), Integer.valueOf(batchSize));
    }

    /**
     * Minimal {@link RecordContext} that exposes fixed properties and an encryption-context.
     */
    static class RecordContextImpl implements RecordContext {
        Map<String, String> properties;
        Optional<EncryptionContext> ectx;

        public RecordContextImpl(Map<String, String> properties, EncryptionContext ectx) {
            this.properties = properties;
            this.ectx = Optional.of(ectx);
        }

        @Override
        public Map<String, String> getProperties() {
            return properties;
        }

        @Override
        public Optional<EncryptionContext> getEncryptionCtx() {
            return ectx;
        }
    }

    /**
     * Parses the json produced by the sink back into a {@link KinesisMessageResponse}.
     *
     * @param jsonRecord
     *            json document to parse
     * @return the parsed message, or {@code null} if the input is blank
     */
    public static KinesisMessageResponse deSerializeRecordFromJson(String jsonRecord) {
        if (StringUtils.isNotBlank(jsonRecord)) {
            return new Gson().fromJson(jsonRecord, KinesisMessageResponse.class);
        }
        return null;
    }

    /**
     * Holder mirroring the top-level layout of the json message.
     */
    @ToString
    @Setter
    @Getter
    public static class KinesisMessageResponse {
        // Encryption-context if message has been encrypted
        private EncryptionCtx encryptionCtx;
        // user-properties
        private Map<String, String> properties;
        // base64 encoded payload
        private String payloadBase64;
    }

    /**
     * Holder mirroring the layout of the {@code encryptionCtx} object inside the json message.
     */
    @ToString
    @Setter
    @Getter
    public static class EncryptionCtx {
        // map of encryption-key value. (key-value is base64 encoded)
        private Map<String, String> keysMapBase64;
        // map of encryption-key metadata
        private Map<String, Map<String, String>> keysMetadataMap;
        // encryption-ctx metadata
        private Map<String, String> metadata;
        // encryption param which is base64 encoded
        private String encParamBase64;
        // encryption algorithm
        private String algorithm;
        // compression type if message is compressed
        private CompressionType compressionType;
        // size of the message before compression
        private int uncompressedMessageSize;
        // number of messages in the batch if msg is batched message
        private Integer batchSize;
    }

}

0 comments on commit da8981c

Please sign in to comment.