Skip to content

Commit

Permalink
Merge branch '1.10_release_4.0.x' into hotfix_1.10_4.0.x_30935
Browse files Browse the repository at this point in the history
  • Loading branch information
FlechazoW committed Dec 7, 2020
2 parents 3935fc2 + 83af3c0 commit 2065fa6
Show file tree
Hide file tree
Showing 39 changed files with 1,409 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@

import java.io.IOException;
import java.net.InetAddress;
import java.sql.Time;
import java.sql.Date;
import java.sql.Timestamp;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
Expand Down Expand Up @@ -234,7 +237,10 @@ private String buildSql(Row row) {
if (row.getField(index) == null) {
} else {
fields.append(fieldNames[index] + ",");
if (row.getField(index) instanceof String) {
if (row.getField(index) instanceof String
|| row.getField(index) instanceof Time
|| row.getField(index) instanceof Date
|| row.getField(index) instanceof Timestamp) {
values.append("'" + row.getField(index) + "'" + ",");
} else {
values.append(row.getField(index) + ",");
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/GetPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.net.URLClassLoader;

/**
* local模式获取sql任务的执行计划
* Date: 2020/2/17
Expand All @@ -37,17 +40,23 @@ public class GetPlan {
private static final Logger LOG = LoggerFactory.getLogger(GetPlan.class);

public static String getExecutionPlan(String[] args) {
ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
try {
long start = System.currentTimeMillis();
ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
paramsInfo.setGetPlan(true);
ClassLoader envClassLoader = StreamExecutionEnvironment.class.getClassLoader();
ClassLoader plannerClassLoader = URLClassLoader.newInstance(new URL[0], envClassLoader);
Thread.currentThread().setContextClassLoader(plannerClassLoader);
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
String executionPlan = env.getExecutionPlan();
long end = System.currentTimeMillis();
return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
} catch (Exception e) {
LOG.error("Get plan error", e);
return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
} finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public class ConfigConstrant {
// default 200ms
public static final String AUTO_WATERMARK_INTERVAL_KEY = "autoWatermarkInterval";

// window early trigger
public static final String EARLY_TRIGGER = "early.trigger";

public static final String SQL_TTL_MINTIME = "sql.ttl.min";
public static final String SQL_TTL_MAXTIME = "sql.ttl.max";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s

confProperties = PropertiesUtils.propertiesTrim(confProperties);
streamEnv.getConfig().disableClosureCleaner();
// Disables reusing object
streamEnv.getConfig().enableObjectReuse();

Configuration globalJobParameters = new Configuration();
//Configuration unsupported set properties key-value
Expand All @@ -87,8 +85,8 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
ExecutionConfig exeConfig = streamEnv.getConfig();
if (exeConfig.getGlobalJobParameters() == null) {
exeConfig.setGlobalJobParameters(globalJobParameters);
} else if (exeConfig.getGlobalJobParameters() instanceof Configuration) {
((Configuration) exeConfig.getGlobalJobParameters()).addAll(globalJobParameters);
} else if (exeConfig.getGlobalJobParameters() instanceof ExecutionConfig.GlobalJobParameters) {
exeConfig.setGlobalJobParameters(globalJobParameters);
}

getEnvParallelism(confProperties).ifPresent(streamEnv::setParallelism);
Expand Down Expand Up @@ -123,6 +121,21 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
}
}

/**
* 设置TableEnvironment window提前触发
* @param tableEnv
* @param confProperties
*/
public static void streamTableEnvironmentEarlyTriggerConfig(TableEnvironment tableEnv, Properties confProperties) {
confProperties = PropertiesUtils.propertiesTrim(confProperties);
String triggerTime = confProperties.getProperty(ConfigConstrant.EARLY_TRIGGER);
if (StringUtils.isNumeric(triggerTime)) {
TableConfig qConfig = tableEnv.getConfig();
qConfig.getConfiguration().setString("table.exec.emit.early-fire.enabled", "true");
qConfig.getConfiguration().setString("table.exec.emit.early-fire.delay", triggerTime+"s");
}
}

/**
* 设置TableEnvironment状态超时时间
* @param tableEnv
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.flink.sql.exception;

/**
* @author: chuixue
* @create: 2020-11-30 16:23
* @description: 公共错误码
**/
public enum BaseCodeEnum implements ErrorCode {
/**
* 未指明的异常
*/
UNSPECIFIED("000", "unknow exception"),
;

/**
* 错误码
*/
private final String code;

/**
* 描述
*/
private final String description;

/**
* @param code 错误码
* @param description 描述
*/
private BaseCodeEnum(final String code, final String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}
}
126 changes: 126 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/exception/BaseException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.flink.sql.exception;


/**
* @author: chuixue
* @create: 2020-11-30 14:48
* @description:根异常
**/
public class BaseException extends RuntimeException {

/**
* 错误码
*/
protected final ErrorCode errorCode;

/**
* 无参默认构造UNSPECIFIED
*/
public BaseException() {
super(BaseCodeEnum.UNSPECIFIED.getDescription());
errorCode = BaseCodeEnum.UNSPECIFIED;
}

/**
* 指定错误码构造通用异常
*
* @param errorCode 错误码
*/
public BaseException(ErrorCode errorCode) {
super(errorCode.getDescription());
this.errorCode = errorCode;
}

/**
* 指定详细描述构造通用异常
*
* @param detailedMessage 详细描述
*/
public BaseException(final String detailedMessage) {
super(detailedMessage);
this.errorCode = BaseCodeEnum.UNSPECIFIED;
}

/**
* 指定导火索构造通用异常
*
* @param t 导火索
*/
public BaseException(final Throwable t) {
super(t);
this.errorCode = BaseCodeEnum.UNSPECIFIED;
}

/**
* 构造通用异常
*
* @param errorCode 错误码
* @param detailedMessage 详细描述
*/
public BaseException(final ErrorCode errorCode, final String detailedMessage) {
super(detailedMessage);
this.errorCode = errorCode;
}

/**
* 构造通用异常
*
* @param errorCode 错误码
* @param t 导火索
*/
public BaseException(final ErrorCode errorCode, final Throwable t) {
super(errorCode.getDescription(), t);
this.errorCode = errorCode;
}

/**
* 构造通用异常
*
* @param detailedMessage 详细描述
* @param t 导火索
*/
public BaseException(final String detailedMessage, final Throwable t) {
super(detailedMessage, t);
this.errorCode = BaseCodeEnum.UNSPECIFIED;
}

/**
* 构造通用异常
*
* @param errorCode 错误码
* @param detailedMessage 详细描述
* @param t 导火索
*/
public BaseException(final ErrorCode errorCode, final String detailedMessage,
final Throwable t) {
super(detailedMessage, t);
this.errorCode = errorCode;
}

/**
* 获取错误码
*
* @return
*/
public ErrorCode getErrorCode() {
return errorCode;
}
}
21 changes: 21 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/exception/ErrorCode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.dtstack.flink.sql.exception;

/**
* 错误码
*/
public interface ErrorCode {

/**
* 获取错误码
*
* @return
*/
String getCode();

/**
* 获取错误信息
*
* @return
*/
String getDescription();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.flink.sql.exception.sqlparse;

import com.dtstack.flink.sql.exception.ErrorCode;

/**
* @author: chuixue
* @create: 2020-11-30 16:56
* @description:sql解析错误码
**/
public enum SqlParseCodeEnum implements ErrorCode {
/**
* 流join维表时,select、join、group by等字段未使用t.field
*/
WITHOUT_TABLENAME("001", "field invalid , please use like t.field"),
;

/**
* 错误码
*/
private final String code;

/**
* 描述
*/
private final String description;

/**
* @param code 错误码
* @param description 描述
*/
private SqlParseCodeEnum(final String code, final String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}
}
Loading

0 comments on commit 2065fa6

Please sign in to comment.