Avoid to create map object for Flume source (apache#4369)
liketic authored and merlimat committed May 28, 2019
1 parent dc0463c commit 6f4ed39
Showing 2 changed files with 8 additions and 20 deletions.
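In short: the Pulsar Flume sink used to wrap every outgoing message in a single-entry HashMap under the key "body" before handing it to the shared queue, and the Flume source unwrapped it again on the other side. This commit passes the raw message object through the BlockingQueue instead, saving one Map allocation per record.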
AbstractSink.java
@@ -26,7 +26,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,18 +40,18 @@ public abstract class AbstractSink<T> implements Sink<T> {
 
     public abstract T extractValue(Record<T> record);
 
-    protected static BlockingQueue<Map<String, Object>> records;
+    protected static BlockingQueue<Object> records;
 
     protected FlumeConnector flumeConnector;
 
-    public static BlockingQueue<Map<String, Object>> getQueue() {
+    public static BlockingQueue<Object> getQueue() {
        return records;
     }
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
 
-        records = new LinkedBlockingQueue<Map<String, Object>>();
+        records = new LinkedBlockingQueue<>();
 
        FlumeConfig flumeConfig = FlumeConfig.load(config);
 
@@ -64,9 +63,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
     public void write(Record<T> record) {
        try {
            T message = extractValue(record);
-            Map<String, Object> m = new HashMap();
-            m.put("body", message);
-            records.put(m);
+            records.put(message);
            record.ack();
        } catch (InterruptedException e) {
            record.fail();
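For orientation, a concrete sink only has to supply extractValue; after this commit, AbstractSink.write() enqueues that value directly. The following is a minimal sketch, not the repository's actual StringSink (its real body may differ); it assumes org.apache.pulsar.io.flume.sink.AbstractSink and Pulsar's Record interface are on the classpath:

    import org.apache.pulsar.functions.api.Record;

    // Hypothetical concrete sink, for illustration only: the record's value is
    // already a String, so it is forwarded unchanged and lands on the shared
    // BlockingQueue<Object> as-is, with no Map wrapper around it.
    public class StringSinkSketch extends AbstractSink<String> {
        @Override
        public String extractValue(Record<String> record) {
            return record.getValue();
        }
    }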
SourceOfFlume.java
@@ -18,25 +18,19 @@
  */
 package org.apache.pulsar.io.flume.sink;
 
-import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.AbstractPollableSource;
-import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
-
-import com.google.common.base.Optional;
-
 import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.BATCH_SIZE;
@@ -55,9 +49,6 @@ public class SourceOfFlume extends AbstractPollableSource implements BatchSizeSupported {
 
     private final List<Event> eventList = new ArrayList<Event>();
 
-    private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = Optional.absent();
-
-
     @Override
     public synchronized void doStart() {
        log.info("start source of flume ...");
@@ -87,10 +78,10 @@ public Status doProcess() {
 
        while (eventList.size() < this.getBatchSize() &&
                System.currentTimeMillis() < maxBatchEndTime) {
-            BlockingQueue<Map<String, Object>> blockingQueue = StringSink.getQueue();
+            BlockingQueue<Object> blockingQueue = StringSink.getQueue();
            while (blockingQueue != null && !blockingQueue.isEmpty()) {
-                Map<String, Object> message = blockingQueue.take();
-                eventBody = message.get("body").toString();
+                Object message = blockingQueue.take();
+                eventBody = message.toString();
                event = EventBuilder.withBody(eventBody.getBytes());
                eventList.add(event);
            }
@@ -104,7 +95,7 @@ public Status doProcess() {
            return Status.BACKOFF;
 
        } catch (Exception e) {
-            log.error("Flume Source EXCEPTION, {}", e);
+            log.error("Flume Source EXCEPTION", e);
            counter.incrementEventReadOrChannelFail(e);
            return Status.BACKOFF;
        }
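Two notes on the source side. The logging change in the last hunk is a genuine fix: SLF4J treats a sole trailing Throwable argument as the event's throwable rather than as a formatting argument, so the old log.error("Flume Source EXCEPTION, {}", e) rendered a literal {} in the message; dropping the placeholder logs the message plus the full stack trace cleanly. And the queue handoff is now a plain object end to end, which the standalone sketch below illustrates (class and variable names invented for illustration):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    // Sketch of the handoff this commit simplifies: the sink puts the raw
    // message on the queue and the source turns it straight into a Flume event.
    public class HandoffSketch {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
            queue.put("hello flume");             // sink side: records.put(message)
            Object message = queue.take();        // source side: blockingQueue.take()
            Event event = EventBuilder.withBody(message.toString().getBytes());
            System.out.println(new String(event.getBody()));
        }
    }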
