Skip to content

Commit

Permalink
SAMZA-2420: Update CommandLine to use config loader for local config …
Browse files Browse the repository at this point in the history
…file (apache#1256)

Design:
https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner

Changes:
1. Update CommandLine to use config loader instead of config factory
2. Remove common.properties and add its values to each job config
3. Update CheckpointTool to read new offsets from local file only.

API Changes:
1. Add config-loader-factory and config-loader-properties in CommandLine to support specifying ConfigLoaderFactory and its properties needed to load config.
2. Remove config-factory and config-path in CommandLine to discontinue the usage of ConfigFactory
3. Update CheckpointTool to read new offsets from local file only.

Upgrade Instructions:
All usages in CommandLine and its subclasses will switch from --config-factory & --config-path to --config-loader-factory & --config-loader-properties, including job launch.
  • Loading branch information
kw2542 authored Jan 30, 2020
1 parent e6e0f46 commit 1a03a6a
Show file tree
Hide file tree
Showing 19 changed files with 218 additions and 159 deletions.
2 changes: 1 addition & 1 deletion bin/setup-int-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ $KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 1 --repli
# Start the jobs
for job in checker joiner emitter watcher
do
$SAMZA_DIR/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$SAMZA_DIR/config/join/common.properties --config-path=file://$SAMZA_DIR/config/join/$job.samza --config job.foo=$job
$SAMZA_DIR/bin/run-job.sh --config-loader-factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory --config-loader-properties path=$SAMZA_DIR/config/join/$job.samza --config job.foo=$job
done


Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public MapConfig(List<Map<String, String>> maps) {
}
}

/**
 * Creates a config from any number of maps by wrapping the varargs array in a
 * list and delegating to the list-based constructor.
 *
 * @param maps the maps to build this config from; how overlapping keys are
 *             resolved is decided by the list-based constructor (not visible
 *             here — presumably later maps win; confirm against that constructor)
 */
