如果阅读完文档后,还有任何疑问,请mail to [email protected]
tiger是一种分布式异步执行框架,偏重于执行层面,同一种任务可以由多台机器同时执行,并能保证一条任务不被重复执行。
tiger主要有以下三块组成:
-
zk集群管理:用于管理应用机器的在线情况,进而对机器可执行的任务节点进行自适应分配,保证一个任务同一时间只会被一台机器消费;
-
事件调度管理:用于每隔一定时间触发一次任务执行,并监听任务执行器的配置情况,一旦发生变化,即停止任务执行,重新设置后再触发任务执行;
-
任务执行管理:用于管理本机所分配到的执行器节点,进而进行任务节点捞取、任务过滤等,并对任务的执行结果进行处理;
<groupId>com.dianping</groupId>
<artifactId>tiger</artifactId>
<version>1.1.0</version>
策略a: 各个执行器捞取各自的任务
策略b: 统一捞取任务
策略a情况下实现接口:
配置:
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.taskStrategy.name(),DispatchTaskService.TaskFetchStrategy.Multi.getValue() + "");
则实现各自捞取任务的操作接口
com.dianping.tiger.dispatch.DispatchMultiService
策略b情况下实现接口:
配置:
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.taskStrategy.name(),DispatchTaskService.TaskFetchStrategy.Single.getValue() + "");
则实现统一捞取任务的操作接口
com.dianping.tiger.dispatch.DispatchSingleService
定义spring bean
<bean id="dispatchTaskService" class="你的实现类"/>
public int addDispatchTask(DispatchTaskEntity taskEntity);
策略a情况下实现:
public List<DispatchTaskEntity> findDispatchTasksWithLimit(String handler,List<Integer> nodeList, int limit);
策略b情况下实现:
public List<DispatchTaskEntity> findDispatchTasksWithLimit(List<Integer> nodeList, int limit);
public boolean updateTaskStatus(int taskId,int status,String hostName);
public boolean addRetryTimesAndExecuteTime(int taskId,Date nextExecuteTime,String hostName);
ScheduleManagerFactory.setBackFetchFlag(true)
策略a情况下实现:
public List<DispatchTaskEntity> findDispatchTasksWithLimitByBackFetch(String handler, List<Integer> nodeList, int limit,int taskId);
策略b情况下实现:
public List<DispatchTaskEntity> findDispatchTasksWithLimitByBackFetch(List<Integer> nodeList, int limit, int taskId);
com.dianping.tiger.dispatch.DispatchHandler
这里用于实现 业务逻辑;
任务分发支持并行、串行两种执行策略。 默认是并行执行策略,如果需要串行执行策略(同一个任务有先后执行顺序的情况下),在实现的类里增加一个注解,如:
@ExecuteType(AnnotationConstants.Executor.CHAIN)
public class ChainTestHandler implements DispatchHandler {
@Override
public DispatchResult invoke(DispatchParam param) throws Exception {
int taskId = (Integer) param.getProperty("id");
String jsonStr = (String) param.getProperty("param");
Map<String, String> paramMap = (Map<String, String>) JSON.parse(jsonStr);
...
}
}
com.dianping.tiger.ScheduleManagerFactory
example:
===========声明 ScheduleManagerFactory=======
ScheduleManagerFactory smf = new ScheduleManagerFactory(30*1000);
smf.setAppCtx(applicationcontext);
===========初始化配置==============
Properties configp = new Properties();
zk地址,必须
configp.setProperty(ScheduleManagerFactory.ZookeeperKeys.zkConnectAddress.name(),"127.0.0.1:2181,127.0.1.1:2181");
执行器名称,必须
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.handlers.name(),"handler1,hander2,hangdler3");
zk节点rootpath,必须
configp.setProperty(ScheduleManagerFactory.ZookeeperKeys.rootPath.name(),"/DPWED");
虚拟节点数,最好大于20,默认100,可选
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.visualNodeNum.name(),"30");
zk虚拟节点分配策略,1-散列模式,2-分块模式,默认分块模式,建议用2,可选
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.divideType.name(), "2");
执行器策略,可选,默认为策略a
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.taskStrategy.name(),"1");
总调度开关,默认true,可选
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.scheduleFlag.name(),"true");
启用巡航模式,默认true,可选
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.enableNavigate.name(),"true");
启用反压模式,默认false,可选
configp.setProperty(ScheduleManagerFactory.ScheduleKeys.enableBackFetch.name(),"false");
===========初始化启用==========
smf.initSchedule(configp);
完成以上4步,启动你的应用就可以使用了.
注意点:
-
ScheduleManagerFactory.keys.handlers.name()的名字需要和DispatchHandler接口实现类的 bean名字一样,执行器handler之间用,分隔;
-
DispatchHandler接口实现类的spring bean配置默认是 单例,所以在实现类里最好 不用成员变量,而要用局部变量, 成员变量是有状态的,会有线程安全问题;
初始化需要的配置外,tiger支持运行中的 配置改变,目前支持以下几种:
- 运行过程中执行器配置改变
ScheduleManagerFactory.reSchedule(List<String> handlers);
- 运行中调度总开关控制
ScheduleManagerFactory.setScheduleFlag(boolean flag);
- 运行中巡航开关控制
ScheduleManagerFactory.setNavigateFlag(boolean flag);
- 运行中反压措施开关控制
ScheduleManagerFactory.setBackFetchFlag(boolean flag);
tiger应用运行期间,支持任务监控,部署tiger-monitor 并且在tiger应用中增加如下配置:
监控服务地址,必须
configp.setProperty(ScheduleManagerFactory.MonitorKeys.monitorIP.name(),"http://10.128.122.126:8080");
监控开关,默认关闭,必须
configp.setProperty(ScheduleManagerFactory.MonitorKeys.enableMonitor.name(),"true");
同时支持监控运行中开关控制:
scheduleManagerFactory.setMonitorFlag(boolean flag);
Thanks