Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
neoremind committed Sep 14, 2016
1 parent 24657d2 commit ebb51ca
Show file tree
Hide file tree
Showing 518 changed files with 42,962 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
language: java

after_success:
# - mvn jacoco:report coveralls:report
32 changes: 32 additions & 0 deletions consumer-spi/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>net.neoremind</groupId>
<artifactId>fountain-base</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.0.0-SNAPSHOT</version>
</parent>

<groupId>net.neoremind</groupId>
<artifactId>consumer-spi</artifactId>
<version>${consumer-spi.version}</version>
<packaging>jar</packaging>

<name>consumer-spi</name>
<url>http://maven.apache.org</url>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>net.neoremind</groupId>
<artifactId>fountain-common</artifactId>
<version>${fountain-common.version}</version>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package net.neoremind.fountain.consumer.exp;

/**
* 初始化消费者失败异常
*
* @author hexiufeng
*/
public class InitConsumerExp extends RuntimeException {
/**
* serialVersionUID
*/
private static final long serialVersionUID = 1L;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package net.neoremind.fountain.consumer.spi;

import net.neoremind.fountain.changedata.ChangeDataSet;
import net.neoremind.fountain.eventposition.DisposeEventPositionBridge;

/**
* 消费者的Actor,消费fountain产出的{@link ChangeDataSet}这个消息。
* <p/>
* 消费的形式可以很丰富:
* <ul>
* <li>写入本地文件</li>
* <li>发送至MQ中间件</li>
* <li>写入分布式存储HDFS</li>
* <li>....</li>
* </ul>
* <p/>
* 借鉴了<a href="https://en.wikipedia.org/wiki/Actor_model">Actor</a>模型,Actor作为一个独立的实体,
* 可做消息传递通信中的一个环节,它没有状态,不改变消息本身,只做消费处理,或者调用别的actor继续完成任务。
*
* @author zhangxu
*/
public interface ConsumeActor {

/**
* 接收到数据库变化消息后的处理逻辑。
*
* @param event 数据库变化消息
*/
void onReceive(ChangeDataSet event);

/**
* 当{@link #onReceive(ChangeDataSet)}处理完毕,并且安全退出/未抛出任何异常后,消费者会执行该方法。
*
* @param event 数据库变化消息
* @param bridge 同步点桥接
*
* @see DisposeEventPositionBridge
*/
void onSuccess(ChangeDataSet event, DisposeEventPositionBridge bridge);

/**
* 当{@link #onReceive(ChangeDataSet)}处理过程中,发生未不容的任何异常时,消费者回调该方法。
*
* @param event 数据库变化消息
* @param e 未捕获的异常
*/
void onUncaughtException(ChangeDataSet event, Exception e);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package net.neoremind.fountain.consumer.spi;

/**
* 知晓ConsumeActor元接口
*
* @author zhangxu
*/
public interface ConsumeActorAware {

/**
* Set the ConsumeActor that this object runs in.
*
* @param consumeActor
*/
void setConsumeActor(ConsumeActor consumeActor);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package net.neoremind.fountain.consumer.spi;

/**
* 具体如何消费一个数据的抽象接口
*
* @author hanxu
*
*/
public interface Consumer {
<T> boolean consume(T event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package net.neoremind.fountain.consumer.spi;

/**
* 消费一个数据变化的工作流程,一般包括解包、和消费
*
* @author zhangxu
*/
public interface ConsumerWorkflow {
/**
* 消费一个数据变化, 返回true表示消费成功,可以继续消费下一个, 如果返回false,上层调用方可能回滚事务,需要重复消费
*
* @param message 消息
*
* @return 是否消费成功
*/
boolean doConsume(Object message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package net.neoremind.fountain.consumer.spi;

/**
* 输出数据到mq的抽象
*
* @author hexiufeng
*/
public interface Output {
/**
* 输出数据
*
* @param message,可能是单条数据,也可能是一包数据包
*
* @return boolean
*/
boolean output(Object message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package net.neoremind.fountain.consumer.spi;

/**
* 消费者接收后真正消费前可以对数据进行转换
*
* @author hexiufeng
*/
public interface RecievedDataConverter {
/**
* 转换收到的数据
*
* @param dataObject
*
* @return 转换后的数据
*/
Object covert(Object dataObject);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package net.neoremind.fountain.consumer.spi.def;

import org.slf4j.Logger;

import net.neoremind.fountain.changedata.ChangeDataSet;
import net.neoremind.fountain.consumer.spi.Consumer;
import net.neoremind.fountain.eventposition.DisposeEventPositionBridge;

/**
* 直接消费ChangeDataSet对象的消费者抽象实现。
* <p>
* 直接消费ChangeDataSet对象的情况一般适用于producer-consumer部署在同一进程内的情况, 此时通过内存mq进行传输,ChangeDataSet不需要被打包和解包
* </p>
*
* @author hexiufeng
*/
public abstract class AbstractConsumeSingleChangeSetConsumer implements Consumer {

private DisposeEventPositionBridge disposeEventPositionBridge;

@Override
public <T> boolean consume(T event) {
if (event instanceof ChangeDataSet) {
ChangeDataSet ds = (ChangeDataSet) event;
outputCore(ds);
if (ds.getSyncPoint() != null) {
recordSyncPoint(ds);
}
return true;
}
throw new RuntimeException("event is not ChangeDataSet.");

}

/**
* 记录同步点
*
* @param event 事件
*/
private void recordSyncPoint(ChangeDataSet ds) {
if (disposeEventPositionBridge == null) {
return;
}
try {
disposeEventPositionBridge.getDisposeEventPosition(ds.getInstanceName()).saveSyncPoint(ds.getSyncPoint());
} catch (RuntimeException e) {
getLogger().error("record sync point failed.", e);
}
}

/**
* 输出event
*
* @param event ChangeDataSet
*
* @throws RuntimeException RuntimeException
*/
protected abstract void outputCore(ChangeDataSet event) throws RuntimeException;

/**
* 获取当前日志
*
* @return Logger
*/
protected abstract Logger getLogger();

public void setDisposeEventPositionBridge(DisposeEventPositionBridge disposeEventPositionBridge) {
this.disposeEventPositionBridge = disposeEventPositionBridge;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package net.neoremind.fountain.consumer.spi.def;

import net.neoremind.fountain.consumer.spi.ConsumerWorkflow;

/**
* 调试模式下的消费流程,不做任何事情
*
* @author hexiufeng
*/
public class DebugConsumer implements ConsumerWorkflow {

@Override
public boolean doConsume(Object message) {
// do nothing
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package net.neoremind.fountain.consumer.spi.def;

import net.neoremind.fountain.changedata.ChangeDataSet;
import net.neoremind.fountain.consumer.spi.ConsumeActor;
import net.neoremind.fountain.eventposition.DisposeEventPositionBridge;

/**
* 默认的消费者Actor,什么都不做,继承的子类可覆盖方法,不用全部重写。
*
* @author zhangxu
*/
public class DefaultConsumeActor implements ConsumeActor {

@Override
public void onReceive(ChangeDataSet event) {
// do nothing
}

@Override
public void onSuccess(ChangeDataSet event, DisposeEventPositionBridge bridge) {
// do nothing
}

@Override
public void onUncaughtException(ChangeDataSet event, Exception e) {
// do nothing
}

}
Loading

0 comments on commit ebb51ca

Please sign in to comment.