Skip to content

Commit

Permalink
[pulsar-io] improve hbase sink performance (apache#5705)
Browse files Browse the repository at this point in the history
utf7 authored and murong00 committed Nov 20, 2019
1 parent 91621bc commit 22a70c2
Showing 1 changed file with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.hbase.sink;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.BooleanSchema;
@@ -57,11 +58,16 @@ public void bindValue(Record<GenericRecord> message, List<Put> puts) throws Exce
byte[] familyValueBytes = getBytes(familyName);

List<String> qualifierNames = tableDefinition.getQualifierNames();
for (String qualifierName : qualifierNames) {
Object qualifierValue = record.getField(qualifierName);
if (null != qualifierValue) {
Put put = new Put(getBytes(rowKeyValue));
put.addColumn(familyValueBytes, getBytes(qualifierName), getBytes(qualifierValue));
if (CollectionUtils.isNotEmpty(qualifierNames)) {
Put put = new Put(getBytes(rowKeyValue));
for (String qualifierName : qualifierNames) {
Object qualifierValue = record.getField(qualifierName);
if (null != qualifierValue) {
put.addColumn(familyValueBytes, getBytes(qualifierName),
getBytes(qualifierValue));
}
}
if (CollectionUtils.isNotEmpty(put.getFamilyCellMap().values())) {
puts.add(put);
}
}

0 comments on commit 22a70c2

Please sign in to comment.