Skip to content

Commit

Permalink
[build] Remove net.jodah.failsafe dependency (fix JDK17 build) (apach…
Browse files Browse the repository at this point in the history
…e#14124)

* [build] Upgrade failsafe (fix jdk17 build)

* [build] Upgrade failsafe (fix jdk17 build)

* move to awaitility

* move to awaitility
  • Loading branch information
nicoloboschi authored Feb 4, 2022
1 parent cc4e352 commit 4e29a1e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 22 deletions.
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ flexible messaging model and an intuitive client API.</description>
<mockito.version>3.12.4</mockito.version>
<powermock.version>2.0.9</powermock.version>
<javassist.version>3.25.0-GA</javassist.version>
<failsafe.version>2.3.1</failsafe.version>
<skyscreamer.version>1.5.0</skyscreamer.version>
<objenesis.version>3.1</objenesis.version>
<awaitility.version>4.0.3</awaitility.version>
Expand Down
7 changes: 0 additions & 7 deletions tests/integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>failsafe</artifactId>
<version>${failsafe.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
Expand All @@ -40,6 +38,7 @@
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
Expand Down Expand Up @@ -75,14 +74,6 @@ public class AvroKafkaSourceTest extends PulsarFunctionsTestBase {

private static final String SOURCE_TYPE = "kafka";

final Duration ONE_MINUTE = Duration.ofMinutes(1);
final Duration TEN_SECONDS = Duration.ofSeconds(10);

final RetryPolicy statusRetryPolicy = new RetryPolicy()
.withMaxDuration(ONE_MINUTE)
.withDelay(TEN_SECONDS)
.onRetry(e -> log.error("Retry ... "));

private final String kafkaTopicName = "kafkasourcetopic";

private EnhancedKafkaContainer kafkaContainer;
Expand Down Expand Up @@ -221,14 +212,34 @@ private void testSource() throws Exception {
getSourceInfoSuccess(tenant, namespace, sourceName);

// get source status
Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));

Awaitility.with()
.timeout(Duration.ofMinutes(1))
.pollInterval(Duration.ofSeconds(10))
.until(() -> {
try {
getSourceStatus(tenant, namespace, sourceName);
return true;
} catch (Throwable ex) {
log.error("Error while getting source status, will retry", ex);
return false;
}
});
// produce messages
List<MyBean> messages = produceSourceMessages(numMessages);

// wait for source to process messages
Failsafe.with(statusRetryPolicy).run(() ->
waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
Awaitility.with()
.timeout(Duration.ofMinutes(1))
.pollInterval(Duration.ofSeconds(10))
.until(() -> {
try {
waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages);
return true;
} catch (Throwable ex) {
log.error("Error while processing source messages, will retry", ex);
return false;
}
});

// validate the source result
validateSourceResultAvro(consumer, messages);
Expand Down

0 comments on commit 4e29a1e

Please sign in to comment.