Skip to content

Commit

Permalink
[AMORO-2008]: Refactor the optimizer module to separate distributing …
Browse files Browse the repository at this point in the history
…pacakges for different engines. (#2051)

* refactor modules

* module name refactor

* adjust pom.xml

* add checkstyle configs

* Revert "add checkstyle configs"

This reverts commit bbaa7f9.

* checkstyle problems

* fix package

* local -> standalone optimizer

* local-optimizer-container name

* refactor code struct as of review comment.

* fix unit tests

* fix package script

* fix review comments

* fix ut error

* fix ut

* fix rename

* remove useless import

* fix review comments

* fix review comments
  • Loading branch information
baiyangtx authored Oct 9, 2023
1 parent b349310 commit 835ff61
Show file tree
Hide file tree
Showing 38 changed files with 315 additions and 163 deletions.
51 changes: 51 additions & 0 deletions ams/optimizer-container/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>amoro-ams</artifactId>
<groupId>com.netease.amoro</groupId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>amoro-optimizer-container</artifactId>
<name>Amoro Project Optimizer Container</name>
<url>https://amoro.netease.com</url>

<dependencies>
<dependency>
<groupId>com.netease.amoro</groupId>
<artifactId>optimizer-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.container;

import com.netease.arctic.ams.api.PropertyNames;
import com.netease.arctic.ams.api.resource.Resource;
import com.netease.arctic.ams.api.resource.ResourceContainer;
import com.netease.arctic.ams.api.resource.ResourceStatus;
import com.netease.arctic.optimizer.util.PropertyUtil;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package com.netease.arctic.optimizer.util;
package com.netease.arctic.optimizer.container;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.netease.arctic.optimizer.util;
package com.netease.arctic.optimizer.container;

import org.apache.iceberg.relocated.com.google.common.collect.Maps;

Expand Down Expand Up @@ -53,7 +53,7 @@ public String toCliOptions() {
.collect(Collectors.joining(" "));
}

public static FlinkConf.Builder buildFor(Map<String, String> flinkConf, Map<String, String> containerProperties) {
public static Builder buildFor(Map<String, String> flinkConf, Map<String, String> containerProperties) {
return new Builder(flinkConf, containerProperties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@
* limitations under the License.
*/

package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.container;

import com.netease.arctic.ams.api.PropertyNames;
import com.netease.arctic.ams.api.resource.Resource;
import com.netease.arctic.optimizer.flink.FlinkOptimizer;
import com.netease.arctic.optimizer.util.FlinkConf;
import com.netease.arctic.optimizer.util.PropertyUtil;
import com.netease.arctic.optimizer.common.Optimizer;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Function;
Expand Down Expand Up @@ -54,7 +52,8 @@ public class FlinkOptimizerContainer extends AbstractResourceContainer {
public static final String FLINK_CONFIG_YAML = "/flink-conf.yaml";
public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";

private static final String DEFAULT_JOB_URI = "/plugin/optimize/OptimizeJob.jar";
private static final String DEFAULT_JOB_URI = "/plugin/optimizer/flink/optimizer-job.jar";
private static final String FLINK_JOB_MAIN_CLASS = "com.netease.arctic.optimizer.flink.FlinkOptimizer";

/**
* This will be removed in 0.7.0, using flink properties `flink-conf.taskmanager.memory.process.size`.
Expand Down Expand Up @@ -187,7 +186,7 @@ protected String buildOptimizerStartupArgsString(Resource resource) {
// ./bin/flink ACTION --target=TARGET OPTIONS -c <main-class> <job-file> <arguments>
// options: -D<property=value>
return String.format("%s/bin/flink %s --target=%s %s -c %s %s %s",
flinkHome, flinkAction, target.getValue(), flinkOptions, FlinkOptimizer.class.getName(), jobUri, jobArgs);
flinkHome, flinkAction, target.getValue(), flinkOptions, FLINK_JOB_MAIN_CLASS, jobUri, jobArgs);
}

private Map<String, String> loadFlinkConfig() {
Expand Down Expand Up @@ -321,9 +320,9 @@ private String buildReleaseYarnCommand(Resource resource) {
String applicationId = resource.getProperties().get(YARN_APPLICATION_ID_PROPERTY);
String options = "-Dyarn.application.id=" + applicationId;

Preconditions.checkArgument(resource.getProperties().containsKey(FlinkOptimizer.JOB_ID_PROPERTY),
"Cannot find {} from optimizer properties", FlinkOptimizer.JOB_ID_PROPERTY);
String jobId = resource.getProperties().get(FlinkOptimizer.JOB_ID_PROPERTY);
Preconditions.checkArgument(resource.getProperties().containsKey(Optimizer.PROPERTY_JOB_ID),
"Cannot find {} from optimizer properties", Optimizer.PROPERTY_JOB_ID);
String jobId = resource.getProperties().get(Optimizer.PROPERTY_JOB_ID);
return String.format("%s/bin/flink cancel -t %s %s %s",
flinkHome, target.getValue(), options, jobId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
* limitations under the License.
*/

package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.container;

import com.netease.arctic.ams.api.resource.Resource;
import com.netease.arctic.optimizer.util.ExecUtil;
import com.netease.arctic.optimizer.util.PropertyUtil;
import com.netease.arctic.optimizer.common.Optimizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,7 +31,6 @@ public class LocalOptimizerContainer extends AbstractResourceContainer {

private static final Logger LOG = LoggerFactory.getLogger(LocalOptimizerContainer.class);

public static final String JOB_ID_PROPERTY = "job_id";
public static final String JOB_MEMORY_PROPERTY = "memory";


Expand Down Expand Up @@ -64,7 +62,7 @@ protected String buildOptimizerStartupArgsString(Resource resource) {
@Override
public void releaseOptimizer(Resource resource) {
long jobId = Long.parseLong(PropertyUtil.checkAndGetProperty(resource.getProperties(),
JOB_ID_PROPERTY));
Optimizer.PROPERTY_JOB_ID));

String os = System.getProperty("os.name").toLowerCase();
String cmd;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.netease.arctic.optimizer.util;
package com.netease.arctic.optimizer.container;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
# limitations under the License.
#

com.netease.arctic.optimizer.FlinkOptimizerContainer
com.netease.arctic.optimizer.LocalOptimizerContainer
com.netease.arctic.optimizer.container.FlinkOptimizerContainer
com.netease.arctic.optimizer.container.LocalOptimizerContainer
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
* limitations under the License.
*/

package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.container;

import com.netease.arctic.ams.api.PropertyNames;
import com.netease.arctic.optimizer.util.FlinkConf;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
Expand Down
72 changes: 72 additions & 0 deletions ams/optimizer/common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>amoro-ams</artifactId>
<groupId>com.netease.amoro</groupId>
<version>0.6.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>optimizer-common</artifactId>
<name>Amoro Project Optimizer Common</name>
<url>https://amoro.netease.com</url>

<dependencies>
<dependency>
<groupId>com.netease.amoro</groupId>
<artifactId>amoro-core</artifactId>
</dependency>

<dependency>
<groupId>com.netease.amoro</groupId>
<artifactId>amoro-hive</artifactId>
</dependency>

<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>

<dependency>
<groupId>com.netease.amoro</groupId>
<artifactId>amoro-core</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.netease.amoro</groupId>
<artifactId>amoro-ams-api</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.common;

import com.netease.arctic.ams.api.ArcticException;
import com.netease.arctic.ams.api.ErrorCodes;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.common;

import com.netease.arctic.ams.api.PropertyNames;
import org.slf4j.Logger;
Expand All @@ -8,6 +8,11 @@
import java.util.stream.IntStream;

public class Optimizer {

/**
* Job-Id, This property must be included when registering the optimizer.
*/
public static final String PROPERTY_JOB_ID = "job-id";
private static final Logger LOG = LoggerFactory.getLogger(Optimizer.class);

private final OptimizerConfig config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.common;

import com.netease.arctic.ams.api.PropertyNames;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.common;

import com.netease.arctic.ams.api.OptimizingTask;
import com.netease.arctic.ams.api.OptimizingTaskResult;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.common;

import com.netease.arctic.ams.api.ArcticException;
import com.netease.arctic.ams.api.ErrorCodes;
Expand All @@ -14,7 +14,7 @@
public class OptimizerToucher extends AbstractOptimizerOperator {
private static final Logger LOG = LoggerFactory.getLogger(OptimizerToucher.class);

private OptimizerToucher.TokenChangeListener tokenChangeListener;
private TokenChangeListener tokenChangeListener;
private final Map<String, String> registerProperties = Maps.newHashMap();
private long startTime;

Expand All @@ -23,7 +23,7 @@ public OptimizerToucher(OptimizerConfig config) {
this.startTime = System.currentTimeMillis();
}

public OptimizerToucher withTokenChangeListener(OptimizerToucher.TokenChangeListener tokenChangeListener) {
public OptimizerToucher withTokenChangeListener(TokenChangeListener tokenChangeListener) {
this.tokenChangeListener = tokenChangeListener;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.common;

import com.netease.arctic.TestAms;
import org.junit.Before;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
* limitations under the License.
*/

package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.common;

import org.apache.iceberg.common.DynFields;
import org.kohsuke.args4j.CmdLineException;

public class OptimizerTestHelpers {
public static final int CALL_AMS_INTERVAL = 500;
public static final long CALL_AMS_INTERVAL = 500;

public static OptimizerConfig buildOptimizerConfig(String amsUrl) {
String[] optimizerArgs = new String[]{"-a", amsUrl, "-p", "2", "-g", "g1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.common;

import com.netease.arctic.ams.api.OptimizingTaskResult;
import org.junit.Assert;
Expand All @@ -35,7 +35,7 @@ public static void reduceCallAmsInterval() {

@Test
public void testStartOptimizer() throws InterruptedException {
OptimizerConfig optimizerConfig = OptimizerTestHelpers.buildOptimizerConfig(TEST_AMS.getServerUrl());
OptimizerConfig optimizerConfig = OptimizerTestHelpers.buildOptimizerConfig(TEST_AMS.getServerUrl());
Optimizer optimizer = new Optimizer(optimizerConfig);
new Thread(optimizer::startOptimizing).start();
TimeUnit.SECONDS.sleep(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.common;

import org.junit.Assert;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.common;

import com.google.common.collect.Maps;
import com.netease.arctic.ams.api.OptimizerRegisterInfo;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.netease.arctic.optimizer;
package com.netease.arctic.optimizer.common;

import com.netease.arctic.ams.api.OptimizerRegisterInfo;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down
Loading

0 comments on commit 835ff61

Please sign in to comment.