@SafeVarargs
public MapConfig(Map<String, String>... maps) {
this(Arrays.asList(maps));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void sendSetConfigMessage(String key, String value) {
* Main function for using the CoordinatorStreamWriter. The main function starts a CoordinatorStreamWriter
* and sends control messages.
* To run the code use the following command:
* {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh --config-factory={config-factory} --config-path={path to config file of a job} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message}
* {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh --config-loader-factory={config-loader-factory} --config-loader-properties={properties needed for config loader to load config} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message}
*
* @param args input arguments for running the writer. These arguments are:
* "config-factory" = The config file factory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@

import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionSet;

import org.apache.samza.config.MapConfig;
import org.apache.samza.config.Config;
import org.apache.samza.util.CommandLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,8 +35,8 @@ public class StateStorageTool extends CommandLine {
private Logger log = LoggerFactory.getLogger(StateStorageTool.class);

@Override
public MapConfig loadConfig(OptionSet options) {
MapConfig config = super.loadConfig(options);
public Config loadConfig(OptionSet options) {
Config config = super.loadConfig(options);
if (options.has(newPathArgu)) {
newPath = options.valueOf(newPathArgu);
log.info("new state storage is " + newPath);
Expand All @@ -52,7 +51,7 @@ public String getPath() {
public static void main(String[] args) {
StateStorageTool tool = new StateStorageTool();
OptionSet options = tool.parser().parse(args);
MapConfig config = tool.loadConfig(options);
Config config = tool.loadConfig(options);
String path = tool.getPath();

StorageRecovery storageRecovery = new StorageRecovery(config, path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.CommandLine;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.StreamUtil;
Expand All @@ -63,11 +62,11 @@


/**
* Recovers the state storages from the changelog streams and store the storages
* Recovers the state storage from the changelog streams and stores the state
* in the directory provided by the users. The changelog streams are derived
* from the job's config file.
*/
public class StorageRecovery extends CommandLine {
public class StorageRecovery {
private static final Logger LOG = LoggerFactory.getLogger(StorageRecovery.class);

private final Config jobConfig;
Expand Down Expand Up @@ -108,7 +107,7 @@ private void setup() {
}

/**
* run the setup phase and restore all the task storages
* run the setup phase and restore all the task storage
*/
public void run() {
setup();
Expand All @@ -126,9 +125,7 @@ public void run() {
+ " Proceeding with the next container", containerName);
}
});
this.containerStorageManagers.forEach((containerName, containerStorageManager) -> {
containerStorageManager.shutdown();
});
this.containerStorageManagers.forEach((containerName, containerStorageManager) -> containerStorageManager.shutdown());
systemAdmins.stop();

LOG.info("successfully recovered in " + storeBaseDir.toString());
Expand Down Expand Up @@ -169,13 +166,15 @@ private void getChangeLogSystemStreamsAndStorageFactories() {

LOG.info("stream name for " + storeName + " is " + streamName.orElse(null));

if (streamName.isPresent()) {
changeLogSystemStreams.put(storeName, StreamUtil.getSystemStreamFromNames(streamName.get()));
}
streamName.ifPresent(name -> changeLogSystemStreams.put(storeName, StreamUtil.getSystemStreamFromNames(name)));

Optional<String> factoryClass = config.getStorageFactoryClassName(storeName);
if (factoryClass.isPresent()) {
storageEngineFactories.put(storeName, ReflectionUtil.getObj(factoryClass.get(), StorageEngineFactory.class));
@SuppressWarnings("unchecked")
StorageEngineFactory<Object, Object> factory =
(StorageEngineFactory<Object, Object>) ReflectionUtil.getObj(factoryClass.get(), StorageEngineFactory.class);

storageEngineFactories.put(storeName, factory);
} else {
throw new SamzaException("Missing storage factory for " + storeName + ".");
}
Expand Down Expand Up @@ -204,7 +203,8 @@ private Map<String, Serde<Object>> getSerdes() {
.forEach(serdeName -> {
String serdeClassName = serializerConfig.getSerdeFactoryClass(serdeName)
.orElseGet(() -> SerializerConfig.getPredefinedSerdeFactoryName(serdeName));
Serde serde =
@SuppressWarnings("unchecked")
Serde<Object> serde =
ReflectionUtil.getObj(serdeClassName, SerdeFactory.class).getSerde(serdeName, serializerConfig);
serdeMap.put(serdeName, serde);
});
Expand All @@ -216,7 +216,7 @@ private Map<String, Serde<Object>> getSerdes() {
* create one TaskStorageManager for each task. Add all of them to the
* List<TaskStorageManager>
*/
@SuppressWarnings({"unchecked", "rawtypes"})
@SuppressWarnings("rawtypes")
private void getContainerStorageManagers() {
Clock clock = SystemClock.instance();
StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, clock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

package org.apache.samza.checkpoint

import java.io.FileInputStream
import java.net.URI
import java.util
import java.util.function.Supplier
import java.util.Properties
import java.util.regex.Pattern

import joptsimple.ArgumentAcceptingOptionSpec
Expand All @@ -32,7 +33,7 @@ import org.apache.samza.container.TaskName
import org.apache.samza.job.JobRunner.info
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.{CommandLine, ConfigUtil, CoordinatorStreamUtil, Logging, ReflectionUtil, Util}
import org.apache.samza.util.{CommandLine, ConfigUtil, CoordinatorStreamUtil, Logging}
import org.apache.samza.Partition
import org.apache.samza.SamzaException

Expand Down Expand Up @@ -91,7 +92,7 @@ object CheckpointTool {

var newOffsets: TaskNameToCheckpointMap = _

def parseOffsets(propertiesFile: Config): TaskNameToCheckpointMap = {
def parseOffsets(propertiesFile: Properties): TaskNameToCheckpointMap = {
var checkpoints : ListBuffer[(TaskName, Map[SystemStreamPartition, String])] = ListBuffer()
propertiesFile.asScala.foreach { case (key, value) =>
val matcher = SSP_REGEX.matcher(key)
Expand All @@ -117,10 +118,15 @@ object CheckpointTool {
.mapValues(m => m.reduce( _ ++ _)) // Merge all the maps of SSPs->Offset into one for the whole taskname
}

override def loadConfig(options: OptionSet): MapConfig = {
override def loadConfig(options: OptionSet): Config = {
val config = super.loadConfig(options)
if (options.has(newOffsetsOpt)) {
val properties = configFactory.getConfig(options.valueOf(newOffsetsOpt))
val newOffsetsInputStream = new FileInputStream(options.valueOf(newOffsetsOpt).getPath)
val properties = new Properties()

properties.load(newOffsetsInputStream)
newOffsetsInputStream.close()

newOffsets = parseOffsets(properties)
}
config
Expand Down Expand Up @@ -192,12 +198,11 @@ class CheckpointTool(newOffsets: TaskNameToCheckpointMap, coordinatorStreamStore

if (newOffsets != null) {
newOffsets.foreach {
case (taskName: TaskName, offsets: Map[SystemStreamPartition, String]) => {
case (taskName: TaskName, offsets: Map[SystemStreamPartition, String]) =>
logCheckpoint(taskName, offsets, "New offset to be written for task: " + taskName)
val checkpoint = new Checkpoint(offsets.asJava)
checkpointManager.writeCheckpoint(taskName, checkpoint)
info(s"Updated the checkpoint of the task: $taskName to: $offsets")
}
}
}
} finally {
Expand All @@ -207,7 +212,7 @@ class CheckpointTool(newOffsets: TaskNameToCheckpointMap, coordinatorStreamStore
}

def getConfigFromCoordinatorStream(coordinatorStreamStore: CoordinatorStreamStore): Config = {
return CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
}

def logCheckpoint(tn: TaskName, checkpoint: Map[SystemStreamPartition, String], prefix: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,31 @@
package org.apache.samza.coordinator.stream

import org.apache.samza.util.CommandLine
import joptsimple.OptionSet
import joptsimple.{ArgumentAcceptingOptionSpec, OptionSet}

class CoordinatorStreamWriterCommandLine extends CommandLine {

val messageType =
val messageType: ArgumentAcceptingOptionSpec[String] =
parser.accepts("type", "the type of the message being sent.")
.withRequiredArg
.ofType(classOf[java.lang.String])
.describedAs("Required field. This field is the type of the message being sent." +
" The possible values are {\"set-config\"}")


val messageKey =
val messageKey: ArgumentAcceptingOptionSpec[String] =
parser.accepts("key", "the type of the message being sent")
.withRequiredArg
.ofType(classOf[java.lang.String])
.describedAs("key of the message")

val messageValue =
val messageValue: ArgumentAcceptingOptionSpec[String] =
parser.accepts("value", "the type of the message being sent")
.withRequiredArg
.ofType(classOf[java.lang.String])
.describedAs("value of the message")

def loadType(options: OptionSet) = {
def loadType(options: OptionSet): String = {
if (!options.has(messageType)) {
parser.printHelpOn(System.err)
System.exit(-1)
Expand All @@ -60,7 +60,7 @@ class CoordinatorStreamWriterCommandLine extends CommandLine {
}
}

def loadValue(options: OptionSet) = {
def loadValue(options: OptionSet): String = {
var value: java.lang.String = null
if (options.has(messageValue)) {
value = options.valueOf(messageValue)
Expand Down
57 changes: 28 additions & 29 deletions samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,54 @@

package org.apache.samza.util

import java.net.URI
import joptsimple.{OptionParser, OptionSet}
import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser, OptionSet}
import joptsimple.util.KeyValuePair
import org.apache.samza.config.{ConfigFactory, MapConfig}
import org.apache.samza.config.factories.PropertiesConfigFactory
import scala.collection.mutable.Buffer
import org.apache.samza.config.{Config, ConfigLoaderFactory, JobConfig, MapConfig}
import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
* Defines a basic set of command-line options for Samza tasks. Tools can use this
* class directly, or subclass it to add their own options.
*/
class CommandLine {
val parser = new OptionParser()
val configFactoryOpt =
parser.accepts("config-factory", "The config factory to use to read your config file.")
val configLoaderFactoryOpt: ArgumentAcceptingOptionSpec[String] =
parser.accepts("config-loader-factory", "The config loader factory to use to read full job config file.")
.withRequiredArg
.ofType(classOf[java.lang.String])
.describedAs("com.foo.bar.ClassName")
.defaultsTo(classOf[PropertiesConfigFactory].getName)
val configPathOpt =
parser.accepts("config-path", "URI location to a config file (e.g. file:///some/local/path.properties). " +
"If multiple files are given they are all used with later files overriding any values that appear in earlier files.")
.defaultsTo(classOf[PropertiesConfigLoaderFactory].getName)
val configLoaderPropertiesOpt: ArgumentAcceptingOptionSpec[KeyValuePair] =
parser.accepts("config-loader-properties", "A config loader property in the form key=value. Config loader properties will be passed to " +
"designated config loader factory to load full job config.")
.withRequiredArg
.ofType(classOf[URI])
.describedAs("path")
val configOverrideOpt =
.ofType(classOf[KeyValuePair])
.describedAs("key=value")
val configOverrideOpt: ArgumentAcceptingOptionSpec[KeyValuePair] =
parser.accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.")
.withRequiredArg
.ofType(classOf[KeyValuePair])
.describedAs("key=value")

var configFactory: ConfigFactory = null

def loadConfig(options: OptionSet) = {
// Verify legitimate parameters.
if (!options.has(configPathOpt)) {
parser.printHelpOn(System.err)
System.exit(-1)
}
var configLoaderFactory: ConfigLoaderFactory = _

def loadConfig(options: OptionSet): Config = {
// Set up the job parameters.
val configFactoryClassName = options.valueOf(configFactoryOpt)
val configPaths = options.valuesOf(configPathOpt)
configFactory = ReflectionUtil.getObj(configFactoryClassName, classOf[ConfigFactory])
val configOverrides = options.valuesOf(configOverrideOpt).asScala.map(kv => (kv.key, kv.value)).toMap
val configLoaderFactoryClassName = options.valueOf(configLoaderFactoryOpt)
val configLoaderProperties = options.valuesOf(configLoaderPropertiesOpt).asScala
.map(kv => (ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + kv.key, kv.value))
.toMap
val configOverrides = options.valuesOf(configOverrideOpt).asScala
.map(kv => (kv.key, kv.value))
.toMap
val original = mutable.HashMap[String, String]()
original += JobConfig.CONFIG_LOADER_FACTORY -> configLoaderFactoryClassName
original ++= configLoaderProperties
original ++= configOverrides

val configs: Buffer[java.util.Map[String, String]] = configPaths.asScala.map(configFactory.getConfig)
configs += configOverrides.asJava
new MapConfig(configs.asJava)
ConfigUtil.loadConfig(new MapConfig(original.asJava))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public class TestApplicationRunnerMain {
public void TestRunOperation() throws Exception {
assertEquals(0, TestApplicationRunnerInvocationCounts.runCount);
ApplicationRunnerMain.main(new String[]{
"--config-factory",
"org.apache.samza.config.factories.PropertiesConfigFactory",
"--config-path",
getClass().getResource("/test.properties").getPath(),
"--config-loader-factory",
"org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
"--config-loader-properties",
"path=" + getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
"-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
});
Expand All @@ -51,10 +51,10 @@ public void TestRunOperation() throws Exception {
public void TestKillOperation() throws Exception {
assertEquals(0, TestApplicationRunnerInvocationCounts.killCount);
ApplicationRunnerMain.main(new String[]{
"--config-factory",
"org.apache.samza.config.factories.PropertiesConfigFactory",
"--config-path",
getClass().getResource("/test.properties").getPath(),
"--config-loader-factory",
"org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
"--config-loader-properties",
"path=" + getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
"-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
"--operation=kill"
Expand All @@ -67,10 +67,10 @@ public void TestKillOperation() throws Exception {
public void TestStatusOperation() throws Exception {
assertEquals(0, TestApplicationRunnerInvocationCounts.statusCount);
ApplicationRunnerMain.main(new String[]{
"--config-factory",
"org.apache.samza.config.factories.PropertiesConfigFactory",
"--config-path",
getClass().getResource("/test.properties").getPath(),
"--config-loader-factory",
"org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
"--config-loader-properties",
"path=" + getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
"-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
"--operation=status"
Expand Down
Loading

0 comments on commit 1a03a6a

Please sign in to comment.