[FLINK-22606][hive] Simplify the usage of SessionState
This closes apache#16700
lirui-apache committed Aug 11, 2021
1 parent 0c1b258 commit a5847e3
Showing 7 changed files with 183 additions and 267 deletions.
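For orientation before the per-file diffs: the parser now only attaches its SessionState to the current thread via SessionState.setCurrentSessionState instead of calling SessionState.start, so no per-session scratch directories get created and clearSessionState can simply close the state rather than deleting the HDFS/local session paths. Below is a minimal sketch of that lifecycle in Java, assuming a sessionState already configured as the diff shows (the actual parsing step is elided):

import org.apache.hadoop.hive.ql.session.SessionState;

// Sketch only: sessionState stands for the parser's HiveParserSessionState
// (a SessionState subclass) with conf, current database and timestamp already set.
void runWithSessionState(SessionState sessionState) {
    // Attach the state to the current thread. SessionState.start(...) would also
    // create HDFS and local session scratch directories, which the parser never uses.
    SessionState.setCurrentSessionState(sessionState);
    try {
        // ... parse and plan the statement ...
    } finally {
        SessionState current = SessionState.get();
        if (current != null) {
            try {
                // Closing is now sufficient; there are no session paths left to delete.
                current.close();
            } catch (Exception e) {
                // as in the diff: log a warning and continue
            }
        }
    }
}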
@@ -49,8 +49,6 @@

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
@@ -66,7 +64,6 @@
import java.lang.reflect.Method;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -210,7 +207,7 @@ public List<Operation> parse(String statement) {
hiveShim.registerTemporaryFunction("grouping", HiveGenericUDFGrouping.class);
return processCmd(statement, hiveConf, hiveShim, (HiveCatalog) currentCatalog);
} finally {
clearSessionState(hiveConf);
clearSessionState();
}
}

@@ -311,7 +308,7 @@ private void startSessionState(HiveConf hiveConf, CatalogManager catalogManager)
sessionState.setCurrentDatabase(catalogManager.getCurrentDatabase());
// some Hive functions need the timestamp
setCurrentTimestamp(sessionState);
SessionState.start(sessionState);
SessionState.setCurrentSessionState(sessionState);
} catch (LockException e) {
throw new FlinkHiveException("Failed to init SessionState", e);
} finally {
@@ -338,18 +335,11 @@ private static void setCurrentTimestamp(HiveParserSessionState sessionState) {
}
}

private void clearSessionState(HiveConf hiveConf) {
private void clearSessionState() {
SessionState sessionState = SessionState.get();
if (sessionState != null) {
try {
sessionState.close();
List<Path> toDelete = new ArrayList<>();
toDelete.add(SessionState.getHDFSSessionPath(hiveConf));
toDelete.add(SessionState.getLocalSessionPath(hiveConf));
for (Path path : toDelete) {
FileSystem fs = path.getFileSystem(hiveConf);
fs.delete(path, true);
}
} catch (Exception e) {
LOG.warn("Error closing SessionState", e);
}
@@ -409,8 +399,8 @@ public void close() throws IOException {
new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
LOG.debug("Removing resource dir " + resourceDir);
FileUtils.deleteDirectoryQuietly(resourceDir);
detachSession();
Hive.closeCurrent();
detachSession();
}

public Timestamp getHiveParserCurrentTS() {
--- next changed file ---
@@ -806,7 +806,12 @@ private RelNode genTableLogicalPlan(String tableAlias, HiveParserQB qb)
if (table.isTemporary()) {
// Hive creates a temp table for VALUES, we need to convert it to LogicalValues
RelNode values =
genValues(tableAlias, table, rowResolver, semanticAnalyzer, cluster);
genValues(
tableAlias,
table,
rowResolver,
cluster,
getQB().getValuesTableToData().get(tableAlias));
relToRowResolver.put(values, rowResolver);
relToHiveColNameCalcitePosMap.put(values, buildHiveToCalciteColumnMap(rowResolver));
return values;
--- next changed file ---
@@ -19,7 +19,6 @@
package org.apache.flink.table.planner.delegation.hive.copy;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
import org.apache.flink.table.planner.delegation.hive.HiveParserRexNodeConverter;
import org.apache.flink.table.planner.delegation.hive.HiveParserTypeCheckProcFactory;
@@ -33,6 +32,7 @@
import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer;
import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg;
import org.apache.flink.util.Preconditions;

import org.antlr.runtime.tree.Tree;
import org.antlr.runtime.tree.TreeVisitor;
@@ -60,8 +60,6 @@
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -97,8 +95,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
@@ -790,8 +786,7 @@ static String unparseExprForValuesClause(HiveParserASTNode expr) throws Semantic
return "-"
+ unparseExprForValuesClause((HiveParserASTNode) expr.getChildren().get(0));
case HiveASTParser.TOK_NULL:
// Hive's text input will translate this as a null
return "\\N";
return null;
default:
throw new SemanticException(
"Expression of type " + expr.getText() + " not supported in insert/values");
@@ -1766,120 +1761,103 @@ public static RelNode genValues(
String tabAlias,
Table tmpTable,
HiveParserRowResolver rowResolver,
HiveParserSemanticAnalyzer semanticAnalyzer,
RelOptCluster cluster) {
try {
Path dataFile = new Path(tmpTable.getSd().getLocation(), "data_file");
FileSystem fs = dataFile.getFileSystem(semanticAnalyzer.getConf());
List<List<RexLiteral>> rows = new ArrayList<>();
// TODO: leverage Hive to read the data
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(fs.open(dataFile)))) {
List<TypeInfo> tmpTableTypes =
tmpTable.getCols().stream()
.map(f -> TypeInfoUtils.getTypeInfoFromTypeString(f.getType()))
.collect(Collectors.toList());

RexBuilder rexBuilder = cluster.getRexBuilder();

// calcite types for each field
List<RelDataType> calciteTargetTypes =
tmpTableTypes.stream()
.map(
i ->
HiveParserTypeConverter.convert(
(PrimitiveTypeInfo) i,
rexBuilder.getTypeFactory()))
.collect(Collectors.toList());

// calcite field names
List<String> calciteFieldNames =
IntStream.range(0, calciteTargetTypes.size())
.mapToObj(SqlUtil::deriveAliasFromOrdinal)
.collect(Collectors.toList());

// calcite type for each row
List<RelDataType> calciteRowTypes = new ArrayList<>();

String line = reader.readLine();
while (line != null) {
String[] values = line.split("\u0001");
List<RexLiteral> row = new ArrayList<>();
for (int i = 0; i < tmpTableTypes.size(); i++) {
PrimitiveTypeInfo primitiveTypeInfo =
(PrimitiveTypeInfo) tmpTableTypes.get(i);
RelDataType calciteType = calciteTargetTypes.get(i);
if (i >= values.length || values[i].equals("\\N")) {
row.add(rexBuilder.makeNullLiteral(calciteType));
} else {
String val = values[i];
switch (primitiveTypeInfo.getPrimitiveCategory()) {
case BYTE:
case SHORT:
case INT:
case LONG:
row.add(
rexBuilder.makeExactLiteral(
new BigDecimal(val), calciteType));
break;
case DECIMAL:
BigDecimal bigDec = new BigDecimal(val);
row.add(
SqlTypeUtil.isValidDecimalValue(bigDec, calciteType)
? rexBuilder.makeExactLiteral(
bigDec, calciteType)
: rexBuilder.makeNullLiteral(calciteType));
break;
case FLOAT:
case DOUBLE:
row.add(
rexBuilder.makeApproxLiteral(
new BigDecimal(val), calciteType));
break;
case BOOLEAN:
row.add(rexBuilder.makeLiteral(Boolean.parseBoolean(val)));
break;
default:
row.add(
rexBuilder.makeCharLiteral(
HiveParserUtils.asUnicodeString(val)));
}
}
RelOptCluster cluster,
List<List<String>> values) {
List<TypeInfo> tmpTableTypes =
tmpTable.getCols().stream()
.map(f -> TypeInfoUtils.getTypeInfoFromTypeString(f.getType()))
.collect(Collectors.toList());

RexBuilder rexBuilder = cluster.getRexBuilder();
// calcite types for each field
List<RelDataType> calciteTargetTypes =
tmpTableTypes.stream()
.map(
ti ->
HiveParserTypeConverter.convert(
(PrimitiveTypeInfo) ti,
rexBuilder.getTypeFactory()))
.collect(Collectors.toList());
// calcite field names
List<String> calciteFieldNames =
IntStream.range(0, calciteTargetTypes.size())
.mapToObj(SqlUtil::deriveAliasFromOrdinal)
.collect(Collectors.toList());

// calcite type for each row
List<RelDataType> calciteRowTypes = new ArrayList<>();

List<List<RexLiteral>> rows = new ArrayList<>();
for (List<String> value : values) {
Preconditions.checkArgument(
value.size() == tmpTableTypes.size(),
String.format(
"Values table col length (%d) and data length (%d) mismatch",
tmpTableTypes.size(), value.size()));
List<RexLiteral> row = new ArrayList<>();
for (int i = 0; i < tmpTableTypes.size(); i++) {
PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) tmpTableTypes.get(i);
RelDataType calciteType = calciteTargetTypes.get(i);
String col = value.get(i);
if (col == null) {
row.add(rexBuilder.makeNullLiteral(calciteType));
} else {
switch (primitiveTypeInfo.getPrimitiveCategory()) {
case BYTE:
case SHORT:
case INT:
case LONG:
row.add(rexBuilder.makeExactLiteral(new BigDecimal(col), calciteType));
break;
case DECIMAL:
BigDecimal bigDec = new BigDecimal(col);
row.add(
SqlTypeUtil.isValidDecimalValue(bigDec, calciteType)
? rexBuilder.makeExactLiteral(bigDec, calciteType)
: rexBuilder.makeNullLiteral(calciteType));
break;
case FLOAT:
case DOUBLE:
row.add(rexBuilder.makeApproxLiteral(new BigDecimal(col), calciteType));
break;
case BOOLEAN:
row.add(rexBuilder.makeLiteral(Boolean.parseBoolean(col)));
break;
default:
row.add(
rexBuilder.makeCharLiteral(
HiveParserUtils.asUnicodeString(col)));
}

calciteRowTypes.add(
rexBuilder
.getTypeFactory()
.createStructType(
row.stream()
.map(RexLiteral::getType)
.collect(Collectors.toList()),
calciteFieldNames));
rows.add(row);
line = reader.readLine();
}

// compute the final row type
RelDataType calciteRowType =
rexBuilder.getTypeFactory().leastRestrictive(calciteRowTypes);
for (int i = 0; i < calciteFieldNames.size(); i++) {
ColumnInfo colInfo =
new ColumnInfo(
calciteFieldNames.get(i),
HiveParserTypeConverter.convert(
calciteRowType.getFieldList().get(i).getType()),
tabAlias,
false);
rowResolver.put(tabAlias, calciteFieldNames.get(i), colInfo);
}
return HiveParserUtils.genValuesRelNode(
cluster,
rexBuilder.getTypeFactory().createStructType(calciteRowType.getFieldList()),
rows);
}
} catch (Exception e) {
throw new FlinkHiveException("Failed to convert temp table to LogicalValues", e);
}

calciteRowTypes.add(
rexBuilder
.getTypeFactory()
.createStructType(
row.stream()
.map(RexLiteral::getType)
.collect(Collectors.toList()),
calciteFieldNames));
rows.add(row);
}

// compute the final row type
RelDataType calciteRowType = rexBuilder.getTypeFactory().leastRestrictive(calciteRowTypes);
for (int i = 0; i < calciteFieldNames.size(); i++) {
ColumnInfo colInfo =
new ColumnInfo(
calciteFieldNames.get(i),
HiveParserTypeConverter.convert(
calciteRowType.getFieldList().get(i).getType()),
tabAlias,
false);
rowResolver.put(tabAlias, calciteFieldNames.get(i), colInfo);
}
return HiveParserUtils.genValuesRelNode(
cluster,
rexBuilder.getTypeFactory().createStructType(calciteRowType.getFieldList()),
rows);
}

private static void validatePartColumnType(
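A note on how the two changes above fit together: TOK_NULL in a VALUES clause is now unparsed to a plain Java null rather than the text-format escape "\N", because the rows are kept in memory on the QB (getQB().getValuesTableToData()) and passed straight into genValues instead of being written to and re-read from the temp table's data_file. A small, hypothetical illustration of the data the new genValues consumes (the statement and the variable name are made up):

import java.util.Arrays;
import java.util.List;

// Hypothetical: for INSERT INTO t VALUES (1, 'a'), (2, NULL) the parser keeps the
// literals per values-table alias, roughly as:
List<List<String>> valuesTableData =
        Arrays.asList(
                Arrays.asList("1", "a"),
                Arrays.asList("2", null)); // TOK_NULL becomes null, no "\N" marker
// genValues maps each null entry to rexBuilder.makeNullLiteral(calciteType) and each
// non-null string to a typed RexLiteral, so no filesystem access is involved.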
--- next changed file ---
@@ -22,11 +22,9 @@

import org.antlr.runtime.TokenRewriteStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

@@ -92,10 +90,6 @@ public boolean isExplainSkipExecution() {
return false;
}

public Path getMRTmpPath(URI uri) {
return null;
}

/**
* Set the token rewrite stream being used to parse the current top-level SQL statement. Note
* that this should <b>not</b> be used for other parsing activities; for example, when we
(Diffs for the remaining changed files are not shown.)
