Skip to content

Commit

Permalink
FLUME-2072. JMX metrics support for HBase Sink
Browse files Browse the repository at this point in the history
(Sravya Tirukkovalur via Hari Shreedharan)
  • Loading branch information
harishreedharan committed Jun 7, 2013
1 parent e442c29 commit 791f443
Showing 1 changed file with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
Expand Down Expand Up @@ -88,7 +88,6 @@ public class HBaseSink extends AbstractSink implements Configurable {
private HTable table;
private long batchSize;
private Configuration config;
private CounterGroup counterGroup = new CounterGroup();
private static final Logger logger = LoggerFactory.getLogger(HBaseSink.class);
private HbaseEventSerializer serializer;
private String eventSerializerType;
Expand All @@ -97,6 +96,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
private String kerberosKeytab;
private User hbaseUser;
private boolean enableWal = true;
private SinkCounter sinkCounter;

public HBaseSink(){
this(HBaseConfiguration.create());
Expand All @@ -116,6 +116,7 @@ public void start(){
kerberosPrincipal, kerberosKeytab);
}
} catch (Exception ex) {
sinkCounter.incrementConnectionFailedCount();
throw new FlumeException("Failed to login to HBase using "
+ "provided credentials.", ex);
}
Expand All @@ -131,6 +132,7 @@ public HTable run() throws Exception {
}
});
} catch (Exception e) {
sinkCounter.incrementConnectionFailedCount();
logger.error("Could not load table, " + tableName +
" from HBase", e);
throw new FlumeException("Could not load table, " + tableName +
Expand All @@ -149,13 +151,16 @@ public Boolean run() throws IOException {
} catch (Exception e) {
//Get getTableDescriptor also throws IOException, so catch the IOException
//thrown above or by the getTableDescriptor() call.
sinkCounter.incrementConnectionFailedCount();
throw new FlumeException("Error getting column family from HBase."
+ "Please verify that the table " + tableName + " and Column Family, "
+ Bytes.toString(columnFamily) + " exists in HBase, and the"
+ " current user has permissions to access that table.", e);
}

super.start();
sinkCounter.incrementConnectionCreatedCount();
sinkCounter.start();
}

@Override
Expand All @@ -166,6 +171,8 @@ public void stop(){
} catch (IOException e) {
throw new FlumeException("Error closing table.", e);
}
sinkCounter.incrementConnectionClosedCount();
sinkCounter.stop();
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -214,6 +221,7 @@ public void configure(Context context){
"writes to HBase will have WAL disabled, and any data in the " +
"memstore of this region in the Region Server could be lost!");
}
sinkCounter = new SinkCounter(this.getName());
}

@Override
Expand All @@ -224,18 +232,28 @@ public Status process() throws EventDeliveryException {
List<Row> actions = new LinkedList<Row>();
List<Increment> incs = new LinkedList<Increment>();
txn.begin();
for(long i = 0; i < batchSize; i++) {
long i = 0;
for(; i < batchSize; i++) {
Event event = channel.take();
if(event == null){
status = Status.BACKOFF;
counterGroup.incrementAndGet("channel.underflow");
if (i == 0) {
sinkCounter.incrementBatchEmptyCount();
} else {
sinkCounter.incrementBatchUnderflowCount();
}
break;
} else {
serializer.initialize(event, columnFamily);
actions.addAll(serializer.getActions());
incs.addAll(serializer.getIncrements());
}
}
if (i == batchSize) {
sinkCounter.incrementBatchCompleteCount();
}
sinkCounter.addToEventDrainAttemptCount(i);

putEventsAndCommit(actions, incs, txn);
return status;
}
Expand Down Expand Up @@ -272,15 +290,14 @@ public Void run() throws Exception {
});

txn.commit();
counterGroup.incrementAndGet("transaction.success");
sinkCounter.addToEventDrainSuccessCount(actions.size());
} catch (Throwable e) {
try{
txn.rollback();
} catch (Exception e2) {
logger.error("Exception in rollback. Rollback might not have been" +
"successful." , e2);
}
counterGroup.incrementAndGet("transaction.rollback");
logger.error("Failed to commit transaction." +
"Transaction rolled back.", e);
if(e instanceof Error || e instanceof RuntimeException){
Expand Down

0 comments on commit 791f443

Please sign in to comment.