Skip to content

Commit

Permalink
[FLINK-22802][k8s] Bump fabric8 Kubernetes client version to 5.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyang0918 committed Jul 31, 2021
1 parent 047e105 commit a115a3c
Show file tree
Hide file tree
Showing 14 changed files with 63 additions and 73 deletions.
5 changes: 2 additions & 3 deletions flink-kubernetes/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ under the License.
<packaging>jar</packaging>

<properties>
<kubernetes.client.version>4.9.2</kubernetes.client.version>
<kubernetes.client.version>5.5.0</kubernetes.client.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -129,8 +129,7 @@ under the License.
<artifactSet>
<includes combine.children="append">
<include>io.fabric8:kubernetes-client</include>
<include>io.fabric8:kubernetes-model</include>
<include>io.fabric8:kubernetes-model-common</include>
<include>io.fabric8:kubernetes-model-*</include>
<include>io.fabric8:zjsonpatch</include>

<!-- Shade all the dependencies of kubernetes client -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ public class KubernetesHaServices extends AbstractHaServices {
this.watchExecutorService =
Executors.newCachedThreadPool(
new ExecutorThreadFactory("config-map-watch-handler"));
this.configMapSharedWatcher.run();

lockIdentity = UUID.randomUUID().toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
/** The interface for the Kubernetes shared watcher. */
public interface KubernetesSharedWatcher<T> extends AutoCloseable {

/** Run the shared watcher. */
void run();

/** Close the shared watcher without Exception. */
@Override
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.net.HttpURLConnection.HTTP_GONE;

/** Watcher for resources in Kubernetes. */
public abstract class AbstractKubernetesWatcher<
T extends HasMetadata, K extends KubernetesResource<T>>
Expand All @@ -42,7 +40,7 @@ public abstract class AbstractKubernetesWatcher<
}

@Override
public void onClose(KubernetesClientException cause) {
public void onClose(WatcherException cause) {
// null means the watcher is closed normally.
if (cause == null) {
logger.info("The watcher is closing.");
Expand All @@ -51,7 +49,7 @@ public void onClose(KubernetesClientException cause) {
// status code, so this should be handled by the caller. Refer to
// https://github.com/fabric8io/kubernetes-client/blob/v4.9.2/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java#L255
// for more information about the implementation.
if (cause.getCode() == HTTP_GONE) {
if (cause.isHttpGone()) {
logger.debug(
"Got a http code 'HTTP_GONE' which means the Kubernetes client has the "
+ "too old resource version.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,29 @@
package org.apache.flink.kubernetes.kubeclient.resources;

import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.dsl.Informable;

import java.util.Map;

/** The shared informer for {@link ConfigMap}, it can be used as a shared watcher. */
public class KubernetesConfigMapSharedInformer
extends KubernetesSharedInformer<ConfigMap, ConfigMapList, KubernetesConfigMap>
extends KubernetesSharedInformer<ConfigMap, KubernetesConfigMap>
implements KubernetesConfigMapSharedWatcher {

public KubernetesConfigMapSharedInformer(
NamespacedKubernetesClient client, Map<String, String> labels) {
super(client, ConfigMap.class, ConfigMapList.class, labels, KubernetesConfigMap::new);
super(client, getInformableConfigMaps(client, labels), KubernetesConfigMap::new);
}

private static Informable<ConfigMap> getInformableConfigMaps(
NamespacedKubernetesClient client, Map<String, String> labels) {
Preconditions.checkArgument(
!CollectionUtil.isNullOrEmpty(labels), "Labels must not be null or empty");
return client.configMaps().withLabels(labels);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@

import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.dsl.Informable;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerEventListener;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,47 +45,36 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/** Base class for shared watcher based on {@link SharedIndexInformer}. */
public abstract class KubernetesSharedInformer<
T extends HasMetadata, TList extends KubernetesResourceList<T>, R>
public abstract class KubernetesSharedInformer<T extends HasMetadata, R>
implements KubernetesSharedWatcher<R> {

protected final Logger log = LoggerFactory.getLogger(getClass());

private final NamespacedKubernetesClient client;
private final SharedInformerFactory sharedInformerFactory;
private final SharedIndexInformer<T> sharedIndexInformer;
private final Function<T, R> eventWrapper;

private final ExecutorService informerExecutor;

private final AggregatedEventHandler aggregatedEventHandler;

public KubernetesSharedInformer(
NamespacedKubernetesClient client,
Class<T> apiTypeClass,
Class<TList> apiListTypeClass,
Map<String, String> labels,
Informable<T> informable,
Function<T, R> eventWrapper) {
Preconditions.checkArgument(
!CollectionUtil.isNullOrEmpty(labels), "Labels must not be null or empty");
this.client = client;
final ExecutorService executorService =

informerExecutor =
Executors.newSingleThreadExecutor(
new ExecutorThreadFactory("KubernetesClient-Informer"));
this.sharedInformerFactory = client.informers(executorService);
this.sharedInformerFactory.withLabels(labels);
// Using Long.MAX_VALUE as resync period to disable the internal periodical resync. Zero
// value does not work exactly here. It could be fixed after we bump the fabric8 Kubernetes
// client version to 5.0.0+. For more details, see
// https://github.com/fabric8io/kubernetes-client/issues/2651.
this.sharedIndexInformer =
sharedInformerFactory.sharedIndexInformerFor(
apiTypeClass, apiListTypeClass, Long.MAX_VALUE);
this.aggregatedEventHandler = new AggregatedEventHandler(executorService);
this.sharedIndexInformer.addEventHandler(aggregatedEventHandler);
this.sharedInformerFactory.addSharedInformerEventListener(aggregatedEventHandler);

this.aggregatedEventHandler = new AggregatedEventHandler(informerExecutor);
this.sharedIndexInformer = informable.inform(aggregatedEventHandler, 0);

this.eventWrapper = eventWrapper;
}
Expand All @@ -100,22 +87,17 @@ public Watch watch(
return aggregatedEventHandler.watch(name, new WatchCallback<>(handler, executorService));
}

@Override
public void run() {
sharedInformerFactory.startAllRegisteredInformers();
}

@Override
public void close() {
sharedInformerFactory.stopAllRegisteredInformers();
this.sharedIndexInformer.stop();
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.informerExecutor);
}

private String getResourceKey(String name) {
return client.getNamespace() + "/" + name;
}

private class AggregatedEventHandler
implements ResourceEventHandler<T>, SharedInformerEventListener {
private class AggregatedEventHandler implements ResourceEventHandler<T> {
private final Map<String, EventHandler> handlers = new HashMap<>();
private final ExecutorService executorService;

Expand All @@ -141,11 +123,6 @@ public void onDelete(T obj, boolean deletedFinalStateUnknown) {
() -> findHandler(obj).ifPresent(EventHandler::handleResourceEvent));
}

@Override
public void onException(Exception exception) {
handlers.forEach((k, h) -> h.handleExceptionEvent(exception));
}

private Watch watch(String name, WatchCallback<R> watch) {
final String resourceKey = getResourceKey(name);
final String watchId = UUID.randomUUID().toString();
Expand Down Expand Up @@ -241,10 +218,6 @@ private void onDeleted(T obj) {
private List<R> wrapEvent(T obj) {
return Collections.singletonList(eventWrapper.apply(obj));
}

private void handleExceptionEvent(Exception e) {
this.callbacks.forEach((id, callback) -> callback.run(h -> h.handleError(e)));
}
}

private static final class WatchCallback<T> {
Expand Down
24 changes: 21 additions & 3 deletions flink-kubernetes/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,27 @@ This project bundles the following dependencies under the Apache Software Licens
- com.squareup.okhttp3:logging-interceptor:3.14.9
- com.squareup.okhttp3:okhttp:3.14.9
- com.squareup.okio:okio:1.17.2
- io.fabric8:kubernetes-client:4.9.2
- io.fabric8:kubernetes-model:4.9.2
- io.fabric8:kubernetes-model-common:4.9.2
- io.fabric8:kubernetes-client:5.5.0
- io.fabric8:kubernetes-model-core:5.5.0
- io.fabric8:kubernetes-model-common:5.5.0
- io.fabric8:kubernetes-model-rbac:5.5.0
- io.fabric8:kubernetes-model-admissionregistration:5.5.0
- io.fabric8:kubernetes-model-apps:5.5.0
- io.fabric8:kubernetes-model-autoscaling:5.5.0
- io.fabric8:kubernetes-model-apiextensions:5.5.0
- io.fabric8:kubernetes-model-batch:5.5.0
- io.fabric8:kubernetes-model-certificates:5.5.0
- io.fabric8:kubernetes-model-coordination:5.5.0
- io.fabric8:kubernetes-model-discovery:5.5.0
- io.fabric8:kubernetes-model-events:5.5.0
- io.fabric8:kubernetes-model-extensions:5.5.0
- io.fabric8:kubernetes-model-flowcontrol:5.5.0
- io.fabric8:kubernetes-model-networking:5.5.0
- io.fabric8:kubernetes-model-metrics:5.5.0
- io.fabric8:kubernetes-model-policy:5.5.0
- io.fabric8:kubernetes-model-scheduling:5.5.0
- io.fabric8:kubernetes-model-storageclass:5.5.0
- io.fabric8:kubernetes-model-node:5.5.0
- io.fabric8:zjsonpatch:0.3.0
- org.yaml:snakeyaml:1.27

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.function.Function;

Expand Down Expand Up @@ -103,7 +104,8 @@ protected Service buildExternalServiceWithLoadBalancer(
.withLoadBalancer(
new LoadBalancerStatus(
Collections.singletonList(
new LoadBalancerIngress(hostname, ip))))
new LoadBalancerIngress(
hostname, ip, new ArrayList<>()))))
.build();

return buildExternalService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public MixedDispatcher(Map<ServerRequest, Queue<ServerResponse>> responses) {
}

@Override
public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
public MockResponse dispatch(RecordedRequest request) {
HttpMethod method = HttpMethod.valueOf(request.getMethod());
String path = request.getPath();
SimpleRequest key = new SimpleRequest(method, path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ protected class Context {
}

void runTest(RunnableWithException testMethod) throws Exception {
configMapSharedWatcher.run();
electionEventHandler.init(leaderElectionDriver);
testMethod.run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public void testLeaderElectionAndRetrieval() throws Exception {
flinkKubeClient.createConfigMapSharedWatcher(
KubernetesUtils.getConfigMapLabels(
clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
configMapSharedWatcher.run();
final ExecutorService watchExecutorService = Executors.newCachedThreadPool();

final TestingLeaderElectionEventHandler electionEventHandler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,6 @@ public void setWatchFunction(
this.watchFunction = watchFunction;
}

@Override
public void run() {
// noop
}

@Override
public void close() {
// noop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.fabric8.kubernetes.api.model.StatusBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -60,7 +61,7 @@ public void testClosingWithException() {
final AtomicBoolean called = new AtomicBoolean(false);
final KubernetesPodsWatcher podsWatcher =
new KubernetesPodsWatcher(new TestingCallbackHandler(e -> called.set(true)));
podsWatcher.onClose(new KubernetesClientException("exception"));
podsWatcher.onClose(new WatcherException("exception"));
assertThat(called.get(), is(true));
}

Expand Down Expand Up @@ -95,7 +96,10 @@ public void testClosingWithTooOldResourceVersion() {
assertThat(e, FlinkMatchers.containsMessage(errMsg));
}));
podsWatcher.onClose(
new KubernetesClientException(errMsg, HTTP_GONE, new StatusBuilder().build()));
new WatcherException(
errMsg,
new KubernetesClientException(
errMsg, HTTP_GONE, new StatusBuilder().build())));
}

private class TestingCallbackHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public void testWatch() throws Exception {

try (KubernetesConfigMapSharedWatcher watcher =
client.createConfigMapSharedWatcher(labels)) {
watcher.run();
for (int i = 0; i < 10; i++) {
List<TestingCallbackHandler> callbackHandlers = new ArrayList<>();
List<Watch> watchers = new ArrayList<>();
Expand Down Expand Up @@ -119,7 +118,6 @@ public void testWatch() throws Exception {
public void testWatchWithBlockHandler() throws Exception {
try (KubernetesConfigMapSharedWatcher watcher =
client.createConfigMapSharedWatcher(labels)) {
watcher.run();

final String configMapName = getConfigMapName(System.currentTimeMillis());
final long block = 500;
Expand Down

0 comments on commit a115a3c

Please sign in to comment.