diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index b190d34cecc53..54b2c8ae8de2e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -64,6 +64,7 @@ import org.junit.Test; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.Serializable; import java.util.ArrayDeque; @@ -468,13 +469,7 @@ public void testClosePartitionDiscovererWhenOpenThrowException() throws Exceptio final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(failingPartitionDiscoverer); - try { - setupConsumer(consumer); - fail("Exception should be thrown in open method"); - } catch (RuntimeException e) { - assertThat(ExceptionUtils.findThrowable(e, t -> t.equals(failureCause)).isPresent(), is(true)); - } - consumer.close(); + testConsumerLifeCycle(consumer, failureCause); assertTrue("partitionDiscoverer should be closed when consumer is closed", failingPartitionDiscoverer.isClosed()); } @@ -490,15 +485,7 @@ public void testClosePartitionDiscovererWhenCreateKafkaFetcherFails() throws Exc testPartitionDiscoverer, 100L); - setupConsumer(consumer); - - try { - consumer.run(new TestSourceContext<>()); - fail("Exception should be thrown in run method"); - } catch (Exception e) { - assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(failureCause)).isPresent(), is(true)); - } - consumer.close(); + testConsumerLifeCycle(consumer, failureCause); assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed()); } @@ -512,16 +499,7 @@ public void testClosePartitionDiscovererWhenKafkaFetcherFails() throws Exception final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer<>(() -> mock, testPartitionDiscoverer, 100L); - setupConsumer(consumer); - - try { - consumer.run(new TestSourceContext<>()); - fail("Exception should be thrown in run method"); - } catch (Exception e) { - assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(failureCause)).isPresent(), is(true)); - } - consumer.close(); - consumer.joinDiscoveryLoopThread(); + testConsumerLifeCycle(consumer, failureCause); assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed()); } @@ -531,19 +509,33 @@ public void testClosePartitionDiscovererWithCancellation() throws Exception { final TestingFlinkKafkaConsumer consumer = new TestingFlinkKafkaConsumer<>(testPartitionDiscoverer, 100L); - setupConsumer(consumer); - - CompletableFuture runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> consumer.run(new TestSourceContext<>()))); - - consumer.close(); - - consumer.joinDiscoveryLoopThread(); - runFuture.get(); - + testConsumerLifeCycle(consumer, null); assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed()); } - protected void setupConsumer(FlinkKafkaConsumerBase consumer) throws Exception { + private void testConsumerLifeCycle( + FlinkKafkaConsumerBase testKafkaConsumer, + @Nullable Exception expectedException) throws Exception { + + if (expectedException == null) { + setupConsumer(testKafkaConsumer); + final CompletableFuture runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> testKafkaConsumer.run(new TestSourceContext<>()))); + testKafkaConsumer.close(); + runFuture.get(); + } else { + try { + setupConsumer(testKafkaConsumer); + testKafkaConsumer.run(new TestSourceContext<>()); + + fail("Exception should have been thrown from open / run method of FlinkKafkaConsumerBase."); + } catch (Exception e) { + assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(expectedException)).isPresent(), is(true)); + } + testKafkaConsumer.close(); + } + } + + private void setupConsumer(FlinkKafkaConsumerBase consumer) throws Exception { setupConsumer( consumer, false,