From 4c204c2c7252e4a5643b20e6a2f5693219c50d17 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 30 Mar 2021 04:50:21 +0300 Subject: [PATCH] [Tests] Reduce functions integration test flakiness (#10074) * Use unique function names * Use Awaitility for all function status checks - wait up to 15 seconds --- .../functions/PulsarFunctionsTest.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 1486d7f9fefa1..84fede2c7bb59 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -895,7 +895,7 @@ public void testFunctionLocalRun(Runtime runtime) throws Exception { commandGenerator.setAdminUrl("pulsar://pulsar-broker-0:6650"); commandGenerator.setSourceTopic(inputTopicName); commandGenerator.setSinkTopic(outputTopicName); - commandGenerator.setFunctionName("localRunTest"); + commandGenerator.setFunctionName("localRunTest-" + randomName(8)); commandGenerator.setRuntime(runtime); switch (runtime) { case JAVA: @@ -1640,13 +1640,8 @@ private void testExclamationFunction(Runtime runtime, // update parallelism updateFunctionParallelism(functionName, 2); - Awaitility.await() - .pollInterval(Duration.ofMillis(500L)) - .ignoreExceptions() - .untilAsserted(() -> - //get function status - getFunctionStatus(functionName, 0, true, 2) - ); + //get function status + getFunctionStatus(functionName, 0, true, 2); // delete function deleteFunction(functionName); @@ -1993,6 +1988,16 @@ private void getFunctionStatus(String functionName, int numMessages, boolean che private void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts, int parallelism) throws Exception { + Awaitility.await() + .pollInterval(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(15)) + .ignoreExceptions() + .untilAsserted(() -> + doGetFunctionStatus(functionName, numMessages, checkRestarts, parallelism)); + } + + private void doGetFunctionStatus(String functionName, int numMessages, boolean checkRestarts, int parallelism) + throws Exception { ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd( PulsarCluster.ADMIN_SCRIPT, "functions", @@ -2228,7 +2233,7 @@ public void testAvroSchemaFunction() throws Exception { log.info("testAvroSchemaFunction start ..."); final String inputTopic = "test-avroschema-input-" + randomName(8); final String outputTopic = "test-avroschema-output-" + randomName(8); - final String functionName = "test-avroschema-fn-202003241756"; + final String functionName = "test-avroschema-fn-" + randomName(8); final int numMessages = 10; if (pulsarCluster == null) {