Skip to content

Commit

Permalink
Standardize on input/output terminology for Pulsar Functions (apache#…
Browse files Browse the repository at this point in the history
…1378)

* fix usage of 'sink' terminology

* fix usages of 'source'

* fix additional usages
  • Loading branch information
lucperkins authored and merlimat committed Mar 15, 2018
1 parent cad2039 commit dc34ab6
Show file tree
Hide file tree
Showing 19 changed files with 243 additions and 243 deletions.
4 changes: 2 additions & 2 deletions pulsar-client-cpp/python/functions/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ def publish(self, topic_name, message):
pass

@abstractmethod
def get_sink_topic(self):
'''Returns the sink topic of function'''
def get_output_topic(self):
'''Returns the output topic of function'''
pass

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ public void testLocalRunnerCmdYaml() throws Exception {
@Test
public void testCreateFunction() throws Exception {
String fnName = TEST_NAME + "-function";
String sourceTopicName = TEST_NAME + "-source-topic";
String sinkTopicName = TEST_NAME + "-sink-topic";
String inputTopicName = TEST_NAME + "-input-topic";
String outputTopicName = TEST_NAME + "-output-topic";
cmd.run(new String[] {
"create",
"--name", fnName,
"--inputs", sourceTopicName,
"--output", sinkTopicName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
Expand All @@ -185,8 +185,8 @@ public void testCreateFunction() throws Exception {

CreateFunction creater = cmd.getCreater();
assertEquals(fnName, creater.getFunctionName());
assertEquals(sourceTopicName, creater.getInputs());
assertEquals(sinkTopicName, creater.getOutput());
assertEquals(inputTopicName, creater.getInputs());
assertEquals(outputTopicName, creater.getOutput());

verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());

Expand All @@ -195,13 +195,13 @@ public void testCreateFunction() throws Exception {
@Test
public void testCreateWithoutTenant() throws Exception {
String fnName = TEST_NAME + "-function";
String sourceTopicName = "persistent://tenant/standalone/namespace/source-topic";
String sinkTopicName = "persistent://tenant/standalone/namespace/sink-topic";
String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
cmd.run(new String[] {
"create",
"--name", fnName,
"--inputs", sourceTopicName,
"--output", sinkTopicName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
Expand All @@ -215,13 +215,13 @@ public void testCreateWithoutTenant() throws Exception {
@Test
public void testCreateWithoutNamespace() throws Exception {
String fnName = TEST_NAME + "-function";
String sourceTopicName = "persistent://tenant/standalone/namespace/source-topic";
String sinkTopicName = "persistent://tenant/standalone/namespace/sink-topic";
String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
cmd.run(new String[] {
"create",
"--name", fnName,
"--inputs", sourceTopicName,
"--output", sinkTopicName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--className", DummyFunction.class.getName(),
});
Expand All @@ -234,12 +234,12 @@ public void testCreateWithoutNamespace() throws Exception {

@Test
public void testCreateWithoutFunctionName() throws Exception {
String sourceTopicName = TEST_NAME + "-source-topic";
String sinkTopicName = TEST_NAME + "-sink-topic";
String inputTopicName = TEST_NAME + "-input-topic";
String outputTopicName = TEST_NAME + "-output-topic";
cmd.run(new String[] {
"create",
"--inputs", sourceTopicName,
"--output", sinkTopicName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
Expand All @@ -252,19 +252,19 @@ public void testCreateWithoutFunctionName() throws Exception {
}

@Test
public void testCreateWithoutSinkTopic() throws Exception {
String sourceTopicName = TEST_NAME + "-source-topic";
public void testCreateWithoutOutputTopic() throws Exception {
String inputTopicName = TEST_NAME + "-input-topic";
cmd.run(new String[] {
"create",
"--inputs", sourceTopicName,
"--inputs", inputTopicName,
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();
assertEquals(sourceTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput());
assertEquals(inputTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput());
verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
}

Expand Down Expand Up @@ -313,16 +313,16 @@ public void testDeleteFunction() throws Exception {
@Test
public void testUpdateFunction() throws Exception {
String fnName = TEST_NAME + "-function";
String sourceTopicName = TEST_NAME + "-source-topic";
String sinkTopicName = TEST_NAME + "-sink-topic";
String inputTopicName = TEST_NAME + "-input-topic";
String outputTopicName = TEST_NAME + "-output-topic";



cmd.run(new String[] {
"update",
"--name", fnName,
"--inputs", sourceTopicName,
"--output", sinkTopicName,
"--inputs", inputTopicName,
"--output", outputTopicName,
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
Expand All @@ -331,8 +331,8 @@ public void testUpdateFunction() throws Exception {

UpdateFunction updater = cmd.getUpdater();
assertEquals(fnName, updater.getFunctionName());
assertEquals(sourceTopicName, updater.getInputs());
assertEquals(sinkTopicName, updater.getOutput());
assertEquals(inputTopicName, updater.getInputs());
assertEquals(outputTopicName, updater.getOutput());

verify(functions, times(1)).updateFunction(any(FunctionConfig.class), anyString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ public interface Context {
String getTopicName();

/**
* Get a list of all source topics
* @return a list of all source topics
* Get a list of all input topics
* @return a list of all input topics
*/
Collection<String> getSourceTopics();
Collection<String> getInputTopics();

/**
* Get sink topic of function
* @return sink topic name
* Get the output topic of the function
* @return output topic name
*/
String getSinkTopic();
String getOutputTopic();

/**
* Get output Serde class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,21 @@ public void update(double value) {
private ProducerConfiguration producerConfiguration;
private PulsarClient pulsarClient;
private ClassLoader classLoader;
private Map<String, Consumer> sourceConsumers;
private Map<String, Consumer> inputConsumers;
@Getter
@Setter
private StateContextImpl stateContext;

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
ClassLoader classLoader, Map<String, Consumer> sourceConsumers) {
ClassLoader classLoader, Map<String, Consumer> inputConsumers) {
this.config = config;
this.logger = logger;
this.pulsarClient = client;
this.classLoader = classLoader;
this.accumulatedMetrics = new ConcurrentHashMap<>();
this.publishProducers = new HashMap<>();
this.publishSerializers = new HashMap<>();
this.sourceConsumers = sourceConsumers;
this.inputConsumers = inputConsumers;
producerConfiguration = new ProducerConfiguration();
producerConfiguration.setBlockIfQueueFull(true);
producerConfiguration.setBatchingEnabled(true);
Expand All @@ -124,12 +124,12 @@ public String getTopicName() {
}

@Override
public Collection<String> getSourceTopics() {
return sourceConsumers.keySet();
public Collection<String> getInputTopics() {
return inputConsumers.keySet();
}

@Override
public String getSinkTopic() {
public String getOutputTopic() {
return config.getFunctionConfig().getOutput();
}

Expand Down Expand Up @@ -236,7 +236,7 @@ public <O> CompletableFuture<Void> publish(String topicName, O object, String se

@Override
public CompletableFuture<Void> ack(byte[] messageId, String topic) {
if (!sourceConsumers.containsKey(topic)) {
if (!inputConsumers.containsKey(topic)) {
throw new RuntimeException("No such input topic " + topic);
}

Expand All @@ -246,7 +246,7 @@ public CompletableFuture<Void> ack(byte[] messageId, String topic) {
} catch (IOException e) {
throw new RuntimeException("Invalid message id to ack", e);
}
return sourceConsumers.get(topic).acknowledgeAsync(actualMessageId);
return inputConsumers.get(topic).acknowledgeAsync(actualMessageId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public class JavaInstance implements AutoCloseable {
public JavaInstance(InstanceConfig config, Object userClassObject,
ClassLoader clsLoader,
PulsarClient pulsarClient,
Map<String, Consumer> sourceConsumers) {
Map<String, Consumer> inputConsumers) {
// TODO: cache logger instances by functions?
Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionConfig().getName());

this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, sourceConsumers);
this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, inputConsumers);

// create the functions
if (userClassObject instanceof Function) {
Expand Down
Loading

0 comments on commit dc34ab6

Please sign in to comment.