Skip to content

Commit

Permalink
[Improve] Related app release: LanuchState change to ReleaseState (ap…
Browse files Browse the repository at this point in the history
…ache#2367)

Related app release: LanuchState change to ReleaseState apache#2366

* update pgsql ddl

* maven setting bug fixed
  • Loading branch information
wolfboys authored Feb 26, 2023
1 parent 4a65a61 commit 2499471
Show file tree
Hide file tree
Showing 35 changed files with 190 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ insert into `t_menu` values (100021, 100015, 'edit streampark', '/flink/app/edit
insert into `t_menu` values (100022, 100014, 'build', null, null, 'project:build', null, '1', 1, null, now(), now());
insert into `t_menu` values (100023, 100014, 'delete', null, null, 'project:delete', null, '1', 1, null, now(), now());
insert into `t_menu` values (100024, 100015, 'mapping', null, null, 'app:mapping', null, '1', 1, null, now(), now());
insert into `t_menu` values (100025, 100015, 'launch', null, null, 'app:launch', null, '1', 1, null, now(), now());
insert into `t_menu` values (100025, 100015, 'release', null, null, 'app:release', null, '1', 1, null, now(), now());
insert into `t_menu` values (100026, 100015, 'start', null, null, 'app:start', null, '1', 1, null, now(), now());
insert into `t_menu` values (100027, 100015, 'clean', null, null, 'app:clean', null, '1', 1, null, now(), now());
insert into `t_menu` values (100028, 100015, 'cancel', null, null, 'app:cancel', null, '1', 1, null, now(), now());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ insert into "public"."t_menu" values (100021, 100015, 'edit streampark', '/flink
insert into "public"."t_menu" values (100022, 100014, 'build', null, null, 'project:build', null, '1', '1', null, now(), now());
insert into "public"."t_menu" values (100023, 100014, 'delete', null, null, 'project:delete', null, '1', '1', null, now(), now());
insert into "public"."t_menu" values (100024, 100015, 'mapping', null, null, 'app:mapping', null, '1', '1', null, now(), now());
insert into "public"."t_menu" values (100025, 100015, 'launch', null, null, 'app:launch', null, '1', '1', null, now(), now());
insert into "public"."t_menu" values (100025, 100015, 'release', null, null, 'app:release', null, '1', '1', null, now(), now());
insert into "public"."t_menu" values (100026, 100015, 'start', null, null, 'app:start', null, '1', '1', null, now(), now());
insert into "public"."t_menu" values (100027, 100015, 'clean', null, null, 'app:clean', null, '1', '1', null, now(), now());
insert into "public"."t_menu" values (100028, 100015, 'cancel', null, null, 'app:cancel', null, '1', '1', null, now(), now());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ create table `t_flink_app` (
`create_time` datetime not null default current_timestamp comment 'create time',
`modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time',
`option_time` datetime default null,
`launch` tinyint default 1,
`release` tinyint default 1,
`build` tinyint default 1,
`start_time` datetime default null,
`end_time` datetime default null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ create table "public"."t_flink_app" (
"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
"option_time" timestamp(6),
"launch" int2 default 1,
"release" int2 default 1,
"build" boolean default true,
"start_time" timestamp(6),
"end_time" timestamp(6),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,9 @@ alter table `t_flink_savepoint` modify column `path` varchar(1024) collate utf8
insert into `t_menu` values (100070, 100015, 'savepoint trigger', null, null, 'savepoint:trigger', null, '1', 1, null, now(), now());

-- ISSUE-2192 DDL & DML End


-- ISSUE-2366 DDL & DML Start
alter table `t_flink_app` change column `launch` `release` tinyint default 1;
update `t_menu` set `menu_name`='release',`perms` = 'app:release' where `menu_id` = 100025;
-- ISSUE-2366 DDL & DML End
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@ alter table "public"."t_flink_savepoint" alter column "path" type varchar(1024)
insert into "public"."t_menu" values (100070, 100015, 'savepoint trigger', null, null, 'savepoint:trigger', null, '1', '1', null, now(), now());

-- ISSUE-2192 DDL & DML End


-- ISSUE-2366 DDL & DML Start
alter table "public"."t_flink_app" rename "launch" to "release";
update "public"."t_menu" set "menu_name"='release',"perms" = 'app:release' where "menu_id" = 100025;
-- ISSUE-2366 DDL & DML End

Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ public class ApplicationBuildPipelineController {
@Autowired private FlinkSqlService flinkSqlService;

/**
* Launch application building pipeline.
* Release application building pipeline.
*
* @param appId application id
* @param forceBuild forced start pipeline or not
* @return Whether the pipeline was successfully started
*/
@ApiAccess
@ApiOperation(
value = "Launch application",
notes = "Launch application",
value = "Release application",
notes = "Release application",
tags = ApiDocConstant.FLINK_APP_OP_TAG,
consumes = "application/x-www-form-urlencoded")
@ApiImplicitParams({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public RestResponse mapping(Application app) {

@AppUpdated
@PostMapping("revoke")
@RequiresPermissions("app:launch")
@RequiresPermissions("app:release")
public RestResponse revoke(Application app) {
applicationService.revoke(app);
return RestResponse.success();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.enums.FlinkAppState;
import org.apache.streampark.console.core.enums.LaunchState;
import org.apache.streampark.console.core.enums.ReleaseState;
import org.apache.streampark.console.core.enums.ResourceFrom;
import org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
Expand Down Expand Up @@ -111,8 +111,9 @@ public class Application implements Serializable {
private String k8sNamespace = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE();

private Integer state;
/** task launch status */
private Integer launch;
/** task release status */
@TableField("`release`")
private Integer release;

/** determine if a task needs to be built */
private Boolean build;
Expand Down Expand Up @@ -313,13 +314,8 @@ public boolean shouldBeTrack() {
}

@JsonIgnore
public LaunchState getLaunchState() {
return LaunchState.of(state);
}

@JsonIgnore
public void setLaunchState(LaunchState launchState) {
this.launch = launchState.get();
public ReleaseState getReleaseState() {
return ReleaseState.of(release);
}

@JsonIgnore
Expand Down Expand Up @@ -468,7 +464,7 @@ public boolean isRunning() {

@JsonIgnore
public boolean isNeedRollback() {
return LaunchState.NEED_ROLLBACK.get() == this.getLaunch();
return ReleaseState.NEED_ROLLBACK.get() == this.getRelease();
}

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.streampark.console.core.entity;

import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.util.CommandUtils;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.base.util.CommonUtils;
import org.apache.streampark.console.base.util.GitUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.enums.GitAuthorizedError;
import org.apache.streampark.console.core.service.SettingService;

import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.IdType;
Expand Down Expand Up @@ -197,15 +198,15 @@ public String getMavenArgs() {
}
}

StringBuffer cmdBuffer = new StringBuffer(mvn).append(" clean package -DskipTests ");
StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests ");

if (StringUtils.isNotEmpty(this.buildArgs)) {
cmdBuffer.append(this.buildArgs.trim());
}

Setting setting = SettingService.SETTINGS.get(SettingService.KEY_MAVEN_SETTINGS);
if (setting != null) {
cmdBuffer.append(" --settings ").append(setting.getSettingValue());
String setting = InternalConfigHolder.get(CommonConfig.MAVEN_SETTINGS_PATH());
if (StringUtils.isNotEmpty(setting)) {
cmdBuffer.append(" --settings ").append(setting);
}

return cmdBuffer.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public enum OptionState implements Serializable {

/** Application which is currently action: none. */
NONE(0),
/** Application which is currently action: deploying. */
LAUNCHING(1),
/** Application which is currently action: releasing. */
RELEASING(1),
/** Application which is currently action: cancelling. */
CANCELLING(2),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@
import java.io.Serializable;
import java.util.Arrays;

public enum LaunchState implements Serializable {
public enum ReleaseState implements Serializable {

/** launch failed */
/** release failed */
FAILED(-1),
/** launch done */
/** release done */
DONE(0),

/** need relaunch after modify task */
NEED_LAUNCH(1),
/** need release after modify task */
NEED_RELEASE(1),

/** launching */
LAUNCHING(2),
/** releasing */
RELEASING(2),

/** launch complete, need restart */
/** release complete, need restart */
NEED_RESTART(3),

/** need rollback */
Expand All @@ -47,15 +47,15 @@ public enum LaunchState implements Serializable {

private final int value;

LaunchState(int value) {
ReleaseState(int value) {
this.value = value;
}

public int get() {
return this.value;
}

public static LaunchState of(Integer state) {
public static ReleaseState of(Integer state) {
return Arrays.stream(values()).filter((x) -> x.value == state).findFirst().orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public interface ApplicationService extends IService<Application> {

boolean checkAlter(Application application);

void updateLaunch(Application application);
void updateRelease(Application application);

List<Application> getByProjectId(Long id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Message;
import org.apache.streampark.console.core.enums.CandidateType;
import org.apache.streampark.console.core.enums.LaunchState;
import org.apache.streampark.console.core.enums.NoticeType;
import org.apache.streampark.console.core.enums.OptionState;
import org.apache.streampark.console.core.enums.ReleaseState;
import org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper;
import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
Expand Down Expand Up @@ -164,8 +164,8 @@ public void onStart(PipeSnapshot snapshot) {
AppBuildPipeline.fromPipeSnapshot(snapshot).setAppId(app.getId());
saveEntity(buildPipeline);

app.setLaunch(LaunchState.LAUNCHING.get());
applicationService.updateLaunch(app);
app.setRelease(ReleaseState.RELEASING.get());
applicationService.updateRelease(app);

if (flinkRESTAPIWatcher.isWatchingApp(app.getId())) {
flinkRESTAPIWatcher.init();
Expand Down Expand Up @@ -235,10 +235,10 @@ public void onFinish(PipeSnapshot snapshot, BuildResult result) {
if (result.pass()) {
// running job ...
if (app.isRunning()) {
app.setLaunch(LaunchState.NEED_RESTART.get());
app.setRelease(ReleaseState.NEED_RESTART.get());
} else {
app.setOptionState(OptionState.NONE.getValue());
app.setLaunch(LaunchState.DONE.get());
app.setRelease(ReleaseState.DONE.get());
// If the current task is not running, or the task has just been added, directly set
// the candidate version to the official version
if (app.isFlinkSqlJob()) {
Expand Down Expand Up @@ -269,15 +269,15 @@ public void onFinish(PipeSnapshot snapshot, BuildResult result) {
new Message(
commonService.getUserId(),
app.getId(),
app.getJobName().concat(" launch failed"),
app.getJobName().concat(" release failed"),
Utils.stringifyException(snapshot.error().exception()),
NoticeType.EXCEPTION);
messageService.push(message);
app.setLaunch(LaunchState.FAILED.get());
app.setRelease(ReleaseState.FAILED.get());
app.setOptionState(OptionState.NONE.getValue());
app.setBuild(true);
}
applicationService.updateLaunch(app);
applicationService.updateRelease(app);
if (flinkRESTAPIWatcher.isWatchingApp(app.getId())) {
flinkRESTAPIWatcher.init();
}
Expand Down Expand Up @@ -305,14 +305,14 @@ public void onDockerPushProgressChange(DockerPushSnapshot snapshot) {
}
});
}
// save pipeline instance snapshot to db before launch it.
// save pipeline instance snapshot to db before release it.
AppBuildPipeline buildPipeline =
AppBuildPipeline.initFromPipeline(pipeline).setAppId(app.getId());
boolean saved = saveEntity(buildPipeline);
DOCKER_PULL_PG_SNAPSHOTS.invalidate(app.getId());
DOCKER_BUILD_PG_SNAPSHOTS.invalidate(app.getId());
DOCKER_PUSH_PG_SNAPSHOTS.invalidate(app.getId());
// async launch pipeline
// async release pipeline
executorService.submit((Runnable) pipeline::launch);
return saved;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.streampark.console.core.entity.ApplicationConfig;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.enums.EffectiveType;
import org.apache.streampark.console.core.enums.LaunchState;
import org.apache.streampark.console.core.enums.ReleaseState;
import org.apache.streampark.console.core.mapper.ApplicationBackUpMapper;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationConfigService;
Expand Down Expand Up @@ -144,7 +144,7 @@ public void rollback(ApplicationBackUp backParam) {
new UpdateWrapper<Application>()
.lambda()
.eq(Application::getId, application.getId())
.set(Application::getLaunch, LaunchState.NEED_RESTART.get()));
.set(Application::getRelease, ReleaseState.NEED_RESTART.get()));
} catch (Exception e) {
throw e;
}
Expand Down
Loading

0 comments on commit 2499471

Please sign in to comment.