From a5847e3871ffb9515af9c754bd10c42611976c82 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Thu, 13 May 2021 20:02:34 +0800 Subject: [PATCH] [FLINK-22606][hive] Simplify the usage of SessionState This closes #16700 --- .../planner/delegation/hive/HiveParser.java | 18 +- .../hive/HiveParserCalcitePlanner.java | 7 +- .../copy/HiveParserBaseSemanticAnalyzer.java | 214 ++++++++---------- .../hive/copy/HiveParserContext.java | 6 - .../delegation/hive/copy/HiveParserQB.java | 15 +- .../hive/copy/HiveParserSemanticAnalyzer.java | 168 ++++---------- .../hive/HiveDialectQueryITCase.java | 22 ++ 7 files changed, 183 insertions(+), 267 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java index f5511b78f5ebe..81cb5e4fbe906 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java @@ -49,8 +49,6 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.tools.FrameworkConfig; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.lockmgr.LockException; @@ -66,7 +64,6 @@ import java.lang.reflect.Method; import java.sql.Timestamp; import java.time.Instant; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -210,7 +207,7 @@ public List parse(String statement) { hiveShim.registerTemporaryFunction("grouping", HiveGenericUDFGrouping.class); return processCmd(statement, hiveConf, hiveShim, (HiveCatalog) currentCatalog); } finally { - clearSessionState(hiveConf); + clearSessionState(); } } @@ -311,7 +308,7 @@ private void startSessionState(HiveConf hiveConf, CatalogManager catalogManager) sessionState.setCurrentDatabase(catalogManager.getCurrentDatabase()); // some Hive functions needs the timestamp setCurrentTimestamp(sessionState); - SessionState.start(sessionState); + SessionState.setCurrentSessionState(sessionState); } catch (LockException e) { throw new FlinkHiveException("Failed to init SessionState", e); } finally { @@ -338,18 +335,11 @@ private static void setCurrentTimestamp(HiveParserSessionState sessionState) { } } - private void clearSessionState(HiveConf hiveConf) { + private void clearSessionState() { SessionState sessionState = SessionState.get(); if (sessionState != null) { try { sessionState.close(); - List toDelete = new ArrayList<>(); - toDelete.add(SessionState.getHDFSSessionPath(hiveConf)); - toDelete.add(SessionState.getLocalSessionPath(hiveConf)); - for (Path path : toDelete) { - FileSystem fs = path.getFileSystem(hiveConf); - fs.delete(path, true); - } } catch (Exception e) { LOG.warn("Error closing SessionState", e); } @@ -409,8 +399,8 @@ public void close() throws IOException { new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR)); LOG.debug("Removing resource dir " + resourceDir); FileUtils.deleteDirectoryQuietly(resourceDir); - detachSession(); Hive.closeCurrent(); + detachSession(); } public Timestamp getHiveParserCurrentTS() { diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java index c5a0329c0dfe8..d5b3a8ac52b2a 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java @@ -806,7 +806,12 @@ private RelNode genTableLogicalPlan(String tableAlias, HiveParserQB qb) if (table.isTemporary()) { // Hive creates a temp table for VALUES, we need to convert it to LogicalValues RelNode values = - genValues(tableAlias, table, rowResolver, semanticAnalyzer, cluster); + genValues( + tableAlias, + table, + rowResolver, + cluster, + getQB().getValuesTableToData().get(tableAlias)); relToRowResolver.put(values, rowResolver); relToHiveColNameCalcitePosMap.put(values, buildHiveToCalciteColumnMap(rowResolver)); return values; diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java index d1c9dfd3bb90c..c7eb61f0b45ff 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.delegation.hive.copy; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.connectors.hive.FlinkHiveException; import org.apache.flink.table.planner.delegation.hive.HiveParserConstants; import org.apache.flink.table.planner.delegation.hive.HiveParserRexNodeConverter; import org.apache.flink.table.planner.delegation.hive.HiveParserTypeCheckProcFactory; @@ -33,6 +32,7 @@ import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg; +import org.apache.flink.util.Preconditions; import org.antlr.runtime.tree.Tree; import org.antlr.runtime.tree.TreeVisitor; @@ -60,8 +60,6 @@ import org.apache.calcite.util.ImmutableBitSet; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -97,8 +95,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.InputStreamReader; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; @@ -790,8 +786,7 @@ static String unparseExprForValuesClause(HiveParserASTNode expr) throws Semantic return "-" + unparseExprForValuesClause((HiveParserASTNode) expr.getChildren().get(0)); case HiveASTParser.TOK_NULL: - // Hive's text input will translate this as a null - return "\\N"; + return null; default: throw new SemanticException( "Expression of type " 
+ expr.getText() + " not supported in insert/values"); @@ -1766,120 +1761,103 @@ public static RelNode genValues( String tabAlias, Table tmpTable, HiveParserRowResolver rowResolver, - HiveParserSemanticAnalyzer semanticAnalyzer, - RelOptCluster cluster) { - try { - Path dataFile = new Path(tmpTable.getSd().getLocation(), "data_file"); - FileSystem fs = dataFile.getFileSystem(semanticAnalyzer.getConf()); - List> rows = new ArrayList<>(); - // TODO: leverage Hive to read the data - try (BufferedReader reader = - new BufferedReader(new InputStreamReader(fs.open(dataFile)))) { - List tmpTableTypes = - tmpTable.getCols().stream() - .map(f -> TypeInfoUtils.getTypeInfoFromTypeString(f.getType())) - .collect(Collectors.toList()); - - RexBuilder rexBuilder = cluster.getRexBuilder(); - - // calcite types for each field - List calciteTargetTypes = - tmpTableTypes.stream() - .map( - i -> - HiveParserTypeConverter.convert( - (PrimitiveTypeInfo) i, - rexBuilder.getTypeFactory())) - .collect(Collectors.toList()); - - // calcite field names - List calciteFieldNames = - IntStream.range(0, calciteTargetTypes.size()) - .mapToObj(SqlUtil::deriveAliasFromOrdinal) - .collect(Collectors.toList()); - - // calcite type for each row - List calciteRowTypes = new ArrayList<>(); - - String line = reader.readLine(); - while (line != null) { - String[] values = line.split("\u0001"); - List row = new ArrayList<>(); - for (int i = 0; i < tmpTableTypes.size(); i++) { - PrimitiveTypeInfo primitiveTypeInfo = - (PrimitiveTypeInfo) tmpTableTypes.get(i); - RelDataType calciteType = calciteTargetTypes.get(i); - if (i >= values.length || values[i].equals("\\N")) { - row.add(rexBuilder.makeNullLiteral(calciteType)); - } else { - String val = values[i]; - switch (primitiveTypeInfo.getPrimitiveCategory()) { - case BYTE: - case SHORT: - case INT: - case LONG: - row.add( - rexBuilder.makeExactLiteral( - new BigDecimal(val), calciteType)); - break; - case DECIMAL: - BigDecimal bigDec = new BigDecimal(val); - row.add( - SqlTypeUtil.isValidDecimalValue(bigDec, calciteType) - ? 
rexBuilder.makeExactLiteral( - bigDec, calciteType) - : rexBuilder.makeNullLiteral(calciteType)); - break; - case FLOAT: - case DOUBLE: - row.add( - rexBuilder.makeApproxLiteral( - new BigDecimal(val), calciteType)); - break; - case BOOLEAN: - row.add(rexBuilder.makeLiteral(Boolean.parseBoolean(val))); - break; - default: - row.add( - rexBuilder.makeCharLiteral( - HiveParserUtils.asUnicodeString(val))); - } - } + RelOptCluster cluster, + List> values) { + List tmpTableTypes = + tmpTable.getCols().stream() + .map(f -> TypeInfoUtils.getTypeInfoFromTypeString(f.getType())) + .collect(Collectors.toList()); + + RexBuilder rexBuilder = cluster.getRexBuilder(); + // calcite types for each field + List calciteTargetTypes = + tmpTableTypes.stream() + .map( + ti -> + HiveParserTypeConverter.convert( + (PrimitiveTypeInfo) ti, + rexBuilder.getTypeFactory())) + .collect(Collectors.toList()); + // calcite field names + List calciteFieldNames = + IntStream.range(0, calciteTargetTypes.size()) + .mapToObj(SqlUtil::deriveAliasFromOrdinal) + .collect(Collectors.toList()); + + // calcite type for each row + List calciteRowTypes = new ArrayList<>(); + + List> rows = new ArrayList<>(); + for (List value : values) { + Preconditions.checkArgument( + value.size() == tmpTableTypes.size(), + String.format( + "Values table col length (%d) and data length (%d) mismatch", + tmpTableTypes.size(), value.size())); + List row = new ArrayList<>(); + for (int i = 0; i < tmpTableTypes.size(); i++) { + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) tmpTableTypes.get(i); + RelDataType calciteType = calciteTargetTypes.get(i); + String col = value.get(i); + if (col == null) { + row.add(rexBuilder.makeNullLiteral(calciteType)); + } else { + switch (primitiveTypeInfo.getPrimitiveCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + row.add(rexBuilder.makeExactLiteral(new BigDecimal(col), calciteType)); + break; + case DECIMAL: + BigDecimal bigDec = new BigDecimal(col); + row.add( + SqlTypeUtil.isValidDecimalValue(bigDec, calciteType) + ? 
rexBuilder.makeExactLiteral(bigDec, calciteType) + : rexBuilder.makeNullLiteral(calciteType)); + break; + case FLOAT: + case DOUBLE: + row.add(rexBuilder.makeApproxLiteral(new BigDecimal(col), calciteType)); + break; + case BOOLEAN: + row.add(rexBuilder.makeLiteral(Boolean.parseBoolean(col))); + break; + default: + row.add( + rexBuilder.makeCharLiteral( + HiveParserUtils.asUnicodeString(col))); } - - calciteRowTypes.add( - rexBuilder - .getTypeFactory() - .createStructType( - row.stream() - .map(RexLiteral::getType) - .collect(Collectors.toList()), - calciteFieldNames)); - rows.add(row); - line = reader.readLine(); } - - // compute the final row type - RelDataType calciteRowType = - rexBuilder.getTypeFactory().leastRestrictive(calciteRowTypes); - for (int i = 0; i < calciteFieldNames.size(); i++) { - ColumnInfo colInfo = - new ColumnInfo( - calciteFieldNames.get(i), - HiveParserTypeConverter.convert( - calciteRowType.getFieldList().get(i).getType()), - tabAlias, - false); - rowResolver.put(tabAlias, calciteFieldNames.get(i), colInfo); - } - return HiveParserUtils.genValuesRelNode( - cluster, - rexBuilder.getTypeFactory().createStructType(calciteRowType.getFieldList()), - rows); } - } catch (Exception e) { - throw new FlinkHiveException("Failed to convert temp table to LogicalValues", e); - } + + calciteRowTypes.add( + rexBuilder + .getTypeFactory() + .createStructType( + row.stream() + .map(RexLiteral::getType) + .collect(Collectors.toList()), + calciteFieldNames)); + rows.add(row); + } + + // compute the final row type + RelDataType calciteRowType = rexBuilder.getTypeFactory().leastRestrictive(calciteRowTypes); + for (int i = 0; i < calciteFieldNames.size(); i++) { + ColumnInfo colInfo = + new ColumnInfo( + calciteFieldNames.get(i), + HiveParserTypeConverter.convert( + calciteRowType.getFieldList().get(i).getType()), + tabAlias, + false); + rowResolver.put(tabAlias, calciteFieldNames.get(i), colInfo); + } + return HiveParserUtils.genValuesRelNode( + cluster, + rexBuilder.getTypeFactory().createStructType(calciteRowType.getFieldList()), + rows); } private static void validatePartColumnType( diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java index 53a7243a922fc..f928be687246a 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java @@ -22,11 +22,9 @@ import org.antlr.runtime.TokenRewriteStream; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -92,10 +90,6 @@ public boolean isExplainSkipExecution() { return false; } - public Path getMRTmpPath(URI uri) { - return null; - } - /** * Set the token rewrite stream being used to parse the current top-level SQL statement. 
Note * that this should not be used for other parsing activities; for example, when we diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQB.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQB.java index 0de803ce98db2..f8f98c4e67181 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQB.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserQB.java @@ -18,7 +18,6 @@ package org.apache.flink.table.planner.delegation.hive.copy; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.QBMetaData; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; @@ -26,7 +25,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -52,6 +50,7 @@ public class HiveParserQB { private boolean isQuery; private boolean insideView; private Set aliasInsideView; + private final Map>> valuesTableToData = new HashMap<>(); // used by PTFs /* @@ -269,14 +268,6 @@ public int incrNumSubQueryPredicates() { return ++numSubQueryPredicates; } - /** - * List of dbName.tblName of encrypted target tables of insert statement Used to support Insert - * ... values(...). - */ - List getEncryptedTargetTablePaths() { - return Collections.emptyList(); - } - public HashMap getViewToTabSchema() { return viewAliasToViewSchema; } @@ -311,4 +302,8 @@ public boolean containsQueryWithoutSourceTable() { public boolean isMaterializedView() { return false; } + + public Map>> getValuesTableToData() { + return valuesTableToData; + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java index 26e3f1e2b7d46..11ea60d6eaf19 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java @@ -32,6 +32,7 @@ import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg; +import org.apache.flink.util.Preconditions; import org.antlr.runtime.ClassicToken; import org.antlr.runtime.Token; @@ -40,9 +41,8 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.tools.FrameworkConfig; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; @@ -83,12 +83,12 @@ import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.io.IOUtils; 
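Aside (not part of the patch): the new HiveParserQB#valuesTableToData map added above replaces the old file-based handoff. Instead of writing the rows of a VALUES clause into a data_file under the temp table's location and reading them back later, the analyzer now keeps the parsed string literals in memory, keyed by the generated temp-table name, and the Calcite planner builds LogicalValues directly from them. A minimal standalone sketch of that bookkeeping idea follows; ValuesDataRegistry and its methods are illustrative names only, not the Flink classes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: remember VALUES rows in memory instead of materializing them on disk.
public class ValuesDataRegistry {

    // generated temp-table name -> rows of string literals; a null element inside a
    // row stands for SQL NULL, matching unparseExprForValuesClause now returning null
    // instead of the text-format marker "\N"
    private final Map<String, List<List<String>>> valuesTableToData = new HashMap<>();

    /** Called by the analyzer after parsing TOK_VALUES_TABLE. */
    public void register(String tableName, List<List<String>> rows) {
        valuesTableToData.put(tableName, new ArrayList<>(rows));
    }

    /** Called by the planner when it converts the temp table to LogicalValues. */
    public List<List<String>> lookup(String tableName) {
        return valuesTableToData.get(tableName);
    }

    public static void main(String[] args) {
        ValuesDataRegistry registry = new ValuesDataRegistry();
        // e.g. INSERT INTO t VALUES (1, 'a'), (2, NULL)
        registry.register(
                "values__tmp__table__1",
                Arrays.asList(Arrays.asList("1", "a"), Arrays.asList("2", null)));
        System.out.println(registry.lookup("values__tmp__table__1"));
    }
}

In the patch itself, the lookup key is the lowercased temp-table name produced in genValuesTempTable, which HiveParserCalcitePlanner then uses as the table alias when it calls getQB().getValuesTableToData().get(tableAlias).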
import org.apache.hadoop.mapred.InputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; +import java.io.File; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -108,7 +108,6 @@ import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.findTabRefIdxs; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getAliasId; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getColumnInternalName; -import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getQualifiedTableName; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.getUnescapedName; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.handleQueryWindowClauses; import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.initPhase1Ctx; @@ -601,25 +600,18 @@ public Map getNameToSplitSampleMap() { // See also preProcessForInsert(HiveParserASTNode, HiveParserQB) private HiveParserASTNode genValuesTempTable(HiveParserASTNode originalFrom, HiveParserQB qb) throws SemanticException { - Path dataDir = null; - if (!qb.getEncryptedTargetTablePaths().isEmpty()) { - // currently only Insert into T values(...) is supported thus only 1 values clause - // and only 1 target table are possible. If/when support for - // select ... from values(...) is added an insert statement may have multiple - // encrypted target tables. - dataDir = ctx.getMRTmpPath(qb.getEncryptedTargetTablePaths().get(0).toUri()); - } - // Pick a name for the table - SessionState ss = SessionState.get(); - String tableName = VALUES_TMP_TABLE_NAME_PREFIX + ss.getNextValuesTempTableSuffix(); + // hive creates a temp table and writes the values data into it + // here we skip writing the data but remember the values data instead + // later calcite planner can generate LogicalValues from it // Step 1, parse the values clause we were handed List fromChildren = originalFrom.getChildren(); // First child should be the virtual table ref HiveParserASTNode virtualTableRef = (HiveParserASTNode) fromChildren.get(0); - assert virtualTableRef.getToken().getType() == HiveASTParser.TOK_VIRTUAL_TABREF - : "Expected first child of TOK_VIRTUAL_TABLE to be TOK_VIRTUAL_TABREF but was " - + virtualTableRef.getName(); + Preconditions.checkArgument( + virtualTableRef.getToken().getType() == HiveASTParser.TOK_VIRTUAL_TABREF, + "Expected first child of TOK_VIRTUAL_TABLE to be TOK_VIRTUAL_TABREF but was " + + virtualTableRef.getName()); List virtualTableRefChildren = virtualTableRef.getChildren(); // First child of this should be the table name. 
If it's anonymous, @@ -633,80 +625,63 @@ private HiveParserASTNode genValuesTempTable(HiveParserASTNode originalFrom, Hiv // The second child of the TOK_VIRTUAL_TABLE should be TOK_VALUES_TABLE HiveParserASTNode valuesTable = (HiveParserASTNode) fromChildren.get(1); - assert valuesTable.getToken().getType() == HiveASTParser.TOK_VALUES_TABLE - : "Expected second child of TOK_VIRTUAL_TABLE to be TOK_VALUE_TABLE but was " - + valuesTable.getName(); - // Each of the children of TOK_VALUES_TABLE will be a TOK_VALUE_ROW - List valuesTableChildren = valuesTable.getChildren(); - - // Now that we're going to start reading through the rows, open a file to write the rows too - // If we leave this method before creating the temporary table we need to be sure to clean - // up this file. - Path tablePath = null; - FileSystem fs = null; - FSDataOutputStream out = null; + Preconditions.checkArgument( + valuesTable.getToken().getType() == HiveASTParser.TOK_VALUES_TABLE, + "Expected second child of TOK_VIRTUAL_TABLE to be TOK_VALUE_TABLE but was " + + valuesTable.getName()); + + // Pick a name for the table + SessionState ss = SessionState.get(); + String tableName = + (VALUES_TMP_TABLE_NAME_PREFIX + ss.getNextValuesTempTableSuffix()).toLowerCase(); + + List rows = valuesTable.getChildren(); + List> valuesData = new ArrayList<>(rows.size()); try { - if (dataDir == null) { - tablePath = Warehouse.getDnsPath(new Path(ss.getTempTableSpace(), tableName), conf); - } else { - // if target table of insert is encrypted, make sure temporary table data is stored - // similarly encrypted - tablePath = Warehouse.getDnsPath(new Path(dataDir, tableName), conf); - } - fs = tablePath.getFileSystem(conf); - fs.mkdirs(tablePath); - Path dataFile = new Path(tablePath, "data_file"); - out = fs.create(dataFile); List fields = new ArrayList<>(); boolean firstRow = true; - for (Node n : valuesTableChildren) { - HiveParserASTNode valuesRow = (HiveParserASTNode) n; - assert valuesRow.getToken().getType() == HiveASTParser.TOK_VALUE_ROW - : "Expected child of TOK_VALUE_TABLE to be TOK_VALUE_ROW but was " - + valuesRow.getName(); + for (Node n : rows) { + // Each of the children of TOK_VALUES_TABLE will be a TOK_VALUE_ROW + HiveParserASTNode row = (HiveParserASTNode) n; + Preconditions.checkArgument( + row.getToken().getType() == HiveASTParser.TOK_VALUE_ROW, + "Expected child of TOK_VALUE_TABLE to be TOK_VALUE_ROW but was " + + row.getName()); // Each of the children of this should be a literal - List valuesRowChildren = valuesRow.getChildren(); - boolean isFirst = true; + List columns = row.getChildren(); + List data = new ArrayList<>(columns.size()); int nextColNum = 1; - for (Node n1 : valuesRowChildren) { - HiveParserASTNode value = (HiveParserASTNode) n1; + for (Node n1 : columns) { + HiveParserASTNode column = (HiveParserASTNode) n1; if (firstRow) { fields.add(new FieldSchema("tmp_values_col" + nextColNum++, "string", "")); } - if (isFirst) { - isFirst = false; - } else { - HiveParserUtils.writeAsText("\u0001", out); - } - HiveParserUtils.writeAsText(unparseExprForValuesClause(value), out); + data.add(unparseExprForValuesClause(column)); } - HiveParserUtils.writeAsText("\n", out); firstRow = false; + valuesData.add(data); } - // Step 2, create a temp table, using the created file as the data + // Step 2, create a temp table Table table = db.newTable(tableName); table.setSerializationLib(conf.getVar(HiveConf.ConfVars.HIVEDEFAULTSERDE)); HiveTableUtil.setStorageFormat(table.getSd(), "TextFile", conf); table.setFields(fields); - 
table.setDataLocation(tablePath); - table.getTTable().setTemporary(true); - table.setStoredAsSubDirectories(false); - db.createTable(table, false); - } catch (Exception e) { - String errMsg = ErrorMsg.INSERT_CANNOT_CREATE_TEMP_FILE.getMsg() + e.getMessage(); - LOG.error(errMsg); - // Try to delete the file - if (fs != null && tablePath != null) { - try { - fs.delete(tablePath, false); - } catch (IOException swallowIt) { - } + // make up a path for this table + File dataLocation = Files.createTempDirectory(tableName).toFile(); + try { + table.setDataLocation(new Path(dataLocation.toURI().toString(), tableName)); + table.getTTable().setTemporary(true); + table.setStoredAsSubDirectories(false); + db.createTable(table, false); + } finally { + FileUtils.deleteQuietly(dataLocation); } - throw new SemanticException(errMsg, e); - } finally { - IOUtils.closeStream(out); + // remember the data for this table + qb.getValuesTableToData().put(tableName, valuesData); + } catch (Exception e) { + throw new SemanticException("Failed to create temp table for VALUES", e); } // Step 3, return a new subtree with a from clause built around that temp table @@ -2297,7 +2272,6 @@ public boolean genResolvedParseTree(HiveParserASTNode ast, HiveParserPlannerCont // 4. continue analyzing from the child HiveParserASTNode. HiveParserBaseSemanticAnalyzer.Phase1Ctx ctx1 = initPhase1Ctx(); - preProcessForInsert(ast); if (!doPhase1(ast, qb, ctx1, plannerCtx)) { // if phase1Result false return return false; @@ -2312,48 +2286,6 @@ public boolean genResolvedParseTree(HiveParserASTNode ast, HiveParserPlannerCont return true; } - /** - * This will walk AST of an INSERT statement and assemble a list of target tables which are in - * an HDFS encryption zone. This is needed to make sure that so that the data from values clause - * of Insert ... select values(...) is stored securely. See also {@link - * #genValuesTempTable(HiveParserASTNode, HiveParserQB)} - */ - private void preProcessForInsert(HiveParserASTNode node) throws SemanticException { - try { - if (!(node != null - && node.getToken() != null - && node.getToken().getType() == HiveASTParser.TOK_QUERY)) { - return; - } - for (Node child : node.getChildren()) { - // each insert of multi insert looks like - // (TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME T1))) - if (((HiveParserASTNode) child).getToken().getType() != HiveASTParser.TOK_INSERT) { - continue; - } - HiveParserASTNode n = - (HiveParserASTNode) - ((HiveParserASTNode) child) - .getFirstChildWithType(HiveASTParser.TOK_INSERT_INTO); - if (n == null) { - continue; - } - n = (HiveParserASTNode) n.getFirstChildWithType(HiveASTParser.TOK_TAB); - if (n == null) { - continue; - } - n = (HiveParserASTNode) n.getFirstChildWithType(HiveASTParser.TOK_TABNAME); - if (n == null) { - continue; - } - String[] dbTab = getQualifiedTableName(n); - db.getTable(dbTab[0], dbTab[1]); - } - } catch (Exception ex) { - throw new SemanticException(ex); - } - } - // Generates an expression node descriptor for the expression with HiveParserTypeCheckCtx. 
public ExprNodeDesc genExprNodeDesc(HiveParserASTNode expr, HiveParserRowResolver input) throws SemanticException { diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java index 389700057683c..0d1bed115e3b2 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java @@ -289,6 +289,28 @@ public void testGroupingID() throws Exception { } } + @Test + public void testValues() throws Exception { + tableEnv.executeSql( + "create table test_values(" + + "t tinyint,s smallint,i int,b bigint,f float,d double,de decimal(10,5),ts timestamp,dt date," + + "str string,ch char(3),vch varchar(3),bl boolean)"); + try { + tableEnv.executeSql( + "insert into table test_values values " + + "(1,-2,3,4,1.1,1.1,1.1,'2021-08-04 16:26:33.4','2021-08-04',null,'1234','56',false)") + .await(); + List result = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select * from test_values").collect()); + assertEquals( + "[+I[1, -2, 3, 4, 1.1, 1.1, 1.10000, 2021-08-04T16:26:33.400, 2021-08-04, null, 123, 56, false]]", + result.toString()); + } finally { + tableEnv.executeSql("drop table test_values"); + } + } + private void runQFile(File qfile) throws Exception { QTest qTest = extractQTest(qfile); for (int i = 0; i < qTest.statements.size(); i++) {
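For reference, here is a minimal, self-contained sketch (not part of the patch) of the literal conversion that the rewritten genValues performs once it receives the cached rows: each string is mapped to a Calcite literal through the RexBuilder, and a Java null becomes a typed NULL literal now that TOK_NULL is unparsed to null rather than "\N". Class and variable names are illustrative; the real method additionally derives field names with SqlUtil.deriveAliasFromOrdinal, computes the least-restrictive row type across all rows, fills the row resolver, and builds the LogicalValues node via HiveParserUtils.genValuesRelNode.

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;

// Sketch: turn rows of string literals (as cached by the analyzer) into Calcite literals.
public class ValuesToRexSketch {

    public static void main(String[] args) {
        JavaTypeFactoryImpl typeFactory = new JavaTypeFactoryImpl();
        RexBuilder rexBuilder = new RexBuilder(typeFactory);

        // nullable target types for three columns: INT, DOUBLE, VARCHAR
        List<RelDataType> colTypes =
                Arrays.asList(
                        typeFactory.createTypeWithNullability(
                                typeFactory.createSqlType(SqlTypeName.INTEGER), true),
                        typeFactory.createTypeWithNullability(
                                typeFactory.createSqlType(SqlTypeName.DOUBLE), true),
                        typeFactory.createTypeWithNullability(
                                typeFactory.createSqlType(SqlTypeName.VARCHAR), true));

        // rows as remembered by the analyzer; null means SQL NULL
        List<List<String>> rows =
                Arrays.asList(
                        Arrays.asList("1", "1.1", "hello"),
                        Arrays.asList("2", null, null));

        List<List<RexNode>> literalRows = new ArrayList<>();
        for (List<String> row : rows) {
            List<RexNode> literalRow = new ArrayList<>();
            for (int i = 0; i < row.size(); i++) {
                String col = row.get(i);
                RelDataType type = colTypes.get(i);
                if (col == null) {
                    // SQL NULL becomes a typed NULL literal
                    literalRow.add(rexBuilder.makeNullLiteral(type));
                } else if (SqlTypeName.INT_TYPES.contains(type.getSqlTypeName())) {
                    literalRow.add(rexBuilder.makeExactLiteral(new BigDecimal(col), type));
                } else if (SqlTypeName.APPROX_TYPES.contains(type.getSqlTypeName())) {
                    literalRow.add(rexBuilder.makeApproxLiteral(new BigDecimal(col), type));
                } else {
                    // strings (and anything else in this sketch) become CHAR literals
                    literalRow.add(rexBuilder.makeLiteral(col));
                }
            }
            literalRows.add(literalRow);
        }
        literalRows.forEach(System.out::println);
    }
}

The patch additionally guards DECIMAL values with SqlTypeUtil.isValidDecimalValue, falling back to a NULL literal when the value does not fit the target type; the sketch omits that check for brevity.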