Skip to content

Commit

Permalink
add multi master support
Browse files Browse the repository at this point in the history
  • Loading branch information
cenwenchu committed Jan 12, 2012
1 parent f33bd4b commit 2b38c38
Show file tree
Hide file tree
Showing 13 changed files with 329 additions and 82 deletions.
2 changes: 1 addition & 1 deletion src/conf.test/master-config-ms.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
masterName=TOPAnalyzer
masterPort=6800
jobsSource=ms-jobs-config.properties
otherMasters=127.0.0.1:6801,127.0.0.1:6802
masterGroup=TOPAnalyzer:127.0.0.1:6800|5,TOPAnalyzer1:127.0.0.1:6801,TOPAnalyzer2:127.0.0.1:6802
4 changes: 4 additions & 0 deletions src/conf.test/master-config-ms1.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
masterName=TOPAnalyzer1
masterPort=6801
jobsSource=ms-jobs-config.properties
masterGroup=TOPAnalyzer:127.0.0.1:6800|5,TOPAnalyzer1:127.0.0.1:6801,TOPAnalyzer2:127.0.0.1:6802
4 changes: 4 additions & 0 deletions src/conf.test/master-config-ms2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
masterName=TOPAnalyzer2
masterPort=6802
jobsSource=ms-jobs-config.properties
masterGroup=TOPAnalyzer:127.0.0.1:6800|5,TOPAnalyzer1:127.0.0.1:6801,TOPAnalyzer2:127.0.0.1:6802
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void test() throws AnalysisException, InterruptedException
masterNode.setConfig(config);
masterNode.startNode();

Thread.sleep(1000);
Thread.sleep(1000000000);

//build SlaveNode
SlaveNode slaveNode = new SlaveNode();
Expand Down
14 changes: 7 additions & 7 deletions src/java/com/taobao/top/analysis/config/MasterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,19 @@ public class MasterConfig extends AbstractConfig{

//支持多个master,不过其他master仅仅作为分担合并任务的工作,
//主要目的就是分担主master的业务合并压力,部分报表可以定义给其他master合并
//配置方式(ip:port的方式):otherMasters=127.0.0.1:8877,127.0.0.1:6654
private final static String OTHER_MASTERS = "otherMasters";
//配置方式(name:ip:port|weight的方式,weight默认是1可以不填写):masterGroup=TOPAnalyzer:127.0.0.1:6800,TOPAnalyzer1:127.0.0.1:6801
private final static String MASTER_GROUP = "masterGroup";

public String getOtherMasters()
public String getMasterGroup()
{
if(this.properties.containsKey(OTHER_MASTERS))
return (String)this.properties.get(OTHER_MASTERS);
if(this.properties.containsKey(MASTER_GROUP))
return (String)this.properties.get(MASTER_GROUP);
else
return null;
}

public void setOtherMasters(String otherMasters) {
this.properties.put(OTHER_MASTERS,otherMasters);
public void setMasterGroup(String masterGroup) {
this.properties.put(MASTER_GROUP,masterGroup);
}

public boolean isUseAsynModeToSendResponse() {
Expand Down
94 changes: 45 additions & 49 deletions src/java/com/taobao/top/analysis/node/component/FileJobBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,32 @@ public Map<String,Job> build(String config) throws AnalysisException
buildTasks(job);
jobs.put(job.getJobName(), job);
}

//编译好rule后针对当前是否有mastergroup来做多master的report分配
if (StringUtils.isNotEmpty(this.config.getMasterGroup()))
{
String[] ms = StringUtils.split(this.config.getMasterGroup(),",");
List<String> masters = new ArrayList<String>();
List<String> reports = new ArrayList<String>();
for(String m : ms)
masters.add(m);

for(Job j : jobs.values())
{
Rule rule = j.getStatisticsRule();
reports.clear();

for(Report r : rule.getReportPool().values())
{
reports.add(new StringBuilder().append(r.getId())
.append("|").append(r.getWeight()).toString());
}

rule.getReport2Master().putAll(ReportUtil.SimpleAllocationAlgorithm(masters, reports, "|"));
}

}

}
}
catch(IOException ex)
Expand Down Expand Up @@ -524,6 +550,11 @@ public void setReport(StartElement start, Report report,
.getValue());
}

if (start.getAttributeByName(new QName("", "weight")) != null) {
report.setWeight(Integer.valueOf(start.getAttributeByName(new QName("", "weight"))
.getValue()));
}

