Skip to content

Commit

Permalink
add local dump and indexbuild without hive and spark supporting
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Mar 5, 2021
1 parent 69b921c commit 37e3881
Show file tree
Hide file tree
Showing 99 changed files with 2,195 additions and 1,216 deletions.
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
<module>tis-hadoop-rpc</module>
<module>tis-solrconfig-parser</module>
<module>tis-solrj-client</module>

<module>tis-common</module>
<module>tis-builder-api</module>
<module>tis-base-test</module>
<module>tis-web-start</module>
<module>tis-assemble</module>

Expand All @@ -61,6 +61,7 @@

<module>maven-tpi-plugin</module>


</modules>

<dependencies>
Expand Down Expand Up @@ -151,6 +152,16 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-base-test</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>




<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-index-builder</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.qlangtech.tis.exec;

import com.qlangtech.tis.TisZkClient;
import com.qlangtech.tis.fs.ITISFileSystem;
import com.qlangtech.tis.fs.ITISFileSystemFactory;
import com.qlangtech.tis.offline.IndexBuilderTriggerFactory;
import com.qlangtech.tis.offline.TableDumpFactory;
Expand Down Expand Up @@ -50,7 +51,7 @@ public interface IExecChainContext extends IJoinTaskContext {
SqlTaskNodeMeta.SqlDataFlowTopology getTopology();


ITISFileSystemFactory getIndexBuildFileSystem();
ITISFileSystem getIndexBuildFileSystem();

TableDumpFactory getTableDumpFactory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.qlangtech.tis.exec.ExecutePhaseRange;
import com.qlangtech.tis.exec.IExecChainContext;
import com.qlangtech.tis.exec.IIndexMetaData;
import com.qlangtech.tis.fs.ITISFileSystem;
import com.qlangtech.tis.fs.ITISFileSystemFactory;
import com.qlangtech.tis.fullbuild.IFullBuildContext;
import com.qlangtech.tis.fullbuild.servlet.IRebindableMDC;
Expand Down Expand Up @@ -52,7 +53,7 @@ public class DefaultChainContext implements IExecChainContext {

private ZkStateReader zkStateReader;

private ITISFileSystemFactory indexBuildFileSystem;
private ITISFileSystem indexBuildFileSystem;

private IFlatTableBuilder flatTableBuilderPlugin;

Expand Down Expand Up @@ -96,7 +97,7 @@ public IndexBuilderTriggerFactory getIndexBuilderFactory() {

public void setIndexBuilderTriggerFactory(IndexBuilderTriggerFactory indexBuilderTriggerFactory) {
this.indexBuilderTriggerFactory = indexBuilderTriggerFactory;
this.setIndexBuildFileSystem(indexBuilderTriggerFactory.getFsFactory());
this.setIndexBuildFileSystem(indexBuilderTriggerFactory.getFileSystem());
}

public void setMdcParamContext(IRebindableMDC mdcParamContext) {
Expand Down Expand Up @@ -154,7 +155,7 @@ public ExecutePhaseRange getExecutePhaseRange() {
return this.executePhaseRange;
}

private void setIndexBuildFileSystem(ITISFileSystemFactory fileSystem) {
private void setIndexBuildFileSystem(ITISFileSystem fileSystem) {
Objects.requireNonNull(fileSystem, "indexBuild fileSystem can not be null");
this.indexBuildFileSystem = fileSystem;
}
Expand Down Expand Up @@ -223,7 +224,7 @@ public void setZkClient(TisZkClient zkClient) {
}

@Override
public ITISFileSystemFactory getIndexBuildFileSystem() {
public ITISFileSystem getIndexBuildFileSystem() {
return this.indexBuildFileSystem;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/**
* Copyright (c) 2020 QingLang, Inc. <[email protected]>
*
* <p>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* <p>
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* <p>
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
Expand All @@ -30,6 +30,7 @@
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.remote.JMXConnector;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -79,7 +80,7 @@ protected ExecuteResult execute(IExecChainContext context) throws Exception {
// 直接开始回流
DocCollection collection = context.getZkStateReader().getClusterState().getCollection(context.getIndexName());
IndexBackflowManager indexBackFlowQueueTmp = new IndexBackflowManager(collection, context, this);
ImportDataProcessInfo state = new ImportDataProcessInfo(taskid, context.getIndexBuildFileSystem());
ImportDataProcessInfo state = new ImportDataProcessInfo(taskid, context.getIndexBuildFileSystem(), context.getZkClient());
state.setTimepoint(context.getPartitionTimestamp());
indexBackFlowQueueTmp.vistAllReplica((replic) -> {
BuildResult buildResult = new BuildResult(replic, state);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/**
* Copyright (c) 2020 QingLang, Inc. <[email protected]>
*
* <p>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* <p>
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* <p>
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
Expand All @@ -20,9 +20,9 @@
import com.qlangtech.tis.exec.ExecuteResult;
import com.qlangtech.tis.exec.IExecChainContext;
import com.qlangtech.tis.exec.IIndexMetaData;
import com.qlangtech.tis.fullbuild.indexbuild.HdfsSourcePathCreator;
import com.qlangtech.tis.fullbuild.indexbuild.IRemoteJobTrigger;
import com.qlangtech.tis.fullbuild.indexbuild.ITabPartition;
import com.qlangtech.tis.fullbuild.indexbuild.IndexBuildSourcePathCreator;
import com.qlangtech.tis.fullbuild.indexbuild.RunningStatus;
import com.qlangtech.tis.fullbuild.phasestatus.impl.BuildPhaseStatus;
import com.qlangtech.tis.fullbuild.phasestatus.impl.BuildSharedPhaseStatus;
Expand All @@ -41,6 +41,7 @@
import org.apache.solr.common.cloud.ZkStateReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Collections;
import java.util.List;
Expand All @@ -53,7 +54,7 @@
* @author 百岁([email protected]
* @date 2015年12月15日 下午5:08:07
*/
public class IndexBuildInterceptor extends TrackableExecuteInterceptor {
public abstract class IndexBuildInterceptor extends TrackableExecuteInterceptor {

public static final String NAME = "indexBuild";

Expand Down Expand Up @@ -88,12 +89,13 @@ protected ExecuteResult execute(final IExecChainContext execContext) throws Exce
joinPhaseState.setAllComplete();
final ITabPartition ps = ExecChainContextUtils.getDependencyTablesMINPartition(execContext);
// ▼▼▼▼ 触发索引构建
final HdfsSourcePathCreator pathCreator = createIndexBuildSourceCreator(execContext, ps);
final IndexBuildSourcePathCreator pathCreator = createIndexBuildSourceCreator(execContext, ps);
final int groupSize = execContext.getIndexShardCount();
if (groupSize < 1) {
return ExecuteResult.createFaild().setMessage(" build source ps:" + ps.getPt() + " is null");
}
SnapshotDomain domain = HttpConfigFileReader.getResource(execContext.getIndexName(), 0, RunEnvironment.getSysRuntime(), ConfigFileReader.FILE_SCHEMA, ConfigFileReader.FILE_SOLR);
SnapshotDomain domain = HttpConfigFileReader.getResource(execContext.getIndexName(), 0
, RunEnvironment.getSysRuntime(), ConfigFileReader.FILE_SCHEMA, ConfigFileReader.FILE_SOLR);
try {
if (!triggerIndexBuildJob(execContext.getIndexName(), ps, groupSize, pathCreator, execContext, domain)) {
String msg = "index build faild,ps:" + ps.getPt() + ",groupsize:" + groupSize;
Expand All @@ -107,21 +109,23 @@ protected ExecuteResult execute(final IExecChainContext execContext) throws Exce
return ExecuteResult.SUCCESS;
}

protected HdfsSourcePathCreator createIndexBuildSourceCreator(final IExecChainContext execContext, ITabPartition ps) {
throw new UnsupportedOperationException();
}
protected abstract IndexBuildSourcePathCreator createIndexBuildSourceCreator(final IExecChainContext execContext, ITabPartition ps);

/**
* 触发索引build
*
* @param indexName
* @param timepoint
* @param groupSize
* @param hdfsSourcePathCreator
* @param indexBuildSourcePathCreator
* @throws Exception
*/
private boolean triggerIndexBuildJob(String indexName, final ITabPartition timepoint, int groupSize, HdfsSourcePathCreator hdfsSourcePathCreator, IExecChainContext execContext, SnapshotDomain domain) throws Exception {
final ImportDataProcessInfo processInfo = new ImportDataProcessInfo(execContext.getTaskId(), execContext.getIndexBuildFileSystem());
private boolean triggerIndexBuildJob(String indexName, final ITabPartition timepoint, int groupSize
, IndexBuildSourcePathCreator indexBuildSourcePathCreator, IExecChainContext execContext, SnapshotDomain domain) throws Exception {


final ImportDataProcessInfo processInfo
= new ImportDataProcessInfo(execContext.getTaskId(), execContext.getIndexBuildFileSystem(), execContext.getZkClient());
IIndexMetaData indexMetaData = execContext.getIndexMetaData();
IIndexMetaData idexMeta = execContext.getIndexMetaData();
String indexBuilder = idexMeta.getSchemaParseResult().getIndexBuilder();
Expand All @@ -130,7 +134,7 @@ private boolean triggerIndexBuildJob(String indexName, final ITabPartition timep
}
processInfo.setTimepoint(timepoint.getPt());
processInfo.setIndexName(indexName);
processInfo.setHdfsSourcePathCreator(hdfsSourcePathCreator);
processInfo.setIndexBuildSourcePathCreator(indexBuildSourcePathCreator);
processInfo.setLuceneVersion(indexMetaData.getLuceneVersion());
setBuildTableTitleItems(indexName, processInfo, execContext);
final ExecutorCompletionService<BuildResult> completionService = new ExecutorCompletionService<BuildResult>(executorService);
Expand Down Expand Up @@ -165,7 +169,7 @@ private boolean processBuildResult(Future<BuildResult> result, final IndexBackfl
BuildResult buildResult;
buildResult = result.get();
if (!buildResult.isSuccess()) {
logger.error("sourpath:" + buildResult.getHdfsSourcePath() + " build faild.");
//logger.error("sourpath:" + buildResult.getHdfsSourcePath(indexBackflowManager.getExecContext()) + " build faild.");
// build失败
return false;
}
Expand All @@ -177,7 +181,8 @@ private boolean processBuildResult(Future<BuildResult> result, final IndexBackfl
return true;
}

private void createFeedbackJob(IExecChainContext execContext, int groupSize, ExecutorCompletionService<BuildResult> completionService, final IndexBackflowManager indexBackflowManager) {
private void createFeedbackJob(IExecChainContext execContext, int groupSize, ExecutorCompletionService<BuildResult> completionService
, final IndexBackflowManager indexBackflowManager) {
final ExecutorService asynIndexBuildTask = Executors.newSingleThreadExecutor(new ThreadFactory() {

@Override
Expand Down Expand Up @@ -219,7 +224,6 @@ public void uncaughtException(Thread t, Throwable e) {
* @param processinfo
*/
protected void setBuildTableTitleItems(String indexName, ImportDataProcessInfo processinfo, IExecChainContext execContext) {
// processinfo.setBuildTableTitleItems(titleColumn.toString());
throw new UnsupportedOperationException();
}

Expand All @@ -231,8 +235,6 @@ protected void setBuildTableTitleItems(String indexName, ImportDataProcessInfo p
protected final AbstractIndexBuildJob createRemoteIndexBuildJob(final IExecChainContext execContext, ImportDataProcessInfo processinfo
, int grouIndex, SnapshotDomain domain, BuildPhaseStatus phaseStatus) {
// 暂时全部提交到32G机器上构建索引吧
// final BuildPhaseStatus phaseStatus = this.getPhaseStatus(execContext, FullbuildPhase.BUILD);
//
IndexBuilderTriggerFactory indexBuilderFactory = execContext.getIndexBuilderFactory();
return new AbstractIndexBuildJob(execContext, processinfo, grouIndex, domain) {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
/**
* Copyright (c) 2020 QingLang, Inc. <[email protected]>
*
* <p>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* <p>
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* <p>
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.qlangtech.tis.exec.impl;

import com.qlangtech.tis.fs.ITISFileSystem;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.qlangtech.tis.exec.ExecuteResult;
import com.qlangtech.tis.exec.IExecChainContext;
import com.qlangtech.tis.fullbuild.indexbuild.HdfsSourcePathCreator;
import com.qlangtech.tis.fs.ITISFileSystem;
import com.qlangtech.tis.fullbuild.indexbuild.ITabPartition;
import com.qlangtech.tis.fullbuild.indexbuild.IndexBuildSourcePathCreator;
import com.qlangtech.tis.fullbuild.servlet.BuildTriggerServlet;
import com.qlangtech.tis.trigger.jst.ImportDataProcessInfo;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 当直接使用数据中心构建好的
Expand All @@ -33,39 +33,22 @@
*/
public final class IndexBuildWithHdfsPathInterceptor extends IndexBuildInterceptor {

// private static final String COMPONENT_NAME = "directbuild";
private static final String HDFS_PATH = "hdfspath";

private static final Logger logger = LoggerFactory.getLogger(IndexBuildWithHdfsPathInterceptor.class);

// @Override
// public ExecuteResult intercept(ActionInvocation invocation) throws
// Exception {
//
// IExecChainContext execContext = invocation.getContext();
//
// // 删除历史build索引文件
// HiveRemoveHistoryDataTask removeHistoryDataTask = new
// HiveRemoveHistoryDataTask(execContext.getIndexName(),
// execContext.getContextUserName(), execContext.getDistributeFileSystem());
// removeHistoryDataTask.removeHistoryBuildFile();
//
// return super.intercept(invocation);
// }
@Override
protected ExecuteResult execute(IExecChainContext execContext) throws Exception {
throw new UnsupportedOperationException();
// return super.execute(execContext);
}

@Override
protected HdfsSourcePathCreator createIndexBuildSourceCreator(final IExecChainContext execContext, ITabPartition ps) {
return new HdfsSourcePathCreator() {

protected IndexBuildSourcePathCreator createIndexBuildSourceCreator(final IExecChainContext execContext, ITabPartition ps) {
return new IndexBuildSourcePathCreator() {
@Override
public String build(String group) {
final String hdfspath = execContext.getString(HDFS_PATH);
ITISFileSystem fs = execContext.getIndexBuildFileSystem().getFileSystem();
ITISFileSystem fs = execContext.getIndexBuildFileSystem();
String path = hdfspath + "/pmod=" + group;
try {
if (fs.exists(fs.getPath(path))) {
Expand All @@ -85,21 +68,7 @@ public String build(String group) {
@Override
protected void setBuildTableTitleItems(String indexName, ImportDataProcessInfo processinfo, IExecChainContext execContext) {
processinfo.setBuildTableTitleItems(execContext.getString(BuildTriggerServlet.KEY_COLS));
// if( execContext.getString(ImportDataProcessInfo.KEY_DELIMITER)){
//
// }
//
processinfo.setHdfsdelimiter(StringUtils.defaultIfEmpty(execContext.getString(ImportDataProcessInfo.KEY_DELIMITER), ImportDataProcessInfo.DELIMITER_001));
processinfo.setHdfsdelimiter(
StringUtils.defaultIfEmpty(execContext.getString(ImportDataProcessInfo.KEY_DELIMITER), ImportDataProcessInfo.DELIMITER_001));
}
// @Override
// protected int getGroupSize(String indexName,
// HdfsSourcePathCreator pathCreator, FileSystem fileSystem)
// throws Exception {
//
// return GROUP_SIZE;
// }
// @Override
// public String getName() {
// return COMPONENT_NAME;
// }
}
Loading

0 comments on commit 37e3881

Please sign in to comment.