[hotfix][table-planner][tests] Cleanup by using PlannerMocks
Clean up some test code by using `PlannerMocks` instead of creating a
`PlannerContext` manually.
matriv authored and twalthr committed Mar 24, 2022
1 parent 3ddd74e commit db070b4
Showing 10 changed files with 160 additions and 254 deletions.
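In short, the pattern being replaced and its replacement look roughly like this; a condensed sketch assembled from the diffs below, not verbatim test code:

    // Before: each test wired up a PlannerContext by hand.
    CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
    ModuleManager moduleManager = new ModuleManager();
    FunctionCatalog functionCatalog =
            new FunctionCatalog(tableConfig, catalogManager, moduleManager);
    PlannerContext plannerContext =
            new PlannerContext(
                    false, // isBatchMode
                    tableConfig,
                    moduleManager,
                    functionCatalog,
                    catalogManager,
                    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true)),
                    Collections.emptyList());

    // After: one factory call; the mock owns the wiring and exposes getters.
    PlannerContext viaMocks = PlannerMocks.create().getPlannerContext();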
ParserImplTest.java

@@ -20,65 +20,36 @@
 import org.apache.flink.table.api.SqlParserEOFException;
 import org.apache.flink.table.api.SqlParserException;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
-import org.apache.flink.table.utils.CatalogManagerMocks;
+import org.apache.flink.table.planner.utils.PlannerMocks;
 
 import org.junit.Test;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.Supplier;
 
-import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
 import static org.apache.flink.table.planner.delegation.ParserImplTest.TestSpec.forStatement;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link ParserImpl}. */
 public class ParserImplTest {
 
-    private final boolean isStreamingMode = false;
-    private final TableConfig tableConfig = TableConfig.getDefault();
-    private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
-    private final CatalogManager catalogManager =
-            CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", catalog).build();
-    private final ModuleManager moduleManager = new ModuleManager();
-    private final FunctionCatalog functionCatalog =
-            new FunctionCatalog(tableConfig, catalogManager, moduleManager);
-    private final PlannerContext plannerContext =
-            new PlannerContext(
-                    !isStreamingMode,
-                    tableConfig,
-                    moduleManager,
-                    functionCatalog,
-                    catalogManager,
-                    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
-                    new ArrayList<>());
-
-    private final Supplier<FlinkPlannerImpl> plannerSupplier =
-            () ->
-                    plannerContext.createFlinkPlanner(
-                            catalogManager.getCurrentCatalog(),
-                            catalogManager.getCurrentDatabase());
+    private final PlannerMocks plannerMocks = PlannerMocks.create(true);
+
+    private final Supplier<FlinkPlannerImpl> plannerSupplier = plannerMocks::getPlanner;
 
     private final Parser parser =
             new ParserImpl(
-                    catalogManager,
+                    plannerMocks.getCatalogManager(),
                     plannerSupplier,
                     () -> plannerSupplier.get().parser(),
-                    plannerContext.getSqlExprToRexConverterFactory());
+                    plannerMocks.getPlannerContext().getSqlExprToRexConverterFactory());
 
     private static final List<TestSpec> TEST_SPECS =
             Arrays.asList(
ExpressionConverterTest.java

@@ -19,20 +19,12 @@
 package org.apache.flink.table.planner.expressions.converter;
 
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.expressions.TimePointUnit;
-import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.planner.delegation.PlannerContext;
-import org.apache.flink.table.planner.plan.metadata.MetadataTestUtil;
-import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef;
+import org.apache.flink.table.planner.utils.PlannerMocks;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.apache.calcite.avatica.util.TimeUnit;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -48,29 +40,14 @@
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.Period;
-import java.util.Arrays;
 
 import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link ExpressionConverter}. */
 public class ExpressionConverterTest {
 
-    private final TableConfig tableConfig = TableConfig.getDefault();
-    private final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
-    private final ModuleManager moduleManager = new ModuleManager();
-    private final PlannerContext plannerContext =
-            new PlannerContext(
-                    false,
-                    tableConfig,
-                    moduleManager,
-                    new FunctionCatalog(tableConfig, catalogManager, moduleManager),
-                    catalogManager,
-                    CalciteSchema.from(MetadataTestUtil.initRootSchema()),
-                    Arrays.asList(
-                            ConventionTraitDef.INSTANCE,
-                            FlinkRelDistributionTraitDef.INSTANCE(),
-                            RelCollationTraitDef.INSTANCE));
+    private final PlannerContext plannerContext = PlannerMocks.create().getPlannerContext();
     private final ExpressionConverter converter =
             new ExpressionConverter(
                     plannerContext.createRelBuilder(
SqlToOperationConverterTest.java

@@ -51,7 +51,6 @@
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.factories.TestManagedTableFactory;
-import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.BeginStatementSetOperation;
 import org.apache.flink.table.operations.EndStatementSetOperation;
 import org.apache.flink.table.operations.ExplainOperation;
@@ -96,6 +95,7 @@
 import org.apache.flink.table.planner.parse.CalciteParser;
 import org.apache.flink.table.planner.parse.ExtendedParser;
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
+import org.apache.flink.table.planner.utils.PlannerMocks;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.table.utils.ExpressionResolverMocks;
@@ -146,35 +146,32 @@ public class SqlToOperationConverterTest {
                                         ExecutionOptions.RUNTIME_MODE.key(),
                                         RuntimeExecutionMode.BATCH.name())))
                     .build();
-    private final ModuleManager moduleManager = new ModuleManager();
-    private final FunctionCatalog functionCatalog =
-            new FunctionCatalog(tableConfig, catalogManager, moduleManager);
+
+    private final PlannerMocks plannerMocks =
+            PlannerMocks.newBuilder()
+                    .withBatchMode(true)
+                    .withTableConfig(tableConfig)
+                    .withCatalogManager(catalogManager)
+                    .withRootSchema(
+                            asRootSchema(
+                                    new CatalogManagerCalciteSchema(
+                                            catalogManager, isStreamingMode)))
+                    .build();
+    private final PlannerContext plannerContext = plannerMocks.getPlannerContext();
+    private final FunctionCatalog functionCatalog = plannerMocks.getFunctionCatalog();
 
     private final Supplier<FlinkPlannerImpl> plannerSupplier =
             () ->
-                    getPlannerContext()
-                            .createFlinkPlanner(
-                                    catalogManager.getCurrentCatalog(),
-                                    catalogManager.getCurrentDatabase());
-    private final PlannerContext plannerContext =
-            new PlannerContext(
-                    true,
-                    tableConfig,
-                    moduleManager,
-                    functionCatalog,
-                    catalogManager,
-                    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
-                    Collections.emptyList());
+                    plannerContext.createFlinkPlanner(
+                            catalogManager.getCurrentCatalog(),
+                            catalogManager.getCurrentDatabase());
 
     private final Parser parser =
             new ParserImpl(
                     catalogManager,
                     plannerSupplier,
                     () -> plannerSupplier.get().parser(),
-                    getPlannerContext().getSqlExprToRexConverterFactory());
-
-    private PlannerContext getPlannerContext() {
-        return plannerContext;
-    }
+                    plannerContext.getSqlExprToRexConverterFactory());
 
     @BeforeEach
     public void before() throws TableAlreadyExistException, DatabaseNotExistException {
@@ -21,12 +21,11 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
 import org.apache.flink.table.planner.delegation.ParserImpl;
 import org.apache.flink.table.planner.delegation.PlannerContext;
+import org.apache.flink.table.planner.utils.PlannerMocks;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
@@ -36,7 +35,6 @@
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import java.io.IOException;
-import java.util.Collections;
 
 import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -70,18 +68,14 @@ static SerdeContext configuredSerdeContext(
     static SerdeContext configuredSerdeContext(
             CatalogManager catalogManager, TableConfig tableConfig) {
         final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-        final ModuleManager moduleManager = new ModuleManager();
-        final FunctionCatalog functionCatalog =
-                new FunctionCatalog(tableConfig, catalogManager, moduleManager);
         final PlannerContext plannerContext =
-                new PlannerContext(
-                        false,
-                        tableConfig,
-                        moduleManager,
-                        functionCatalog,
-                        catalogManager,
-                        asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true)),
-                        Collections.emptyList());
+                PlannerMocks.newBuilder()
+                        .withCatalogManager(catalogManager)
+                        .withTableConfig(tableConfig)
+                        .withRootSchema(
+                                asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true)))
+                        .build()
+                        .getPlannerContext();
         return new SerdeContext(
                 new ParserImpl(null, null, plannerContext::createCalciteParser, null),
                 plannerContext.getFlinkContext(),
PlannerMocks.java

@@ -33,8 +33,11 @@
 import org.apache.flink.table.planner.delegation.PlannerContext;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 
-import java.util.ArrayList;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.RelTraitDef;
+
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
 
@@ -47,27 +50,37 @@ public class PlannerMocks {
     private final FlinkPlannerImpl planner;
     private final ParserImpl parser;
     private final CatalogManager catalogManager;
+    private final FunctionCatalog functionCatalog;
     private final TableConfig tableConfig;
     private final PlannerContext plannerContext;
 
-    private PlannerMocks(TableConfig tableConfig) {
-        this.catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
+    @SuppressWarnings("rawtypes")
+    private PlannerMocks(
+            boolean isBatchMode,
+            TableConfig tableConfig,
+            CatalogManager catalogManager,
+            List<RelTraitDef> traitDefs,
+            CalciteSchema rootSchema) {
+        this.catalogManager = catalogManager;
         this.tableConfig = tableConfig;
 
         final ModuleManager moduleManager = new ModuleManager();
 
-        final FunctionCatalog functionCatalog =
-                new FunctionCatalog(tableConfig, catalogManager, moduleManager);
+        this.functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
 
         this.plannerContext =
                 new PlannerContext(
-                        false,
+                        isBatchMode,
                         tableConfig,
                         moduleManager,
                         functionCatalog,
                         catalogManager,
-                        asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true)),
-                        new ArrayList<>());
+                        rootSchema != null
+                                ? rootSchema
+                                : asRootSchema(
+                                        new CatalogManagerCalciteSchema(
+                                                catalogManager, !isBatchMode)),
+                        traitDefs);
 
         this.planner =
                 plannerContext.createFlinkPlanner(
@@ -103,6 +116,10 @@ public CatalogManager getCatalogManager() {
         return catalogManager;
     }
 
+    public FunctionCatalog getFunctionCatalog() {
+        return functionCatalog;
+    }
+
     public TableConfig getTableConfig() {
         return tableConfig;
     }
@@ -127,13 +144,66 @@ public PlannerMocks registerTemporaryTable(String tableName, Schema tableSchema) {
         return this;
     }
 
+    /** Builder for {@link PlannerMocks} to facilitate various test use cases. */
+    @SuppressWarnings("rawtypes")
+    public static class Builder {
+
+        private boolean batchMode = false;
+        private TableConfig tableConfig = TableConfig.getDefault();
+        private CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
+        private List<RelTraitDef> traitDefs = Collections.emptyList();
+        private CalciteSchema rootSchema;
+
+        private Builder() {}
+
+        public Builder withBatchMode(boolean batchMode) {
+            this.batchMode = batchMode;
+            return this;
+        }
+
+        public Builder withTableConfig(TableConfig tableConfig) {
+            this.tableConfig = tableConfig;
+            return this;
+        }
+
+        public Builder withConfiguration(Configuration configuration) {
+            tableConfig.addConfiguration(configuration);
+            return this;
+        }
+
+        public Builder withCatalogManager(CatalogManager catalogManager) {
+            this.catalogManager = catalogManager;
+            return this;
+        }
+
+        public Builder withTraitDefs(List<RelTraitDef> traitDefs) {
+            this.traitDefs = traitDefs;
+            return this;
+        }
+
+        public Builder withRootSchema(CalciteSchema rootSchema) {
+            this.rootSchema = rootSchema;
+            return this;
+        }
+
+        public PlannerMocks build() {
+            return new PlannerMocks(batchMode, tableConfig, catalogManager, traitDefs, rootSchema);
+        }
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
     public static PlannerMocks create() {
-        return new PlannerMocks(TableConfig.getDefault());
+        return new Builder().build();
     }
 
+    public static PlannerMocks create(boolean batchMode) {
+        return new Builder().withBatchMode(batchMode).build();
+    }
+
     public static PlannerMocks create(Configuration configuration) {
-        TableConfig tableConfig = TableConfig.getDefault();
-        tableConfig.addConfiguration(configuration);
-        return new PlannerMocks(tableConfig);
+        return new Builder().withConfiguration(configuration).build();
     }
 }
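Taken together, the new entry points give tests three levels of control. A sketch of typical usage, recombining only calls that appear in the hunks above:

    // Defaults: streaming mode, empty catalog manager, no extra trait defs,
    // root schema derived from the catalog manager (isStreamingMode = !isBatchMode).
    PlannerMocks defaults = PlannerMocks.create();

    // Batch mode via the boolean shortcut, as in ParserImplTest.
    PlannerMocks batch = PlannerMocks.create(true);

    // Full control via the builder, as in SqlToOperationConverterTest.
    PlannerMocks custom =
            PlannerMocks.newBuilder()
                    .withBatchMode(true)
                    .withTableConfig(TableConfig.getDefault())
                    .withCatalogManager(CatalogManagerMocks.createEmptyCatalogManager())
                    .build();
    FlinkPlannerImpl planner = custom.getPlanner();
    FunctionCatalog functionCatalog = custom.getFunctionCatalog();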