Skip to content

Commit

Permalink
[FLINK-34422][test] BatchTestBase uses MiniClusterExtension
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Feb 12, 2024
1 parent 65727fb commit 4c4643c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@
/** IT Case for testing managed table compaction. */
class CompactManagedTableITCase extends BatchTestBase {

private final ObjectIdentifier tableIdentifier =
ObjectIdentifier.of(tEnv().getCurrentCatalog(), tEnv().getCurrentDatabase(), "MyTable");
private ObjectIdentifier tableIdentifier;
private final Map<CatalogPartitionSpec, List<RowData>> collectedElements = new HashMap<>();

private Path rootPath;
Expand All @@ -73,6 +72,9 @@ class CompactManagedTableITCase extends BatchTestBase {
@BeforeEach
public void before() throws Exception {
super.before();
tableIdentifier =
ObjectIdentifier.of(
tEnv().getCurrentCatalog(), tEnv().getCurrentDatabase(), "MyTable");
MANAGED_TABLES.put(tableIdentifier, new AtomicReference<>());
referenceOfManagedTableFileEntries = new AtomicReference<>();
MANAGED_TABLE_FILE_ENTRIES.put(tableIdentifier, referenceOfManagedTableFileEntries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,11 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach}
class BatchTestBase extends BatchAbstractTestBase {

protected var settings = EnvironmentSettings.newInstance().inBatchMode().build()
protected var testingTableEnv: TestingTableEnvironment = TestingTableEnvironment
.create(settings, catalogManager = None, TableConfig.getDefault)
protected var tEnv: TableEnvironment = testingTableEnv
tEnv.getConfig.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED, Boolean.box(false))
protected var planner =
tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
protected var env: StreamExecutionEnvironment = planner.getExecEnv
env.getConfig.enableObjectReuse()
protected var tableConfig: TableConfig = tEnv.getConfig
protected var testingTableEnv: TestingTableEnvironment = _
protected var tEnv: TableEnvironment = _
protected var planner: PlannerBase = _
protected var env: StreamExecutionEnvironment = _
protected var tableConfig: TableConfig = _

val LINE_COL_PATTERN: Pattern = Pattern.compile("At line ([0-9]+), column ([0-9]+)")
val LINE_COL_TWICE_PATTERN: Pattern = Pattern.compile(
Expand All @@ -74,10 +70,22 @@ class BatchTestBase extends BatchAbstractTestBase {

@throws(classOf[Exception])
@BeforeEach
def before(): Unit = {
def setupEnv(): Unit = {
testingTableEnv = TestingTableEnvironment
.create(settings, catalogManager = None, TableConfig.getDefault)
tEnv = testingTableEnv
tEnv.getConfig.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED, Boolean.box(false))
planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
env = planner.getExecEnv
env.getConfig.enableObjectReuse()
tableConfig = tEnv.getConfig
BatchTestBase.configForMiniCluster(tableConfig)
}

@throws(classOf[Exception])
@BeforeEach
def before(): Unit = {}

@AfterEach
def after(): Unit = {
TestValuesTableFactory.clearAllData()
Expand Down

0 comments on commit 4c4643c

Please sign in to comment.