[WIP] Phase support for Flink-K8s runtime (apache#320)
* fix bug: streamx.console.workspace setting missing

* remove refilling setting process

* init streamx-flink-kubernetes module

* init streamx-flink-kubernetes module

* temporary removal of fat-jar build cache

* add necessary enum

* converter between FlinkAppState and k8s.enums.FlinkJobState

* add tracking info cache

* update tracking cache

* add tracking monitor

* replace cache with cachePool

* update FlinkTRKMonitor

* add CachePool

* add k8s connection checking method

* update k8s events cache value model

* add comment

* update model

* add isInTracking method

* add Kubernetes Event Watcher

* add enhanced try-with-resource method

* update jobStatus cache value

* remove tracking of k8s service events

* add new FlinkClusterClient method

* flink watcher trait

* update flink watcher trait

* update flink watcher trait

* update to be thread-safe

* add watcher config object

* add flink job status watcher

* update flink watcher trait

* update flink watcher trait

* update flink watcher trait

* update flink watcher

* add method comments

* update conf class name

* add @nonnull annotation for external api

* replace safeSet method

* add httpclient dependencies

* update FlinkJobStatusWatcher

* update FlinkJobStatusWatcher

* update FlinkJobStatusWatcher

* add collectDistinctTrkIds method

* support implicit call of json marshal/unmarshal

* move conf class to new package

* flink metrics bean

* update flink job status watcher

* support flink metrics tracking

* update copyright

* refactor tracking watcher conf

* support closeable

* integrate watcher control to monitor

* resolve code conflicts

* resolve code conflicts

* resolve code conflicts

* import scalatest dependency

* move model

* default namespace value

* replace with copy

* update method declaration

* add exception throw declaration

* limit flink-client timeout

* add test for single tracking task

* fix bug

* add default conf for debug

* fix type error bug

* support get all trackingIds from cache

* update single trk test

* add trkCache watching method for debug

* add FlinkTrkMonitor test

* move TrkConf

* add k8s event watcher

* add guava dependency

* support for flink job status change event publish/subscribe

* test for eventbus

* delete unnecessary comments

* update tryWithResourceException

* method for determining the existence of flink job on remote cluster

* fix bug: retrieve flink-cluster-client error

* ChangeEventBus support sync/async post

* check the validity of the TrkId before the monitor trk method is called

* keep the start method idempotent

* add the necessary comments

* update lazy start aop

* add necessary comments

* test lazy start behavior of FlinkTrkMonitor

* add necessary comments

* add TrkMonitor Factory

* rename default conf method

* rename FlinkTrkMonitor to K8sFlinkTrkMonitor

* remove unnecessary comments

* add convert method

* update ExecutionMode

* update Application

* untracking flink job on k8s mode

* Integrated K8sFlinkTrkMonitor

* refactor event

* support for K8sFlinkTrkMonitor event post and built-in event listener

* let postEvent method trigger TrkMonitor lazy start behavior

* support more flink metrics tracking

* rename K8S_DEPLOYING state

* catch all flink-k8s-native event

* speed up watcher tracking frequency

* add totalJob complete method

* add supplemental unsafe trkId check to checkIsInRemoteCluster

* untracking k8s flink job

* update wrapper

* differentiate flink LOST state into SILENT and LOST

* add flink SILENT state

* remove delay start trigger of method getClusterMetrics

* fix flywaydb error

* format k8sNamespace field of Application

* kubernetes mode support

* adaptation request

* update behavior of recovery data

* fix stupid bug

* add FlinkTrkMonitor debug helper

* fix bug: k8sNamespace storage missing

* fix bug: k8s info missing

* update jobName when clusterId changed

* fix bug: flink job cancel fail

* add log for key steps of flink submit

* resolve the conflicts of guava

* update

* resolve conflicts of guava

* refactor streamx-packer module

* fix bug: tag format error during push image

* fix bug: flink.pipeline.jars missing when submit job on application mode

* add DOCKER_IMAGE_NAMESPACE config

* add DOCKER_IMAGE_NAMESPACE config

* support check kubernetes deployment resource

* add new flink state

* optimize flink k8s job state inference algorithm

* add new flink job state

* optimize silent state inference algorithm

* update

* update

* update timeout logger

* update status persistence

* enhance JobStatusCV

* optimize JobStatusCV persistence

* support record flink cluster metrics for each flink job

* support record flink cluster metrics for each flink job

* add new cache debugging method

* set application name to flink job

* enhance flink-k8s session job status capture processing

* abandon the maintenance of separate flink cluster metrics aggregation cache

* fix bug: missing application metrics in view list

* further inference of TERMINAL/POS_TERMINAL status

* cache flink rest api on kubernetes cluster

* fix wrong single task timeout limitation

* fix bug: missing upload jar parameter

* support third party dependency for flink-k8s job

* integrate flink-k8s support to streamx-console-server

* explicitly throwing exceptions

* fix bug: resolved maven dependencies are incomplete

* support custom pod-template for flink-k8s mode

* temporarily block flame graph for flink-k8s job

* don't request the yarn interface for the flink-k8s job

* temporarily remove support for remapping flink-k8s job

* remove some support of K8sTrkMonitor interface

* add retracking interface support

* remove retracking interface support

* custom savepoint support for flink-k8s mode

* [fix bug] OutOfBound exception when activeProfiles is empty

* Support for customizing flink-k8s configuration from SpringBoot application configuration.

* temporarily block checkpoint failover setting for flink-k8s mode.

* fix wrong flink state inference

* support for email alert on flink-k8s mode

* temporarily block flink-k8s support in custom-code mode

* resolve merge conflicts

* typo

* cancel shade shims

* update default shims to flink-1.13

* fix bug: flinkUserJar path error

* update dockerfile template

* import flink-shims dependency explicitly for package programming on flink session mode

* add flink CLASSLOADER_RESOLVE_ORDER configuration

* rename sub building workspace to be more friendly for debugging

* rename sub workspace path, and add support for log4j configuration

* fix bug: flink job state should be shown as initializing when the k8s pod stays in the initialization phase for a long time

* support custom flink rest service exposed type on flink-native-k8s mode

* modify the execution mode text

* remove multiple spaces

* add log for start building docker image

* invalidate rest url cache when untracking flink job

* support get flink remote rest url from k8s

* support jump to flink web ui

* remove direct import from flink lib

* update

* replace with scala implicit converter

* resolve conflict

Co-authored-by: benjobs <[email protected]>
Al-assad and wolfboys authored Sep 27, 2021
1 parent a4fac3b commit c22c1c3
Showing 96 changed files with 3,884 additions and 1,270 deletions.
2 changes: 2 additions & 0 deletions pom.xml
@@ -76,6 +76,7 @@
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<scalatest.version>3.2.9</scalatest.version>
<junit.version>5.6.3</junit.version>
<zkclient.version>0.11</zkclient.version>
<curator.version>4.2.0</curator.version>
<redis.version>3.3.0</redis.version>
@@ -88,6 +89,7 @@
<grpc.version>1.15.0</grpc.version>
<jackson.version>2.12.1</jackson.version>
<gson.version>2.8.5</gson.version>
<guava.version>30.0-jre</guava.version>

<checkstyle.version>2.17</checkstyle.version>
<puppycrawl.version>8.29</puppycrawl.version>
36 changes: 36 additions & 0 deletions streamx-common/pom.xml
@@ -153,24 +153,48 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
<optional>true</optional>
</dependency>

@@ -190,6 +214,12 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.6.0</version>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
<optional>true</optional>
</dependency>

@@ -200,6 +230,12 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

</dependencies>


@@ -108,6 +108,8 @@ object ConfigConst

val KEY_FLINK_APP_NAME = "yarn.application.name"

val KEY_FLINK_SAVEPOINT_PATH = "execution.savepoint.path"

// --checkpoints--
val KEY_FLINK_CHECKPOINTS_ENABLE = "flink.checkpoints.enable"

@@ -323,6 +325,12 @@
*/
val DEFAULT_MAVEN_REMOTE_URL = "https://repo1.maven.org/maven2/"

/**
* namespace for docker image used in docker build env and image register
*/
val KEY_DOCKER_IMAGE_NAMESPACE = "streamx.docker.register.image-namespace"
val DOCKER_IMAGE_NAMESPACE_DEFAULT = "streamx"
lazy val DOCKER_IMAGE_NAMESPACE: String = System.getProperties.getProperty(KEY_DOCKER_IMAGE_NAMESPACE, DOCKER_IMAGE_NAMESPACE_DEFAULT)

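As an aside on the new docker namespace keys above, here is a minimal sketch (not part of this commit) of overriding the lazily resolved DOCKER_IMAGE_NAMESPACE through a JVM system property; the ConfigConst import path is an assumption based on the module layout.

import com.streamxhub.streamx.common.conf.ConfigConst // assumed package of object ConfigConst

object DockerNamespaceSketch extends App {
  // must run before DOCKER_IMAGE_NAMESPACE is first read, since it is a lazy val
  System.setProperty(ConfigConst.KEY_DOCKER_IMAGE_NAMESPACE, "my-registry-ns")
  // falls back to DOCKER_IMAGE_NAMESPACE_DEFAULT ("streamx") when the property is absent
  println(ConfigConst.DOCKER_IMAGE_NAMESPACE)
}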
val LOGO =
"""
@@ -21,7 +21,9 @@
package com.streamxhub.streamx.common.enums;


import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.List;

/**
* @author benjobs
@@ -108,4 +110,8 @@ public static boolean isKubernetesMode(Integer value) {
return isKubernetesMode(of(value));
}

public static List<Integer> getKubernetesMode(){
return Lists.newArrayList(KUBERNETES_NATIVE_SESSION.getMode(), KUBERNETES_NATIVE_APPLICATION.getMode());
}

}
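A minimal usage sketch (not part of the diff) for the new getKubernetesMode helper, e.g. for filtering applications by execution mode code; the JavaConverters import matches the Scala 2.11 toolchain declared in the root pom.

import com.streamxhub.streamx.common.enums.ExecutionMode
import scala.collection.JavaConverters._

object KubernetesModeSketch extends App {
  // integer codes of KUBERNETES_NATIVE_SESSION and KUBERNETES_NATIVE_APPLICATION
  val k8sModes: Seq[Integer] = ExecutionMode.getKubernetesMode.asScala
  // every returned code is recognized by the existing isKubernetesMode(Integer) check
  assert(k8sModes.forall(mode => ExecutionMode.isKubernetesMode(mode)))
}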
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2021 The StreamX Project
* <p>
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.streamxhub.streamx.common.enums;

/**
* kubernetes.rest-service.exposed.type
*/
public enum FlinkK8sRestExposedType {

LoadBalancer("LoadBalancer", 0),
ClusterIP("ClusterIP", 1),
NodePort("NodePort", 2);

private final String name;
private final Integer value;

FlinkK8sRestExposedType(String name, Integer value) {
this.name = name;
this.value = value;
}

public static FlinkK8sRestExposedType of(Integer value) {
for (FlinkK8sRestExposedType order : values()) {
if (order.value.equals(value)) {
return order;
}
}
return null;
}

public String getName() {
return name;
}

public Integer getValue() {
return value;
}
}
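A quick sketch (outside this commit) of how a persisted integer code round-trips through the new enum; note that of() returns null for unknown codes, so callers are assumed to pass stored values only.

import com.streamxhub.streamx.common.enums.FlinkK8sRestExposedType

object RestExposedTypeSketch extends App {
  // 0 -> LoadBalancer, 1 -> ClusterIP, 2 -> NodePort; any other code yields null
  val exposedType = FlinkK8sRestExposedType.of(2)
  assert(exposedType == FlinkK8sRestExposedType.NodePort)
  // getName supplies the value for kubernetes.rest-service.exposed.type
  println(exposedType.getName)
}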
@@ -93,7 +93,7 @@ object Utils
/*
* Mimicking the try-with-resource syntax of Java-8+
*/
def tryWithResource[R,T <: AutoCloseable](handle: T)(func: T => R): R = {
def tryWithResource[R, T <: AutoCloseable](handle: T)(func: T => R): R = {
try {
func(handle)
} finally {
@@ -108,17 +108,16 @@
* and also provides callback function param for handing
* Exception.
*/
def tryWithResourceException[R,T <: AutoCloseable](handle: T)(func: T => R)(excFunc: Exception => R): R = {
def tryWithResourceException[R, T <: AutoCloseable](handle: T)(func: T => R)(excFunc: Throwable => R): R = {
try {
func(handle)
} catch {
case e: Exception => excFunc(e)
case e: Throwable => excFunc(e)
} finally {
if (handle != null) {
handle.close()
}
}
}


}
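For reference, an illustrative sketch (not part of the diff) of calling the two curried helpers above; the file name and the Utils import path are assumptions, not values taken from this commit.

import java.io.{BufferedReader, FileReader}
import com.streamxhub.streamx.common.util.Utils._ // assumed package of object Utils

object TryWithResourceSketch extends App {
  // the reader is closed in the finally block whether or not readLine throws
  val firstLine: String =
    tryWithResource(new BufferedReader(new FileReader("flink-conf.yaml"))) { reader =>
      reader.readLine()
    }

  // the extra parameter list handles any Throwable, and the resource is still closed
  val firstLineOrEmpty: String =
    tryWithResourceException(new BufferedReader(new FileReader("flink-conf.yaml"))) { reader =>
      reader.readLine()
    } { _ => "" }

  println(firstLine == firstLineOrEmpty)
}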
27 changes: 27 additions & 0 deletions streamx-console/streamx-console-service/pom.xml
@@ -29,6 +29,7 @@
<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>
<CodeCacheSize>512m</CodeCacheSize>
<guava.version>30.0-jre</guava.version>
</properties>

<dependencies>
@@ -52,6 +53,12 @@
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -254,6 +261,10 @@
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>

@@ -266,6 +277,10 @@
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>

@@ -282,6 +297,10 @@
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>

@@ -298,6 +317,10 @@
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>

@@ -310,6 +333,10 @@
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>

@@ -0,0 +1,72 @@
/*
* Copyright (c) 2021 The StreamX Project
* <p>
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.streamxhub.streamx.console.core.conf;

import com.streamxhub.streamx.flink.kubernetes.FlinkTrkConf;
import com.streamxhub.streamx.flink.kubernetes.JobStatusWatcherConf;
import com.streamxhub.streamx.flink.kubernetes.MetricWatcherConf;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
* flink-k8s configuration from springboot properties, prefix = flink-k8s
* @author Al-assad
*/
@Configuration
@Data
public class K8sFlinkConfig {

final private FlinkTrkConf defaultTrkConf = FlinkTrkConf.defaultConf();

@Value("${streamx.flink-k8s.tracking.polling-task-timeout-sec.job-status:}")
private Long sglJobStatusTrkTaskTimeoutSec = defaultTrkConf.jobStatusWatcherConf().sglTrkTaskTimeoutSec();


@Value("${streamx.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric:}")
private Long sglMetricTrkTaskTimeoutSec = defaultTrkConf.metricWatcherConf().sglTrkTaskTimeoutSec();

@Value("${streamx.flink-k8s.tracking.polling-interval-sec.job-status:}")
private Long sglJobStatueTrkTaskIntervalSec = defaultTrkConf.jobStatusWatcherConf().sglTrkTaskIntervalSec();

@Value("${streamx.flink-k8s.tracking.polling-interval-sec.cluster-metric:}")
private Long sglMetricTrkTaskIntervalSec = defaultTrkConf.metricWatcherConf().sglTrkTaskIntervalSec();

@Value("${streamx.flink-k8s.tracking.silent-state-keep-sec:}")
private Integer silentStateJobKeepTrackingSec = defaultTrkConf.jobStatusWatcherConf().silentStateJobKeepTrackingSec();

/**
* convert to com.streamxhub.streamx.flink.kubernetes.FlinkTrkConf
*/
public FlinkTrkConf toFlinkTrkConf() {
return new FlinkTrkConf(
new JobStatusWatcherConf(
sglJobStatusTrkTaskTimeoutSec,
sglJobStatueTrkTaskIntervalSec,
silentStateJobKeepTrackingSec),
new MetricWatcherConf(
sglMetricTrkTaskTimeoutSec,
sglMetricTrkTaskIntervalSec)
);
}

}
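To make the mapping above concrete, a hedged sketch (not part of the diff) that hand-builds the same FlinkTrkConf that toFlinkTrkConf() produces, with hypothetical second values; in the SpringBoot path these numbers come from the streamx.flink-k8s.tracking.* properties shown in the @Value annotations, falling back to the FlinkTrkConf defaults when a property is left empty.

import com.streamxhub.streamx.flink.kubernetes.{FlinkTrkConf, JobStatusWatcherConf, MetricWatcherConf}

object K8sTrkConfSketch extends App {
  // argument order mirrors toFlinkTrkConf(): polling-task timeout, polling interval, silent-state keep-tracking window
  val trkConf = new FlinkTrkConf(
    new JobStatusWatcherConf(120L, 5L, 60), // hypothetical values, in seconds
    new MetricWatcherConf(120L, 10L)        // hypothetical values, in seconds
  )
  println(trkConf)
}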
@@ -152,6 +152,7 @@ public RestResponse cancel(Application app) {
return RestResponse.create();
}

// fixme: adapt for kubernetes-only scenarios
@PostMapping("yarn")
public RestResponse yarn() {
return RestResponse.create().data(HadoopUtils.getRMWebAppURL(false));