diff --git a/ams/optimizer-container/pom.xml b/ams/optimizer-container/pom.xml new file mode 100644 index 0000000000..e3540e976a --- /dev/null +++ b/ams/optimizer-container/pom.xml @@ -0,0 +1,51 @@ + + + + + + amoro-ams + com.netease.amoro + 0.6.0-SNAPSHOT + + 4.0.0 + + amoro-optimizer-container + Amoro Project Optimizer Container + https://amoro.netease.com + + + + com.netease.amoro + optimizer-common + ${project.version} + + + + org.yaml + snakeyaml + + + + junit + junit + + + diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/AbstractResourceContainer.java b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/AbstractResourceContainer.java similarity index 97% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/AbstractResourceContainer.java rename to ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/AbstractResourceContainer.java index c4286fb184..9ffa0b3100 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/AbstractResourceContainer.java +++ b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/AbstractResourceContainer.java @@ -1,10 +1,9 @@ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.container; import com.netease.arctic.ams.api.PropertyNames; import com.netease.arctic.ams.api.resource.Resource; import com.netease.arctic.ams.api.resource.ResourceContainer; import com.netease.arctic.ams.api.resource.ResourceStatus; -import com.netease.arctic.optimizer.util.PropertyUtil; import org.apache.commons.lang3.StringUtils; import java.io.IOException; diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/util/ExecUtil.java b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/ExecUtil.java similarity index 98% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/util/ExecUtil.java rename to ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/ExecUtil.java index d2a74a53e6..7fccef930c 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/util/ExecUtil.java +++ b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/ExecUtil.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.netease.arctic.optimizer.util; +package com.netease.arctic.optimizer.container; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/util/FlinkConf.java b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/FlinkConf.java similarity index 95% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/util/FlinkConf.java rename to ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/FlinkConf.java index 16a3f1c98d..1e66dc033e 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/util/FlinkConf.java +++ b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/FlinkConf.java @@ -1,4 +1,4 @@ -package com.netease.arctic.optimizer.util; +package com.netease.arctic.optimizer.container; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -53,7 +53,7 @@ public String toCliOptions() { .collect(Collectors.joining(" ")); } - public static FlinkConf.Builder buildFor(Map flinkConf, Map containerProperties) { + public static Builder buildFor(Map flinkConf, Map containerProperties) { return new Builder(flinkConf, containerProperties); } diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/FlinkOptimizerContainer.java b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/FlinkOptimizerContainer.java similarity index 96% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/FlinkOptimizerContainer.java rename to ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/FlinkOptimizerContainer.java index 783a3b329d..e4eb90d254 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/FlinkOptimizerContainer.java +++ b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/FlinkOptimizerContainer.java @@ -16,13 +16,11 @@ * limitations under the License. */ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.container; import com.netease.arctic.ams.api.PropertyNames; import com.netease.arctic.ams.api.resource.Resource; -import com.netease.arctic.optimizer.flink.FlinkOptimizer; -import com.netease.arctic.optimizer.util.FlinkConf; -import com.netease.arctic.optimizer.util.PropertyUtil; +import com.netease.arctic.optimizer.common.Optimizer; import org.apache.commons.lang3.StringUtils; import org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Function; @@ -54,7 +52,8 @@ public class FlinkOptimizerContainer extends AbstractResourceContainer { public static final String FLINK_CONFIG_YAML = "/flink-conf.yaml"; public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR"; - private static final String DEFAULT_JOB_URI = "/plugin/optimize/OptimizeJob.jar"; + private static final String DEFAULT_JOB_URI = "/plugin/optimizer/flink/optimizer-job.jar"; + private static final String FLINK_JOB_MAIN_CLASS = "com.netease.arctic.optimizer.flink.FlinkOptimizer"; /** * This will be removed in 0.7.0, using flink properties `flink-conf.taskmanager.memory.process.size`. @@ -187,7 +186,7 @@ protected String buildOptimizerStartupArgsString(Resource resource) { // ./bin/flink ACTION --target=TARGET OPTIONS -c // options: -D return String.format("%s/bin/flink %s --target=%s %s -c %s %s %s", - flinkHome, flinkAction, target.getValue(), flinkOptions, FlinkOptimizer.class.getName(), jobUri, jobArgs); + flinkHome, flinkAction, target.getValue(), flinkOptions, FLINK_JOB_MAIN_CLASS, jobUri, jobArgs); } private Map loadFlinkConfig() { @@ -321,9 +320,9 @@ private String buildReleaseYarnCommand(Resource resource) { String applicationId = resource.getProperties().get(YARN_APPLICATION_ID_PROPERTY); String options = "-Dyarn.application.id=" + applicationId; - Preconditions.checkArgument(resource.getProperties().containsKey(FlinkOptimizer.JOB_ID_PROPERTY), - "Cannot find {} from optimizer properties", FlinkOptimizer.JOB_ID_PROPERTY); - String jobId = resource.getProperties().get(FlinkOptimizer.JOB_ID_PROPERTY); + Preconditions.checkArgument(resource.getProperties().containsKey(Optimizer.PROPERTY_JOB_ID), + "Cannot find {} from optimizer properties", Optimizer.PROPERTY_JOB_ID); + String jobId = resource.getProperties().get(Optimizer.PROPERTY_JOB_ID); return String.format("%s/bin/flink cancel -t %s %s %s", flinkHome, target.getValue(), options, jobId); } diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/LocalOptimizerContainer.java b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/LocalOptimizerContainer.java similarity index 93% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/LocalOptimizerContainer.java rename to ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/LocalOptimizerContainer.java index f5c07a3c47..b9e9185c9f 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/LocalOptimizerContainer.java +++ b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/LocalOptimizerContainer.java @@ -16,11 +16,10 @@ * limitations under the License. */ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.container; import com.netease.arctic.ams.api.resource.Resource; -import com.netease.arctic.optimizer.util.ExecUtil; -import com.netease.arctic.optimizer.util.PropertyUtil; +import com.netease.arctic.optimizer.common.Optimizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +31,6 @@ public class LocalOptimizerContainer extends AbstractResourceContainer { private static final Logger LOG = LoggerFactory.getLogger(LocalOptimizerContainer.class); - public static final String JOB_ID_PROPERTY = "job_id"; public static final String JOB_MEMORY_PROPERTY = "memory"; @@ -64,7 +62,7 @@ protected String buildOptimizerStartupArgsString(Resource resource) { @Override public void releaseOptimizer(Resource resource) { long jobId = Long.parseLong(PropertyUtil.checkAndGetProperty(resource.getProperties(), - JOB_ID_PROPERTY)); + Optimizer.PROPERTY_JOB_ID)); String os = System.getProperty("os.name").toLowerCase(); String cmd; diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/util/PropertyUtil.java b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/PropertyUtil.java similarity index 93% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/util/PropertyUtil.java rename to ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/PropertyUtil.java index 18a99548a1..e4a9460fab 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/util/PropertyUtil.java +++ b/ams/optimizer-container/src/main/java/com/netease/arctic/optimizer/container/PropertyUtil.java @@ -1,4 +1,4 @@ -package com.netease.arctic.optimizer.util; +package com.netease.arctic.optimizer.container; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; diff --git a/ams/optimizer/src/main/resources/META-INF/services/com.netease.arctic.ams.api.resource.ResourceContainer b/ams/optimizer-container/src/main/resources/META-INF/services/com.netease.arctic.ams.api.resource.ResourceContainer similarity index 86% rename from ams/optimizer/src/main/resources/META-INF/services/com.netease.arctic.ams.api.resource.ResourceContainer rename to ams/optimizer-container/src/main/resources/META-INF/services/com.netease.arctic.ams.api.resource.ResourceContainer index 16090a8631..528c20101b 100644 --- a/ams/optimizer/src/main/resources/META-INF/services/com.netease.arctic.ams.api.resource.ResourceContainer +++ b/ams/optimizer-container/src/main/resources/META-INF/services/com.netease.arctic.ams.api.resource.ResourceContainer @@ -16,5 +16,5 @@ # limitations under the License. # -com.netease.arctic.optimizer.FlinkOptimizerContainer -com.netease.arctic.optimizer.LocalOptimizerContainer \ No newline at end of file +com.netease.arctic.optimizer.container.FlinkOptimizerContainer +com.netease.arctic.optimizer.container.LocalOptimizerContainer \ No newline at end of file diff --git a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestFlinkOptimizerContainer.java b/ams/optimizer-container/src/test/java/com/netease/arctic/optimizer/container/TestFlinkOptimizerContainer.java similarity index 98% rename from ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestFlinkOptimizerContainer.java rename to ams/optimizer-container/src/test/java/com/netease/arctic/optimizer/container/TestFlinkOptimizerContainer.java index 5304b26813..61c7fdf240 100644 --- a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestFlinkOptimizerContainer.java +++ b/ams/optimizer-container/src/test/java/com/netease/arctic/optimizer/container/TestFlinkOptimizerContainer.java @@ -16,10 +16,9 @@ * limitations under the License. */ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.container; import com.netease.arctic.ams.api.PropertyNames; -import com.netease.arctic.optimizer.util.FlinkConf; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Test; diff --git a/ams/optimizer/common/pom.xml b/ams/optimizer/common/pom.xml new file mode 100644 index 0000000000..540da48d7d --- /dev/null +++ b/ams/optimizer/common/pom.xml @@ -0,0 +1,72 @@ + + + + + + amoro-ams + com.netease.amoro + 0.6.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + optimizer-common + Amoro Project Optimizer Common + https://amoro.netease.com + + + + com.netease.amoro + amoro-core + + + + com.netease.amoro + amoro-hive + + + + args4j + args4j + + + + junit + junit + + + + com.netease.amoro + amoro-core + ${project.version} + tests + test + + + + com.netease.amoro + amoro-ams-api + ${project.version} + tests + test + + + diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/AbstractOptimizerOperator.java b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/AbstractOptimizerOperator.java similarity index 99% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/AbstractOptimizerOperator.java rename to ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/AbstractOptimizerOperator.java index aace5d5c0d..98e50cfa43 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/AbstractOptimizerOperator.java +++ b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/AbstractOptimizerOperator.java @@ -1,4 +1,4 @@ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.common; import com.netease.arctic.ams.api.ArcticException; import com.netease.arctic.ams.api.ErrorCodes; diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/Optimizer.java b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/Optimizer.java similarity index 90% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/Optimizer.java rename to ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/Optimizer.java index e6ef7f9d2b..24b276e2bb 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/Optimizer.java +++ b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/Optimizer.java @@ -1,4 +1,4 @@ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.common; import com.netease.arctic.ams.api.PropertyNames; import org.slf4j.Logger; @@ -8,6 +8,11 @@ import java.util.stream.IntStream; public class Optimizer { + + /** + * Job-Id, This property must be included when registering the optimizer. + */ + public static final String PROPERTY_JOB_ID = "job-id"; private static final Logger LOG = LoggerFactory.getLogger(Optimizer.class); private final OptimizerConfig config; diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerConfig.java b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerConfig.java similarity index 99% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerConfig.java rename to ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerConfig.java index 2b5416d4d8..7372eb75e6 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerConfig.java +++ b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerConfig.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.common; import com.netease.arctic.ams.api.PropertyNames; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerExecutor.java b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerExecutor.java similarity index 99% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerExecutor.java rename to ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerExecutor.java index b6bcfc1a23..cc4b120aac 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerExecutor.java +++ b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerExecutor.java @@ -1,4 +1,4 @@ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.common; import com.netease.arctic.ams.api.OptimizingTask; import com.netease.arctic.ams.api.OptimizingTaskResult; diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerToucher.java b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerToucher.java similarity index 93% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerToucher.java rename to ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerToucher.java index 0187a75ea4..81a5efa325 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/OptimizerToucher.java +++ b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerToucher.java @@ -1,4 +1,4 @@ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.common; import com.netease.arctic.ams.api.ArcticException; import com.netease.arctic.ams.api.ErrorCodes; @@ -14,7 +14,7 @@ public class OptimizerToucher extends AbstractOptimizerOperator { private static final Logger LOG = LoggerFactory.getLogger(OptimizerToucher.class); - private OptimizerToucher.TokenChangeListener tokenChangeListener; + private TokenChangeListener tokenChangeListener; private final Map registerProperties = Maps.newHashMap(); private long startTime; @@ -23,7 +23,7 @@ public OptimizerToucher(OptimizerConfig config) { this.startTime = System.currentTimeMillis(); } - public OptimizerToucher withTokenChangeListener(OptimizerToucher.TokenChangeListener tokenChangeListener) { + public OptimizerToucher withTokenChangeListener(TokenChangeListener tokenChangeListener) { this.tokenChangeListener = tokenChangeListener; return this; } diff --git a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/OptimizerTestBase.java b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/OptimizerTestBase.java similarity index 97% rename from ams/optimizer/src/test/java/com/netease/arctic/optimizer/OptimizerTestBase.java rename to ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/OptimizerTestBase.java index 00e037d88d..5e14c9d4a2 100644 --- a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/OptimizerTestBase.java +++ b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/OptimizerTestBase.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.common; import com.netease.arctic.TestAms; import org.junit.Before; diff --git a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/OptimizerTestHelpers.java b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/OptimizerTestHelpers.java similarity index 94% rename from ams/optimizer/src/test/java/com/netease/arctic/optimizer/OptimizerTestHelpers.java rename to ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/OptimizerTestHelpers.java index da69ac602c..68fa8c0cbc 100644 --- a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/OptimizerTestHelpers.java +++ b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/OptimizerTestHelpers.java @@ -16,13 +16,13 @@ * limitations under the License. */ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.common; import org.apache.iceberg.common.DynFields; import org.kohsuke.args4j.CmdLineException; public class OptimizerTestHelpers { - public static final int CALL_AMS_INTERVAL = 500; + public static final long CALL_AMS_INTERVAL = 500; public static OptimizerConfig buildOptimizerConfig(String amsUrl) { String[] optimizerArgs = new String[]{"-a", amsUrl, "-p", "2", "-g", "g1", diff --git a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizer.java b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizer.java similarity index 93% rename from ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizer.java rename to ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizer.java index b2133f2331..bbc458cc97 100644 --- a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizer.java +++ b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.common; import com.netease.arctic.ams.api.OptimizingTaskResult; import org.junit.Assert; @@ -35,7 +35,7 @@ public static void reduceCallAmsInterval() { @Test public void testStartOptimizer() throws InterruptedException { - OptimizerConfig optimizerConfig = OptimizerTestHelpers.buildOptimizerConfig(TEST_AMS.getServerUrl()); + OptimizerConfig optimizerConfig = OptimizerTestHelpers.buildOptimizerConfig(TEST_AMS.getServerUrl()); Optimizer optimizer = new Optimizer(optimizerConfig); new Thread(optimizer::startOptimizing).start(); TimeUnit.SECONDS.sleep(1); diff --git a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizerConfig.java b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerConfig.java similarity index 98% rename from ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizerConfig.java rename to ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerConfig.java index caebc365f5..cad7f85ba1 100644 --- a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizerConfig.java +++ b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerConfig.java @@ -1,4 +1,4 @@ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.common; import org.junit.Assert; import org.junit.Test; diff --git a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizerExecutor.java b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerExecutor.java similarity index 99% rename from ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizerExecutor.java rename to ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerExecutor.java index 438e2291ff..027d800f26 100644 --- a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizerExecutor.java +++ b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerExecutor.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.common; import com.google.common.collect.Maps; import com.netease.arctic.ams.api.OptimizerRegisterInfo; diff --git a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizerToucher.java b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerToucher.java similarity index 98% rename from ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizerToucher.java rename to ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerToucher.java index 85c371d30f..86f4de5937 100644 --- a/ams/optimizer/src/test/java/com/netease/arctic/optimizer/TestOptimizerToucher.java +++ b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerToucher.java @@ -1,4 +1,4 @@ -package com.netease.arctic.optimizer; +package com.netease.arctic.optimizer.common; import com.netease.arctic.ams.api.OptimizerRegisterInfo; import org.apache.iceberg.relocated.com.google.common.collect.Lists; diff --git a/ams/optimizer/pom.xml b/ams/optimizer/flink-optimizer/pom.xml similarity index 94% rename from ams/optimizer/pom.xml rename to ams/optimizer/flink-optimizer/pom.xml index 395612b2ad..d613cdf7c9 100644 --- a/ams/optimizer/pom.xml +++ b/ams/optimizer/flink-optimizer/pom.xml @@ -24,11 +24,12 @@ amoro-ams com.netease.amoro 0.6.0-SNAPSHOT + ../../pom.xml 4.0.0 - amoro-ams-optimizer - Amoro Project Optimizer + flink-optimizer + Amoro Project Flink Optimizer https://amoro.netease.com @@ -39,12 +40,8 @@ com.netease.amoro - amoro-core - - - - com.netease.amoro - amoro-hive + optimizer-common + ${project.version} @@ -228,14 +225,6 @@ - - org.apache.maven.plugins - maven-compiler-plugin - - ${java.version} - ${java.version} - - org.apache.maven.plugins maven-assembly-plugin diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkExecutor.java b/ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkExecutor.java similarity index 96% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkExecutor.java rename to ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkExecutor.java index e73cf56044..0c54f1831e 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkExecutor.java +++ b/ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkExecutor.java @@ -18,7 +18,7 @@ package com.netease.arctic.optimizer.flink; -import com.netease.arctic.optimizer.OptimizerExecutor; +import com.netease.arctic.optimizer.common.OptimizerExecutor; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkOptimizer.java b/ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkOptimizer.java similarity index 94% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkOptimizer.java rename to ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkOptimizer.java index 1f77251bd9..806b2c5520 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkOptimizer.java +++ b/ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkOptimizer.java @@ -1,7 +1,7 @@ package com.netease.arctic.optimizer.flink; -import com.netease.arctic.optimizer.Optimizer; -import com.netease.arctic.optimizer.OptimizerConfig; +import com.netease.arctic.optimizer.common.Optimizer; +import com.netease.arctic.optimizer.common.OptimizerConfig; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; @@ -17,8 +17,6 @@ public class FlinkOptimizer { private static final Logger LOG = LoggerFactory.getLogger(FlinkOptimizer.class); - public static final String JOB_ID_PROPERTY = "flink-job-id"; - private static final String JOB_NAME = "arctic-flink-optimizer"; public static void main(String[] args) throws CmdLineException { diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkToucher.java b/ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkToucher.java similarity index 90% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkToucher.java rename to ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkToucher.java index 0df718b5f1..2e06cd9497 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkToucher.java +++ b/ams/optimizer/flink-optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkToucher.java @@ -18,7 +18,8 @@ package com.netease.arctic.optimizer.flink; -import com.netease.arctic.optimizer.OptimizerToucher; +import com.netease.arctic.optimizer.common.Optimizer; +import com.netease.arctic.optimizer.common.OptimizerToucher; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -41,7 +42,7 @@ public FlinkToucher(OptimizerToucher toucher) { public void run(SourceContext sourceContext) { String jobId = FLINK_TASK_RUNTIME_FIELDS.bind(getRuntimeContext()).get().getJobID().toString(); toucher.withTokenChangeListener(sourceContext::collect) - .withRegisterProperty(FlinkOptimizer.JOB_ID_PROPERTY, jobId) + .withRegisterProperty(Optimizer.PROPERTY_JOB_ID, jobId) .start(); } diff --git a/ams/optimizer/src/main/resources/log4j2.xml b/ams/optimizer/src/main/resources/log4j2.xml deleted file mode 100644 index aff1cd4791..0000000000 --- a/ams/optimizer/src/main/resources/log4j2.xml +++ /dev/null @@ -1,41 +0,0 @@ - - - - - - - logs - - - - - - - - - - - - - - - - - diff --git a/ams/optimizer/src/test/resources/log4j2.xml b/ams/optimizer/src/test/resources/log4j2.xml deleted file mode 100644 index 8de92f8019..0000000000 --- a/ams/optimizer/src/test/resources/log4j2.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/ams/optimizer/standalone-optimizer/pom.xml b/ams/optimizer/standalone-optimizer/pom.xml new file mode 100644 index 0000000000..400a851e76 --- /dev/null +++ b/ams/optimizer/standalone-optimizer/pom.xml @@ -0,0 +1,98 @@ + + + + + + amoro-ams + com.netease.amoro + 0.6.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + standalone-optimizer + Amoro Project Standalone Optimizer + https://amoro.netease.com + + + + + com.netease.amoro + optimizer-common + ${project.version} + + + + args4j + args4j + + + + junit + junit + + + + com.netease.amoro + amoro-core + ${project.version} + tests + test + + + + com.netease.amoro + amoro-ams-api + ${project.version} + tests + test + + + + org.apache.logging.log4j + log4j-slf4j-impl + test + + + + org.apache.logging.log4j + log4j-api + test + + + + org.apache.logging.log4j + log4j-core + test + + + + org.apache.logging.log4j + log4j-1.2-api + test + + + + org.yaml + snakeyaml + + + diff --git a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/local/LocalOptimizer.java b/ams/optimizer/standalone-optimizer/src/main/java/com/netease/arctic/optimizer/standalone/StandaloneOptimizer.java similarity index 66% rename from ams/optimizer/src/main/java/com/netease/arctic/optimizer/local/LocalOptimizer.java rename to ams/optimizer/standalone-optimizer/src/main/java/com/netease/arctic/optimizer/standalone/StandaloneOptimizer.java index 1c76db7ccb..f5e020cfc6 100644 --- a/ams/optimizer/src/main/java/com/netease/arctic/optimizer/local/LocalOptimizer.java +++ b/ams/optimizer/standalone-optimizer/src/main/java/com/netease/arctic/optimizer/standalone/StandaloneOptimizer.java @@ -1,14 +1,13 @@ -package com.netease.arctic.optimizer.local; +package com.netease.arctic.optimizer.standalone; -import com.netease.arctic.optimizer.LocalOptimizerContainer; -import com.netease.arctic.optimizer.Optimizer; -import com.netease.arctic.optimizer.OptimizerConfig; +import com.netease.arctic.optimizer.common.Optimizer; +import com.netease.arctic.optimizer.common.OptimizerConfig; import org.kohsuke.args4j.CmdLineException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; -public class LocalOptimizer { +public class StandaloneOptimizer { public static void main(String[] args) throws CmdLineException { OptimizerConfig optimizerConfig = new OptimizerConfig(args); Optimizer optimizer = new Optimizer(optimizerConfig); @@ -19,7 +18,7 @@ public static void main(String[] args) throws CmdLineException { RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); String processId = runtimeMXBean.getName().split("@")[0]; - optimizer.getToucher().withRegisterProperty(LocalOptimizerContainer.JOB_ID_PROPERTY, processId); + optimizer.getToucher().withRegisterProperty(Optimizer.PROPERTY_JOB_ID, processId); optimizer.startOptimizing(); } } diff --git a/ams/pom.xml b/ams/pom.xml index 9b46c050f0..a7186d9229 100644 --- a/ams/pom.xml +++ b/ams/pom.xml @@ -33,7 +33,10 @@ api - optimizer + optimizer-container + optimizer/flink-optimizer + optimizer/standalone-optimizer + optimizer/common server dashboard diff --git a/ams/server/pom.xml b/ams/server/pom.xml index 9c74998255..76495d27a7 100644 --- a/ams/server/pom.xml +++ b/ams/server/pom.xml @@ -53,7 +53,7 @@ com.netease.amoro - amoro-ams-optimizer + amoro-optimizer-container ${project.version} @@ -293,6 +293,13 @@ test-jar + + com.netease.amoro + standalone-optimizer + ${project.version} + test + + org.apache.curator curator-test diff --git a/ams/server/src/test/java/com/netease/arctic/server/AmsEnvironment.java b/ams/server/src/test/java/com/netease/arctic/server/AmsEnvironment.java index bf0f35e3c9..aa16ab5fdb 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/AmsEnvironment.java +++ b/ams/server/src/test/java/com/netease/arctic/server/AmsEnvironment.java @@ -10,7 +10,7 @@ import com.netease.arctic.catalog.CatalogLoader; import com.netease.arctic.catalog.CatalogTestHelpers; import com.netease.arctic.hive.HMSMockServer; -import com.netease.arctic.optimizer.local.LocalOptimizer; +import com.netease.arctic.optimizer.standalone.StandaloneOptimizer; import com.netease.arctic.server.resource.OptimizerManager; import com.netease.arctic.server.resource.ResourceContainers; import com.netease.arctic.server.table.DefaultTableService; @@ -252,7 +252,7 @@ public void startOptimizer() { new Thread(() -> { String[] startArgs = {"-a", getOptimizingServiceUrl(), "-p", "1", "-g", "default"}; try { - LocalOptimizer.main(startArgs); + StandaloneOptimizer.main(startArgs); } catch (CmdLineException e) { throw new RuntimeException(e); } @@ -394,7 +394,7 @@ private String getAmsConfig() { "\n" + "containers:\n" + " - name: localContainer\n" + - " container-impl: com.netease.arctic.optimizer.LocalOptimizerContainer\n" + + " container-impl: com.netease.arctic.optimizer.container.LocalOptimizerContainer\n" + " properties:\n" + " memory: \"1024\"\n" + " hadoop_home: /opt/hadoop\n" + diff --git a/charts/amoro/templates/amoro-configmap.yaml b/charts/amoro/templates/amoro-configmap.yaml index 23762b74a8..09810fd71d 100644 --- a/charts/amoro/templates/amoro-configmap.yaml +++ b/charts/amoro/templates/amoro-configmap.yaml @@ -111,13 +111,13 @@ data: containers: - name: localContainer - container-impl: com.netease.arctic.optimizer.LocalOptimizerContainer + container-impl: com.netease.arctic.optimizer.container.LocalOptimizerContainer properties: export.JAVA_HOME: "/opt/java" # JDK environment #containers: # - name: flinkContainer - # container-impl: com.netease.arctic.optimizer.FlinkOptimizerContainer + # container-impl: com.netease.arctic.optimizer.container.FlinkOptimizerContainer # properties: # flink-home: "/opt/flink/" # Flink install home # export.JVM_ARGS: "-Djava.security.krb5.conf=/opt/krb5.conf" # Flink launch jvm args, like kerberos config when ues kerberos diff --git a/dist/pom.xml b/dist/pom.xml index 6d2cc211ff..9e44f71f1c 100644 --- a/dist/pom.xml +++ b/dist/pom.xml @@ -40,6 +40,17 @@ amoro-ams-server ${project.version} + + com.netease.amoro + standalone-optimizer + ${project.version} + + + com.netease.amoro + flink-optimizer + ${project.version} + compile + diff --git a/dist/src/main/arctic-bin/bin/optimizer.sh b/dist/src/main/arctic-bin/bin/optimizer.sh index 19bcb7e80e..59763a5fee 100755 --- a/dist/src/main/arctic-bin/bin/optimizer.sh +++ b/dist/src/main/arctic-bin/bin/optimizer.sh @@ -45,6 +45,6 @@ if [ ! -f $STDERR_LOG ];then fi JAVA_OPTS="-Xmx$1m -Dlog.home=${OPTIMIZER_LOG_DIR}" -RUN_SERVER="com.netease.arctic.optimizer.local.LocalOptimizer" +RUN_SERVER="com.netease.arctic.optimizer.standalone.StandaloneOptimizer" CMDS="$JAVA_RUN $JAVA_OPTS $RUN_SERVER $ARGS" nohup ${CMDS} >/dev/null 2>${STDERR_LOG} & diff --git a/dist/src/main/arctic-bin/conf/config.yaml b/dist/src/main/arctic-bin/conf/config.yaml index 05df44bc36..f1bc9d7020 100644 --- a/dist/src/main/arctic-bin/conf/config.yaml +++ b/dist/src/main/arctic-bin/conf/config.yaml @@ -86,13 +86,13 @@ ams: containers: - name: localContainer - container-impl: com.netease.arctic.optimizer.LocalOptimizerContainer + container-impl: com.netease.arctic.optimizer.container.LocalOptimizerContainer properties: export.JAVA_HOME: "/opt/java" # JDK environment #containers: # - name: flinkContainer -# container-impl: com.netease.arctic.optimizer.FlinkOptimizerContainer +# container-impl: com.netease.arctic.optimizer.container.FlinkOptimizerContainer # properties: # flink-home: "/opt/flink/" # Flink install home # target: "yarn-per-job" # Flink run target, (yarn-per-job, yarn-application, kubernetes-application) diff --git a/dist/src/main/assemblies/bin.xml b/dist/src/main/assemblies/bin.xml index 5201d30a0c..8125c2c6fa 100644 --- a/dist/src/main/assemblies/bin.xml +++ b/dist/src/main/assemblies/bin.xml @@ -29,9 +29,9 @@ - ../ams/optimizer/target/amoro-ams-optimizer-${project.version}-jar-with-dependencies.jar - plugin/optimize - OptimizeJob.jar + ../ams/optimizer/flink-optimizer/target/flink-optimizer-${project.version}-jar-with-dependencies.jar + plugin/optimizer/flink + optimizer-job.jar 0644 diff --git a/docs/admin-guides/managing-optimizers.md b/docs/admin-guides/managing-optimizers.md index 8b56cea1d1..caabbbf763 100644 --- a/docs/admin-guides/managing-optimizers.md +++ b/docs/admin-guides/managing-optimizers.md @@ -25,7 +25,7 @@ Local container is a way to start Optimizer by local process and supports multi- ```yaml containers: - name: localContainer - container-impl: com.netease.arctic.optimizer.LocalOptimizerContainer + container-impl: com.netease.arctic.optimizer.container.LocalOptimizerContainer properties: export.JAVA_HOME: "/opt/java" # JDK environment ``` @@ -33,7 +33,7 @@ containers: ### Flink container Flink container is a way to start Optimizer through Flink jobs. With Flink, you can easily deploy Optimizer on yarn clusters or kubernetes clusters to support large-scale data scenarios. To use flink container, -you need to add a new container configuration. with container-impl as `com.netease.arctic.optimizer.FlinkOptimizerContainer` +you need to add a new container configuration. with container-impl as `com.netease.arctic.optimizer.container.FlinkOptimizerContainer` FlinkOptimizerContainer support the following properties: @@ -64,7 +64,7 @@ An example for yarn-per-job mode: ```yaml containers: - name: flinkContainer - container-impl: com.netease.arctic.optimizer.FlinkOptimizerContainer + container-impl: com.netease.arctic.optimizer.container.FlinkOptimizerContainer properties: flink-home: /opt/flink/ #flink install home export.HADOOP_CONF_DIR: /etc/hadoop/conf/ #hadoop config dir @@ -78,7 +78,7 @@ An example for kubernetes-application mode: ```yaml containers: - name: flinkContainer - container-impl: com.netease.arctic.optimizer.FlinkOptimizerContainer + container-impl: com.netease.arctic.optimizer.container.FlinkOptimizerContainer properties: flink-home: /opt/flink/ #flink install home target: kubernetes-application #flink run as native kubernetes @@ -171,7 +171,7 @@ You can submit optimizer in your own Flink task development platform or local Fl -Dtaskmanager.memory.network.max=32mb \ -Dtaskmanager.memory.network.min=32mb \ -c com.netease.arctic.optimizer.flink.FlinkOptimizer \ - ${ARCTIC_HOME}/plugin/optimize/OptimizeJob.jar \ + ${ARCTIC_HOME}/plugin/flink/optimizer-job.jar \ -a 127.0.0.1:1261 \ -g flinkGroup \ -p 1 \