Skip to content

Commit

Permalink
[pulsar-io] hbase sink use BooleanSchema encode Boolean data (apache#…
Browse files Browse the repository at this point in the history
…4258)

### Motivation

1. use pulsar client BooleanSchema encode Boolean data
2. remove unused code.

### Modifications

1. Use  BooleanSchema.encode() replace Bytes.toBytes()
2. remove  unused code, eg:  required = false,
  • Loading branch information
ambition119 authored and sijie committed May 21, 2019
1 parent 706a18e commit b868bcd
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.io.hbase;

import com.google.common.base.Preconditions;
import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -27,8 +28,6 @@
import lombok.experimental.Accessors;
import org.apache.pulsar.io.core.annotations.FieldDoc;

import java.io.Serializable;

/**
* Configuration object for all Hbase Sink components.
*/
Expand All @@ -40,13 +39,12 @@
@Accessors(chain = true)
public class HbaseAbstractConfig implements Serializable {

private static final long serialVersionUID = 6783394446906640112L;
private static final long serialVersionUID = -8945930873383593712L;

@FieldDoc(
required = false,
defaultValue = "",
defaultValue = "hbase-site.xml",
help = "hbase system configuration 'hbase-site.xml' file")
private String hbaseConfigResources;
private String hbaseConfigResources = "hbase-site.xml";

@FieldDoc(
required = true,
Expand All @@ -55,13 +53,11 @@ public class HbaseAbstractConfig implements Serializable {
private String zookeeperQuorum;

@FieldDoc(
required = false,
defaultValue = "2181",
help = "hbase system configuration about hbase.zookeeper.property.clientPort value")
private String zookeeperClientPort = "2181";

@FieldDoc(
required = false,
defaultValue = "/hbase",
help = "hbase system configuration about zookeeper.znode.parent value")
private String zookeeperZnodeParent = "/hbase";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.BooleanSchema;
import org.apache.pulsar.client.impl.schema.DoubleSchema;
import org.apache.pulsar.client.impl.schema.FloatSchema;
import org.apache.pulsar.client.impl.schema.IntSchema;
Expand Down Expand Up @@ -77,7 +77,7 @@ private byte[] getBytes(Object value) throws Exception{
} else if (value instanceof Float) {
return FloatSchema.of().encode((Float) value);
} else if (value instanceof Boolean) {
return Bytes.toBytes((Boolean) value);
return BooleanSchema.of().encode((Boolean) value);
} else if (value instanceof String) {
return StringSchema.utf8().encode((String) value);
} else if (value instanceof Short) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;

Expand All @@ -42,7 +41,7 @@
@EqualsAndHashCode(callSuper = false)
@ToString
@Accessors(chain = true)
public class HbaseSinkConfig extends HbaseAbstractConfig implements Serializable {
public class HbaseSinkConfig extends HbaseAbstractConfig {

private static final long serialVersionUID = 1245636479605735555L;

Expand All @@ -65,13 +64,11 @@ public class HbaseSinkConfig extends HbaseAbstractConfig implements Serializable
private List<String> qualifierNames;

@FieldDoc(
required = false,
defaultValue = "1000l",
help = "The hbase operation time in milliseconds")
private long batchTimeMs = 1000l;

@FieldDoc(
required = false,
defaultValue = "200",
help = "The batch size of write to the hbase table"
)
Expand Down

0 comments on commit b868bcd

Please sign in to comment.