if (start.getAttributeByName(new QName("", "key")) != null) {
report.setKey(start.getAttributeByName(new QName("", "key"))
.getValue());
Expand Down Expand Up @@ -591,33 +622,20 @@ public void setReportEntry(boolean isPublic, StartElement start,
StringBuilder globalConditions, StringBuilder globalValuefilter,
List<String> globalMapClass, List<String> parents) throws AnalysisException {

if (start.getAttributeByName(new QName("", "refId")) != null) {
ReportEntry node = entryPool.get(start.getAttributeByName(
new QName("", "refId")).getValue());

if (node != null) {
report.getReportEntrys().add(node);
} else {
String errorMsg = new StringBuilder()
.append("ref Entry not exist :")
.append(start
.getAttributeByName(new QName("", "refId"))
.getValue()).toString();

throw new java.lang.RuntimeException(errorMsg);
}

return;
}
// 引用的方式,只在非共享模式下有用
if (!isPublic && start.getAttributeByName(new QName("", "id")) != null
&& start.getAttributeByName(new QName("", "name")) == null) {
if ((!isPublic && start.getAttributeByName(new QName("", "id")) != null
&& start.getAttributeByName(new QName("", "name")) == null)
||(start.getAttributeByName(new QName("", "refId")) != null))
{

ReportEntry node = entryPool.get(start.getAttributeByName(
new QName("", "id")).getValue());

if (node != null) {
report.getReportEntrys().add(node);

//给node增加report的属性
node.getReports().add(report.getId());
} else {
String errorMsg = new StringBuilder()
.append("reportEntry not exist :")
Expand All @@ -629,6 +647,12 @@ public void setReportEntry(boolean isPublic, StartElement start,

return;
}

if (report != null)
{
//给node增加report的属性
entry.getReports().add(report.getId());
}

if (start.getAttributeByName(new QName("", "name")) != null) {
entry.setName(start.getAttributeByName(new QName("", "name"))
Expand Down Expand Up @@ -720,31 +744,14 @@ public void setReportEntry(boolean isPublic, StartElement start,
entry.setLazy(true);
}
entry.setCalculator(new SimpleCalculator(expression, aliasPool));
// entry.setValueExpression(expression, aliasPool);
}


//
// if (start.getAttributeByName(new QName("", "engine")) != null) {
// entry.setEngine(start.getAttributeByName(new QName("", "engine"))
// .getValue());
// }

if (start.getAttributeByName(new QName("", "lazy")) != null) {
entry.setLazy(Boolean.valueOf(start.getAttributeByName(
new QName("", "lazy")).getValue()));
}
//FIXME 壓縮
// if (start.getAttributeByName(new QName("", "useCompressKeyMode")) != null) {
// entry.setUseCompressKeyMode(start.getAttributeByName(
// new QName("", "useCompressKeyMode")).getValue());
// }
//
// if (start.getAttributeByName(new QName("", "compressedDestMaxLength")) != null) {
// entry.setCompressedDestMaxLength(Integer.parseInt(start.getAttributeByName(
// new QName("", "compressedDestMaxLength")).getValue()));
// }



// 以下修改conditions的设置方式 modify by fangliang 2010-05-26
StringBuilder conditions = new StringBuilder();
Expand All @@ -763,32 +770,21 @@ public void setReportEntry(boolean isPublic, StartElement start,
conditions.append("&" + attr.getValue());
}
if (conditions.length() > 0) {
// entry.setConditions(conditions.toString(), aliasPool);
entry.setCondition(new SimpleCondition(conditions.toString(), aliasPool));
}
String filter = null;
if (start.getAttributeByName(new QName("", "valuefilter")) != null) {
if (globalValuefilter != null && globalValuefilter.length() > 0)
// entry.setValuefilter(new StringBuilder(globalValuefilter)
// .append(start.getAttributeByName(
// new QName("", "valuefilter")).getValue())
// .toString());
filter = new StringBuilder(globalValuefilter)
.append(start.getAttributeByName(
new QName("", "valuefilter")).getValue()).toString();
else
// entry.setValuefilter(start.getAttributeByName(
// new QName("", "valuefilter")).getValue());
filter = start.getAttributeByName(new QName("", "valuefilter")).getValue();
} else {
if (globalValuefilter != null && globalValuefilter.length() > 0)
// entry.setValuefilter(globalValuefilter.toString());
filter = globalValuefilter.toString();
}
entry.setValueFilter(new SimpleFilter(filter));
//FIXME GLOBLEMAPPER
// if (globalMapClass != null && globalMapClass.size() > 0)
// entry.setGlobalMapClass(globalMapClass);

if (report != null)
report.getReportEntrys().add(entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ private void allocateTask(JobTask jobTask)
{
jobTask.setStatus(JobTaskStatus.DOING);
jobTask.setStartTime(System.currentTimeMillis());
jobTask.setOtherMasters(config.getOtherMasters());
}

//分配任务和结果提交处理由于是单线程处理,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public Map<String,Job> build(String config) throws
public void init() {
jobBuilders = new HashMap<String,IJobBuilder>();
jobBuilders.put("file", new FileJobBuilder());
jobBuilders.get("file").setConfig(config);
}

@Override
Expand Down
18 changes: 0 additions & 18 deletions src/java/com/taobao/top/analysis/node/job/JobTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ public class JobTask implements Serializable{
* 该task所属的job分配时的代数
*/
private int jobEpoch;

/**
* 可以将任务结果发送到其他多个master去做合并,减轻master压力
*/
String otherMasters;

public JobTask(JobConfig jobConfig)
{
Expand All @@ -86,19 +81,6 @@ public JobTask(JobConfig jobConfig)
recycleCounter= new AtomicInteger(0);
}


public String getOtherMasters() {
return otherMasters;
}



public void setOtherMasters(String otherMasters) {
this.otherMasters = otherMasters;
}



public int getJobEpoch() {
return jobEpoch;
}
Expand Down
10 changes: 10 additions & 0 deletions src/java/com/taobao/top/analysis/statistics/data/Report.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class Report implements java.io.Serializable {
private List<ReportEntry> reportEntrys;// entry的列表
private String orderby;// 暂时未使用
private int rowCount = 0;// 最多获取多少行

private int weight = 1;//权重,用来判断结果数据量有多少,越大表示数据越多,在多个master做合并的时候会有用

private boolean period = false;// 是否周期性输出结果,用于片段维度统计
private long exportInterval;
Expand All @@ -31,6 +33,14 @@ public class Report implements java.io.Serializable {
private String key;
private String condition;

public int getWeight() {
return weight;
}

public void setWeight(int weight) {
this.weight = weight;
}

public String getCondition() {
return condition;
}
Expand Down
13 changes: 13 additions & 0 deletions src/java/com/taobao/top/analysis/statistics/data/ReportEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package com.taobao.top.analysis.statistics.data;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import com.taobao.top.analysis.statistics.map.IMapper;
import com.taobao.top.analysis.statistics.reduce.IReducer;
Expand Down Expand Up @@ -61,6 +63,17 @@ public class ReportEntry implements Serializable, Cloneable {
*/
private String reduceParams;

/**
* 该entry所属的report
*/
private List<String> reports = new ArrayList<String>();

public List<String> getReports() {
return reports;
}
public void setReports(List<String> reports) {
this.reports = reports;
}
public boolean isLazy() {
return lazy;
}
Expand Down
16 changes: 16 additions & 0 deletions src/java/com/taobao/top/analysis/statistics/data/Rule.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ public class Rule implements Serializable {
* 报表定义池
*/
private Map<String, Report> reportPool;

/**
* 支持多个master来分担合并数据的任务,这里保存的是报表和master的对应关系
* reportid,masterip:port
*/
private Map<String, String> report2Master;
/**
* 别名定义池
*/
Expand Down Expand Up @@ -57,6 +63,7 @@ public Rule() {
entryPool = new HashMap<String, ReportEntry>();
parentEntryPool = new HashMap<String, ReportEntry>();
reportPool = new TreeMap<String, Report>();
report2Master = new HashMap<String,String>();
aliasPool = new HashMap<String, Alias>();
referEntrys = new HashMap<String, ReportEntry>();
innerKeyPool = new ArrayList<InnerKey>();
Expand All @@ -70,11 +77,20 @@ public void clear() {
aliasPool.clear();
referEntrys.clear();
innerKeyPool.clear();
report2Master.clear();

domain = null;
version = 0;
}

public Map<String, String> getReport2Master() {
return report2Master;
}

public void setReport2Master(Map<String, String> report2Master) {
this.report2Master = report2Master;
}

public List<InnerKey> getInnerKeyPool() {
return innerKeyPool;
}
Expand Down
Loading

0 comments on commit 2b38c38

Please sign in to comment.