Skip to content

Commit

Permalink
add output relase methon when jvm shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] committed Nov 25, 2016
1 parent a0cd06e commit c18b98a
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 11 deletions.
4 changes: 1 addition & 3 deletions src/main/java/com/dtstack/logstash/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ public static void main(String[] args) {
System.exit(-1);
}
//add shutdownhook
ShutDownHook shutDownHook = new ShutDownHook(inputQueueList, assemblyPipeline.getBaseInputs());
ShutDownHook shutDownHook = new ShutDownHook(inputQueueList, assemblyPipeline.getBaseInputs(),assemblyPipeline.getBaseOutPuts());
shutDownHook.addShutDownHook();
}


}
15 changes: 13 additions & 2 deletions src/main/java/com/dtstack/logstash/assembly/AssemblyPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.dtstack.logstash.configs.YamlConfig;
import com.dtstack.logstash.factory.FilterFactory;
import com.dtstack.logstash.factory.InputFactory;
Expand All @@ -19,6 +21,7 @@
import com.dtstack.logstash.outputs.BaseOutput;
import com.dtstack.logstash.property.SystemProperty;
import com.dtstack.logstash.utils.Machine;
import com.google.common.collect.Lists;

/**
*
Expand All @@ -40,6 +43,8 @@ public class AssemblyPipeline {

private List<BaseInput> baseInputs =null;

private List<BaseOutput> baseOutputs = Lists.newCopyOnWriteArrayList();

/**
* 组装管道
* @param cmdLine
Expand Down Expand Up @@ -102,12 +107,14 @@ protected void initInputPutThread(List<BaseInput> baseInputs) {
}
}

@SuppressWarnings("rawtypes")
protected void initFilterAndOutputThread(List<Map> outputs, List<Map> filters, List<LinkedBlockingQueue<Map<String,Object>>> queues,int batchSize) throws Exception{
filterOutputExecutor= Executors.newFixedThreadPool(queues.size());
for(LinkedBlockingQueue<Map<String,Object>> queue:queues){
List<BaseOutput> baseOutputs = OutputFactory.getBatchInstance(outputs);
List<BaseOutput> baseOutputs = OutputFactory.getBatchInstance(outputs);
List<BaseFilter> baseFilters = FilterFactory.getBatchInstance(filters);
filterOutputExecutor.submit(new FilterAndOutputThread(queue,baseFilters,baseOutputs,batchSize));
baseOutputs.addAll(baseOutputs);
}
}

Expand Down Expand Up @@ -155,6 +162,10 @@ protected static boolean isInputQueueSizeLog(CommandLine line){


public List<BaseInput> getBaseInputs() {
return baseInputs;
return this.baseInputs;
}

public List<BaseOutput> getBaseOutPuts() {
return this.baseOutputs;
}
}
24 changes: 22 additions & 2 deletions src/main/java/com/dtstack/logstash/assembly/ShutDownHook.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.dtstack.logstash.assembly;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.dtstack.logstash.inputs.BaseInput;
import com.dtstack.logstash.outputs.BaseOutput;

/**
*
Expand All @@ -19,13 +22,16 @@ public class ShutDownHook {

private InputQueueList initInputQueueList;

private List<BaseInput> baseInputs;
private List<BaseInput> baseInputs;

private List<BaseOutput> baseOutputs;

private static int sleep =1000;

public ShutDownHook(InputQueueList initInputQueueList,List<BaseInput> baseInputs){
public ShutDownHook(InputQueueList initInputQueueList,List<BaseInput> baseInputs,List<BaseOutput> baseOutputs){
this.initInputQueueList = initInputQueueList;
this.baseInputs = baseInputs;
this.baseOutputs = baseOutputs;
}

public void addShutDownHook(){
Expand Down Expand Up @@ -53,6 +59,19 @@ private void inputRelease(){
}
}

private void outPutRelease(){
try{
if(baseOutputs!=null){
for(BaseOutput outPut:baseOutputs){
outPut.release();
}
}
logger.warn("outPutRelease success...");
}catch(Exception e){
logger.error(e.getMessage());
}
}

private void inputQueueRelease(){
if(initInputQueueList!=null){
try{
Expand Down Expand Up @@ -84,6 +103,7 @@ public void run() {
// TODO Auto-generated method stub
inputRelease();
inputQueueRelease();
outPutRelease();
}
}
}
12 changes: 8 additions & 4 deletions src/main/java/com/dtstack/logstash/outputs/BaseOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
Expand All @@ -29,6 +28,7 @@ public abstract class BaseOutput implements Cloneable, java.io.Serializable{

private static final Logger logger = LoggerFactory.getLogger(BaseOutput.class);

@SuppressWarnings("rawtypes")
protected Map config;

protected List<TemplateRender> IF;
Expand All @@ -41,6 +41,7 @@ public abstract class BaseOutput implements Cloneable, java.io.Serializable{

public BlockingQueue<Object> failedMsgQueue = Queues.newLinkedBlockingDeque();

@SuppressWarnings({ "rawtypes", "unchecked" })
public BaseOutput(Map config) {
this.config = config;

Expand All @@ -65,8 +66,13 @@ public BaseOutput(Map config) {

public abstract void prepare();

@SuppressWarnings("rawtypes")
protected abstract void emit(Map event);

public void release(){};


@SuppressWarnings("rawtypes")
public void process(Map event) {
if(event != null && event.size() > 0){
boolean succuess = true;
Expand Down Expand Up @@ -124,7 +130,5 @@ public void sendFailedMsg(Object msg){
@Override
public Object clone() throws CloneNotSupportedException {
return super.clone();
}


}
}

0 comments on commit c18b98a

Please sign in to comment.