Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Feb 24, 2021
1 parent 351e261 commit 3fce947
Show file tree
Hide file tree
Showing 79 changed files with 1,993 additions and 385 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -715,9 +715,9 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.8</version>
<configuration>
<!---->
<!--
<test>TestAll</test>

-->
<classpathDependencyExcludes>
<classpathDependencyExclude>log4j:log4j</classpathDependencyExclude>
<classpathDependencyExclude>org.slf4j:slf4j-log4j12</classpathDependencyExclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface IExecChainContext extends IJoinTaskContext {

String getPartitionTimestamp();

ExecutePhaseRange getExecutePhaseRange();


IIndexMetaData getIndexMetaData();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public static void createTaskComplete(int taskid, ExecResult execResult) {
//
List<PostParam> params = Lists.newArrayList(//
new PostParam("execresult", String.valueOf(execResult.getValue())), //
new PostParam("taskid", String.valueOf(taskid)));
new PostParam(IParamContext.KEY_TASK_ID, String.valueOf(taskid)));
HttpUtils.soapRemote(url, params, CreateNewTaskResult.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.qlangtech.tis.order.center.IParamContext;
import org.apache.commons.io.IOUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
Expand All @@ -36,7 +38,7 @@ public class TaskStatusServlet extends HttpServlet {
private static final long serialVersionUID = 1L;

protected void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {
int taskid = Integer.parseInt(req.getParameter("taskid"));
int taskid = Integer.parseInt(req.getParameter(IParamContext.KEY_TASK_ID));
// 是否要获取全部的日志信息,比如dump已經完成了,那麼只需要獲取dump之後的日志信息
// boolean all = Boolean.parseBoolean(req.getParameter("all"));
PhaseStatusCollection statusSet = TrackableExecuteInterceptor.taskPhaseReference.get(taskid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private MDCParamContext getMDCParam(final HttpExecContext execContext, HttpServl

private abstract class MDCParamContext implements IRebindableMDC {

protected static final String MDC_KEY_TASK_ID = "taskid";
protected static final String MDC_KEY_TASK_ID = IParamContext.KEY_TASK_ID;

protected final HttpServletResponse res;

Expand Down Expand Up @@ -295,7 +295,7 @@ void resetParam(Integer taskid) {
throw new IllegalArgumentException("param taskid can not be empty");
}
this.taskid = taskid;
MDC.put(MDC_KEY_TASK_ID, String.valueOf(taskid));
MDC.put(IParamContext.KEY_TASK_ID, String.valueOf(taskid));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ public TabPartitions getTablePartition() {
}

@Override
public IJoinTaskContext joinTaskContext() {
return this.getParams();
public IExecChainContext getExecContext() {
return this.params;
}

public IExecChainContext getParams() {
return params;
}
// public IExecChainContext getParams() {
// return params;
// }

public void putContextValue(String key, Object v) {
this.contextValues.put(key, v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import ch.qos.logback.classic.spi.LoggingEvent;
import com.qlangtech.tis.BaseTestCase;
import com.qlangtech.tis.order.center.IParamContext;
import com.qlangtech.tis.rpc.server.FullBuildStatCollectorServer;
import com.qlangtech.tis.rpc.server.IncrStatusUmbilicalProtocolImpl;
import com.qlangtech.tis.trigger.jst.MonotorTarget;
Expand Down Expand Up @@ -72,7 +73,7 @@ public boolean isClosed() {
* 测试全量构建日志
*/
public void testFullBuildLogger() {
MDC.put("taskid", String.valueOf(taskid));
MDC.put(IParamContext.KEY_TASK_ID, String.valueOf(taskid));
logger.info("start");
String loggerName = "full-" + taskid;
RealtimeLoggerCollectorAppender bufferAppender = RealtimeLoggerCollectorAppender.getBufferAppender(loggerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.Sets;
import com.qlangtech.tis.BaseTestCase;
import com.qlangtech.tis.log.RealtimeLoggerCollectorAppender;
import com.qlangtech.tis.order.center.IParamContext;
import com.qlangtech.tis.realtime.transfer.TableSingleDataIndexStatus;
import com.qlangtech.tis.realtime.utils.NetUtils;
import com.qlangtech.tis.realtime.yarn.rpc.*;
Expand Down Expand Up @@ -220,7 +221,7 @@ public void validateExpect() {
FullBuildStatCollectorServer.registerMonitorEventHook = eventHook;
// 启动rpc服务端
Runnable writeLog = () -> {
MDC.put("taskid", String.valueOf(taskid));
MDC.put(IParamContext.KEY_TASK_ID, String.valueOf(taskid));
MDC.put("app", appname);
String logMsg = "test_log_msg";
Logger log = LoggerFactory.getLogger(TestIncrStatusServer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

import com.qlangtech.tis.exec.ExecChainContextUtils;
import com.qlangtech.tis.fs.ITaskContext;
import com.qlangtech.tis.fullbuild.indexbuild.IDumpTable;
import com.qlangtech.tis.fullbuild.indexbuild.ITabPartition;
import com.qlangtech.tis.order.center.IJoinTaskContext;
import com.qlangtech.tis.sql.parser.TabPartitions;

Expand All @@ -40,13 +38,13 @@ public abstract class AdapterTask extends DataflowTask {
private ITaskContext taskContext;

protected final TabPartitions getDumpPartition() {
TabPartitions dumpPartition = ExecChainContextUtils.getDependencyTablesPartitions(this.getContext().joinTaskContext());
TabPartitions dumpPartition = ExecChainContextUtils.getDependencyTablesPartitions(this.getContext().getExecContext());
return dumpPartition;
}

@Override
protected Map<String, Boolean> getTaskWorkStatus() {
return createTaskWorkStatus(this.getContext().joinTaskContext());
return createTaskWorkStatus(this.getContext().getExecContext());
}

public static Map<String, Boolean> createTaskWorkStatus(IJoinTaskContext chainContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ public interface ITemplateContext {

public void putContextValue(String key, Object v);

public IJoinTaskContext joinTaskContext();
public IJoinTaskContext getExecContext();
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
/**
* Copyright (c) 2020 QingLang, Inc. <[email protected]>
*
* <p>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* <p>
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* <p>
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.qlangtech.tis.order.center;

import com.qlangtech.tis.exec.ExecutePhaseRange;

/**
* @author 百岁([email protected]
* @date 2015年12月11日 上午11:09:21
Expand All @@ -32,6 +34,8 @@ public interface IParamContext {

public String KEY_BUILD_INDEXING_ALL_ROWS_COUNT = "indexing.all.rows.count";

ExecutePhaseRange getExecutePhaseRange();

public String getString(String key);

public boolean getBoolean(String key);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
/**
* Copyright (c) 2020 QingLang, Inc. <[email protected]>
*
* <p>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* <p>
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* <p>
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.qlangtech.tis.sql.parser.er;

import org.apache.commons.lang.StringUtils;

import java.util.Optional;

/**
* @author 百岁([email protected]
* @create: 2020-06-03 18:09
*/
public class TableMeta {

public static boolean hasValidPrimayTableSharedKey(Optional<TableMeta> ptab) {
return ptab.isPresent() && StringUtils.isNotEmpty(ptab.get().getSharedKey());
}

// 主索引表名称
private final String tabName;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/**
* Copyright (c) 2020 QingLang, Inc. <[email protected]>
*
* <p>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* <p>
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* <p>
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
Expand All @@ -17,6 +17,7 @@
import com.qlangtech.tis.dump.INameWithPathGetter;
import com.qlangtech.tis.fullbuild.indexbuild.IDumpTable;
import org.apache.commons.lang.StringUtils;

import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -213,7 +214,7 @@ public int hashCode() {
@Override
public boolean equals(Object obj) {
if (!(obj instanceof EntityName)) {
throw new IllegalStateException("obj" + obj + " is not type of EntityName");
throw new IllegalStateException("obj" + obj + ",[" + obj.getClass() + "] is not type of EntityName");
}
return this.hashCode() == ((EntityName) obj).hashCode();
}
Expand Down
7 changes: 6 additions & 1 deletion tis-console/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@

</profiles>
<dependencies>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>4.9.0</version>
<scope>test</scope>
</dependency>


<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@
import com.qlangtech.tis.runtime.module.action.SchemaAction;
import com.qlangtech.tis.runtime.module.action.SysInitializeAction;
import com.qlangtech.tis.runtime.module.misc.IMessageHandler;
import com.qlangtech.tis.runtime.module.screen.ViewPojo;
import com.qlangtech.tis.solrdao.ISchemaField;
import com.qlangtech.tis.solrdao.ISchemaPluginContext;
import com.qlangtech.tis.solrdao.SchemaResult;
import com.qlangtech.tis.solrdao.SolrFieldsParser;
import com.qlangtech.tis.solrdao.pojo.PSchemaField;
import com.qlangtech.tis.sql.parser.SqlTaskNode;
import com.qlangtech.tis.sql.parser.SqlTaskNodeMeta;
Expand All @@ -68,6 +70,7 @@
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CommonParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -107,6 +110,12 @@ public class CollectionAction extends com.qlangtech.tis.runtime.module.action.Ad
public static final String RESULT_KEY_ROWS_COUNT = "rowsCount";
public static final String RESULT_KEY_ROWS = "rows";

public static final String KEY_PK = "pk";
public static final String KEY_SHARD_NAME = "name";
public static final String KEY_CORE_URL = "coreUrl";
public static final String KEY_IS_ACTIVE = "active";
public static final String KEY_REPLICS = "replics";


// private

Expand All @@ -130,6 +139,47 @@ public void setTransactionManager(PlatformTransactionManager transactionManager)
this.transactionManager = transactionManager;
}

/**
* 给
*
* @param context
* @throws Exception
*/
public void doGetIndexTopology(Context context) throws Exception {
this.getIndexWithPost();
JSONObject biz = new JSONObject();

CollectionTopology topology = CoreAction.getCollectionTopology(this);

JSONArray shards = new JSONArray();
JSONObject shard = null;
JSONArray replics = null;
JSONObject replic = null;
for (CollectionTopology.Shared s : topology.getShareds()) {
shard = new JSONObject();
replics = new JSONArray();
shard.put(KEY_SHARD_NAME, s.getName());
for (Replica r : s.getReplics()) {
replic = new JSONObject();
replic.put(KEY_CORE_URL, r.getCoreUrl());
replic.put(KEY_IS_ACTIVE, r.getState() == Replica.State.ACTIVE);
replics.add(replic);
}
shard.put(KEY_REPLICS, replics);
shards.add(shard);
}


biz.put(SqlTaskNodeMeta.KEY_PROFILE_TOPOLOGY, shards);
SnapshotDomain snapshot = ViewPojo.getSnapshotDoamin(this, this.getAppDomain());
SolrFieldsParser.ParseResult parseResult = SolrFieldsParser.parse(() -> {
return snapshot.getSolrSchema().getContent();
}).getSchemaParseResult();

biz.put(KEY_PK, parseResult.getUniqueKey());
this.setBizResult(context, biz);
}

/**
* 回调获取索引当前状态
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void doCreateNewTask(Context context) {
@Func(value = PermissionConstant.DATAFLOW_MANAGE, sideEffect = false)
public void doTaskComplete(Context context) {
// Integer phaseid = this.getInt("phaseid");
Integer taskid = this.getInt("taskid");
Integer taskid = this.getInt(IParamContext.KEY_TASK_ID);
// 执行结果
// final String phaseinfo = this.getString("phaseinfo");
// WorkFlowBuildPhase phase = new WorkFlowBuildPhase();
Expand Down
Loading

0 comments on commit 3fce947

Please sign in to comment.