Skip to content

Commit

Permalink
[FLINK-32161][test] Migrate and remove legacy ZookeeperResource.
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed May 25, 2023
1 parent d40d410 commit d79f4c1
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 316 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,27 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;

import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionState;

import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link ZooKeeperCheckpointIDCounter} in a ZooKeeper ensemble. */
public final class ZKCheckpointIDCounterMultiServersTest extends TestLogger {
class ZKCheckpointIDCounterMultiServersTest {

@Rule public ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
@RegisterExtension
private final EachCallbackWrapper<ZooKeeperExtension> zookeeperExtensionWrapper =
new EachCallbackWrapper<>(new ZooKeeperExtension());

/**
* Tests that {@link ZooKeeperCheckpointIDCounter} can be recovered after a connection loss
Expand All @@ -48,15 +49,16 @@ public final class ZKCheckpointIDCounterMultiServersTest extends TestLogger {
* <p>See also FLINK-14091.
*/
@Test
public void testRecoveredAfterConnectionLoss() throws Exception {
void testRecoveredAfterConnectionLoss() throws Exception {

final Configuration configuration = new Configuration();
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE);
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zookeeperExtensionWrapper.getCustomExtension().getConnectString());

try {
try (final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(
configuration, NoOpFatalErrorHandler.INSTANCE)) {
OneShotLatch connectionLossLatch = new OneShotLatch();
OneShotLatch reconnectedLatch = new OneShotLatch();

Expand All @@ -71,14 +73,12 @@ public void testRecoveredAfterConnectionLoss() throws Exception {

final long initialID = idCounter.getAndIncrement();

zooKeeperResource.restart();
zookeeperExtensionWrapper.getCustomExtension().restart();

connectionLossLatch.await();
reconnectedLatch.await();

assertThat(idCounter.getAndIncrement(), greaterThan(initialID));
} finally {
curatorFrameworkWrapper.close();
assertThat(idCounter.getAndIncrement()).isGreaterThan(initialID);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
Expand All @@ -33,16 +34,14 @@
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;

import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;

import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import javax.annotation.Nonnull;

Expand All @@ -54,60 +53,64 @@

import static java.util.Collections.emptyList;
import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link DefaultCompletedCheckpointStore} with {@link ZooKeeperStateHandleStore}. */
public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
class ZooKeeperCompletedCheckpointStoreTest {

@ClassRule public static ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
@RegisterExtension
public static AllCallbackWrapper<ZooKeeperExtension> zooKeeperExtensionWrapper =
new AllCallbackWrapper<>(new ZooKeeperExtension());

private static final ZooKeeperCheckpointStoreUtil zooKeeperCheckpointStoreUtil =
ZooKeeperCheckpointStoreUtil.INSTANCE;

@Test
public void testPathConversion() {
void testPathConversion() {
final long checkpointId = 42L;

final String path = zooKeeperCheckpointStoreUtil.checkpointIDToName(checkpointId);

assertEquals(checkpointId, zooKeeperCheckpointStoreUtil.nameToCheckpointID(path));
assertThat(zooKeeperCheckpointStoreUtil.nameToCheckpointID(path)).isEqualTo(checkpointId);
}

@Test
public void testRecoverFailsIfDownloadFails() {
void testRecoverFailsIfDownloadFails() {
final Configuration configuration = new Configuration();
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperExtensionWrapper.getCustomExtension().getConnectString());
final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZk =
new ArrayList<>();
final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper =
new ZooKeeperStateHandleStore<CompletedCheckpoint>(
ZooKeeperUtils.startCuratorFramework(
configuration, NoOpFatalErrorHandler.INSTANCE)
.asCuratorFramework(),
new TestingRetrievableStateStorageHelper<>()) {
@Override
public List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>
getAllAndLock() {
return checkpointsInZk;
}
};

checkpointsInZk.add(
createHandle(
1,
id -> {
throw new ExpectedTestException();
}));
final Exception exception =
assertThrows(
Exception.class,
() ->
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
checkpointsInZooKeeper, zooKeeperCheckpointStoreUtil));
assertThat(exception, FlinkMatchers.containsCause(ExpectedTestException.class));
try (CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(
configuration, NoOpFatalErrorHandler.INSTANCE)) {
final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper =
new ZooKeeperStateHandleStore<CompletedCheckpoint>(
curatorFrameworkWrapper.asCuratorFramework(),
new TestingRetrievableStateStorageHelper<>()) {
@Override
public List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>
getAllAndLock() {
return checkpointsInZk;
}
};

checkpointsInZk.add(
createHandle(
1,
id -> {
throw new ExpectedTestException();
}));
assertThatThrownBy(
() ->
DefaultCompletedCheckpointStoreUtils
.retrieveCompletedCheckpoints(
checkpointsInZooKeeper,
zooKeeperCheckpointStoreUtil))
.satisfies(FlinkAssertions.anyCauseMatches(ExpectedTestException.class));
}
}

private Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> createHandle(
Expand All @@ -119,11 +122,12 @@ private Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> createHandle

/** Tests that subsumed checkpoints are discarded. */
@Test
public void testDiscardingSubsumedCheckpoints() throws Exception {
void testDiscardingSubsumedCheckpoints() throws Exception {
final SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
final Configuration configuration = new Configuration();
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperExtensionWrapper.getCustomExtension().getConnectString());

final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE);
Expand All @@ -136,15 +140,14 @@ public void testDiscardingSubsumedCheckpoints() throws Exception {

checkpointStore.addCheckpointAndSubsumeOldestOne(
checkpoint1, new CheckpointsCleaner(), () -> {});
assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
assertThat(checkpointStore.getAllCheckpoints()).containsExactly(checkpoint1);

final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint2 =
CompletedCheckpointStoreTest.createCheckpoint(1, sharedStateRegistry);
checkpointStore.addCheckpointAndSubsumeOldestOne(
checkpoint2, new CheckpointsCleaner(), () -> {});
final List<CompletedCheckpoint> allCheckpoints = checkpointStore.getAllCheckpoints();
assertThat(allCheckpoints, Matchers.contains(checkpoint2));
assertThat(allCheckpoints, Matchers.not(Matchers.contains(checkpoint1)));
assertThat(allCheckpoints).containsExactly(checkpoint2);

// verify that the subsumed checkpoint is discarded
CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
Expand All @@ -158,11 +161,12 @@ public void testDiscardingSubsumedCheckpoints() throws Exception {
* globally terminal state.
*/
@Test
public void testDiscardingCheckpointsAtShutDown() throws Exception {
void testDiscardingCheckpointsAtShutDown() throws Exception {
final SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
final Configuration configuration = new Configuration();
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperExtensionWrapper.getCustomExtension().getConnectString());

final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE);
Expand All @@ -175,7 +179,7 @@ public void testDiscardingCheckpointsAtShutDown() throws Exception {

checkpointStore.addCheckpointAndSubsumeOldestOne(
checkpoint1, new CheckpointsCleaner(), () -> {});
assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));
assertThat(checkpointStore.getAllCheckpoints()).containsExactly(checkpoint1);

checkpointStore.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());

Expand Down Expand Up @@ -232,41 +236,43 @@ public long getStateSize() {
* (i.e., there exists an exception thrown by the method).
*/
@Test
public void testAddCheckpointWithFailedRemove() throws Exception {
void testAddCheckpointWithFailedRemove() throws Exception {

final int numCheckpointsToRetain = 1;
final Configuration configuration = new Configuration();
configuration.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());

final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(configuration, NoOpFatalErrorHandler.INSTANCE);

final CompletedCheckpointStore store =
createZooKeeperCheckpointStore(curatorFrameworkWrapper.asCuratorFramework());

CountDownLatch discardAttempted = new CountDownLatch(1);
for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
CompletedCheckpoint checkpointToAdd =
new CompletedCheckpoint(
new JobID(),
i,
i,
i,
Collections.emptyMap(),
Collections.emptyList(),
CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
new TestCompletedCheckpointStorageLocation(),
null);
// shouldn't fail despite the exception
store.addCheckpointAndSubsumeOldestOne(
checkpointToAdd,
new CheckpointsCleaner(),
() -> {
discardAttempted.countDown();
throw new RuntimeException();
});
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperExtensionWrapper.getCustomExtension().getConnectString());

try (final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(
configuration, NoOpFatalErrorHandler.INSTANCE)) {
final CompletedCheckpointStore store =
createZooKeeperCheckpointStore(curatorFrameworkWrapper.asCuratorFramework());

CountDownLatch discardAttempted = new CountDownLatch(1);
for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
CompletedCheckpoint checkpointToAdd =
new CompletedCheckpoint(
new JobID(),
i,
i,
i,
Collections.emptyMap(),
Collections.emptyList(),
CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
new TestCompletedCheckpointStorageLocation(),
null);
// shouldn't fail despite the exception
store.addCheckpointAndSubsumeOldestOne(
checkpointToAdd,
new CheckpointsCleaner(),
() -> {
discardAttempted.countDown();
throw new RuntimeException();
});
}
discardAttempted.await();
}
discardAttempted.await();
}
}
Loading

0 comments on commit d79f4c1

Please sign in to comment.