Skip to content

Commit

Permalink
add byte array to default pulsar functions serde (apache#1372)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucperkins authored and merlimat committed Mar 13, 2018
1 parent 55d3ff0 commit 2f6de57
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
public class DefaultSerDe implements SerDe<Object> {

private static final Set<Class> supportedInputTypes = new HashSet<>(Arrays.asList(
byte[].class,
Integer.class,
Double.class,
Long.class,
Expand All @@ -49,7 +50,10 @@ public DefaultSerDe(Class type) {
@Override
public Object deserialize(byte[] input) {
String data = new String(input, StandardCharsets.UTF_8);
if (type.equals(Integer.class)) {

if (type.equals(byte[].class)) {
return input;
} else if (type.equals(Integer.class)) {
return Integer.valueOf(data);
} else if (type.equals(Double.class)) {
return Double.valueOf(data);
Expand All @@ -70,7 +74,9 @@ public Object deserialize(byte[] input) {

@Override
public byte[] serialize(Object input) {
if (type.equals(Integer.class)) {
if (type.equals(byte[].class)) {
return (byte[]) input;
} else if (type.equals(Integer.class)) {
return ((Integer) input).toString().getBytes(StandardCharsets.UTF_8);
} else if (type.equals(Double.class)) {
return ((Double) input).toString().getBytes(StandardCharsets.UTF_8);
Expand Down

0 comments on commit 2f6de57

Please sign in to comment.