Skip to content

Commit

Permalink
allow users to update output topics for functions and sources (apach…
Browse files Browse the repository at this point in the history
…e#4092)

* allow users to update output topics for functions and sources
  • Loading branch information
jerrypeng authored Apr 20, 2019
1 parent 1a4095e commit e5fbb89
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
Expand Down Expand Up @@ -398,26 +399,47 @@ private void testE2EPulsarFunction(String jarFilePathUrl) throws Exception {
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
final String propertyKey = "key";
final String propertyValue = "value";
final String functionName = "PulsarSink-test";
final String functionName = "PulsarFunction-test";
final String subscriptionName = "test-sub";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

// create a producer that creates a topic at broker
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic2).subscriptionName("sub").subscribe();

FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
"my.*", sinkTopic, subscriptionName);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

// try to update function to test: update-function functionality
functionConfig.setParallelism(2);
functionConfig.setOutput(sinkTopic2);
admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);

retryStrategically((test) -> {
try {
TopicStats topicStats = admin.topics().getStats(sinkTopic2);
return topicStats.publishers.size() == 2
&& topicStats.publishers.get(0).metadata != null
&& topicStats.publishers.get(0).metadata.containsKey("id")
&& topicStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, functionName));
} catch (PulsarAdminException e) {
return false;
}
}, 50, 150);

TopicStats topicStats = admin.topics().getStats(sinkTopic2);
assertEquals(topicStats.publishers.size(), 2);
assertTrue(topicStats.publishers.get(0).metadata != null);
assertTrue(topicStats.publishers.get(0).metadata.containsKey("id"));
assertEquals(topicStats.publishers.get(0).metadata.get("id"), String.format("%s/%s/%s", tenant, namespacePortion, functionName));

retryStrategically((test) -> {
try {
return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
Expand Down Expand Up @@ -712,12 +734,12 @@ private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String functionName = "PulsarSource-test";
final String sourceName = "PulsarSource-test";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic);
SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);
admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);

retryStrategically((test) -> {
Expand All @@ -728,16 +750,36 @@ private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
}
}, 10, 150);

final String sinkTopic2 = "persistent://" + replNamespace + "/output2";
sourceConfig.setTopicName(sinkTopic2);
admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);

retryStrategically((test) -> {
try {
return (admin.topics().getStats(sinkTopic).publishers.size() == 1) && (admin.topics().getInternalStats(sinkTopic).numberOfEntries > 4);
TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
return sourceStats.publishers.size() == 1
&& sourceStats.publishers.get(0).metadata != null
&& sourceStats.publishers.get(0).metadata.containsKey("id")
&& sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
} catch (PulsarAdminException e) {
return false;
}
}, 50, 150);

TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
assertEquals(sourceStats.publishers.size(), 1);
assertTrue(sourceStats.publishers.get(0).metadata != null);
assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id"));
assertEquals(sourceStats.publishers.get(0).metadata.get("id"), String.format("%s/%s/%s", tenant, namespacePortion, sourceName));

retryStrategically((test) -> {
try {
return (admin.topics().getStats(sinkTopic2).publishers.size() == 1) && (admin.topics().getInternalStats(sinkTopic2).numberOfEntries > 4);
} catch (PulsarAdminException e) {
return false;
}
}, 50, 150);
assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);

String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
log.info("prometheusMetrics: {}", prometheusMetrics);
Expand All @@ -746,65 +788,65 @@ private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
Metric m = metrics.get("pulsar_source_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_received_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_written_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_written_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);
m = metrics.get("pulsar_source_source_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_source_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_system_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_system_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_source_last_invocation");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sourceName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);

// make sure all temp files are deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,9 +657,6 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct
mergedConfig.getInputSpecs().put(topicName, consumerConfig);
});
}
if (!StringUtils.isEmpty(newConfig.getOutput()) && !newConfig.getOutput().equals(existingConfig.getOutput())) {
throw new IllegalArgumentException("Output topics differ");
}
if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
throw new IllegalArgumentException("Output Serde mismatch");
}
Expand All @@ -675,6 +672,9 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct
if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
throw new IllegalArgumentException("Retain Orderning cannot be altered");
}
if (!StringUtils.isEmpty(newConfig.getOutput())) {
mergedConfig.setOutput(newConfig.getOutput());
}
if (newConfig.getUserConfig() != null) {
mergedConfig.setUserConfig(newConfig.getUserConfig());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceCon
if (!StringUtils.isEmpty(newConfig.getClassName())) {
mergedConfig.setClassName(newConfig.getClassName());
}
if (!StringUtils.isEmpty(newConfig.getTopicName()) && !newConfig.getTopicName().equals(existingConfig.getTopicName())) {
throw new IllegalArgumentException("Destination topics differ");
if (!StringUtils.isEmpty(newConfig.getTopicName())) {
mergedConfig.setTopicName(newConfig.getTopicName());
}
if (!StringUtils.isEmpty(newConfig.getSerdeClassName())) {
mergedConfig.setSerdeClassName(newConfig.getSerdeClassName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,6 @@ public void testMergeDifferentInputSpec() {
assertEquals(mergedConfig.getInputSpecs().get("test-input"), newFunctionConfig.getInputSpecs().get("test-input"));
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output topics differ")
public void testMergeDifferentOutput() {
FunctionConfig functionConfig = createFunctionConfig();
FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("output", "Different");
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
}

@Test
public void testMergeDifferentLogTopic() {
FunctionConfig functionConfig = createFunctionConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,6 @@ public void testMergeDifferentClassName() {
);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Destination topics differ")
public void testMergeDifferentOutput() {
SourceConfig sourceConfig = createSourceConfig();
SourceConfig newSourceConfig = createUpdatedSourceConfig("topicName", "Different");
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
public void testMergeDifferentProcessingGuarantees() {
SourceConfig sourceConfig = createSourceConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,30 +837,25 @@ public void testUpdateFunctionChangedParallelism() throws Exception {
}
}

@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Output topics differ")
@Test
public void testUpdateFunctionChangedInputs() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();

testUpdateFunctionMissingArguments(
tenant,
namespace,
function,
null,
topicsToSerDeClassName,
mockedFormData,
"DifferentOutput",
outputSerdeClassName,
null,
parallelism,
"Output topics differ");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
testUpdateFunctionMissingArguments(
tenant,
namespace,
function,
null,
topicsToSerDeClassName,
mockedFormData,
"DifferentOutput",
outputSerdeClassName,
null,
parallelism,
null);
}

@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,30 +812,25 @@ public void testUpdateFunctionChangedParallelism() throws Exception {
}
}

@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Output topics differ")
@Test
public void testUpdateFunctionChangedInputs() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();

testUpdateFunctionMissingArguments(
tenant,
namespace,
function,
null,
topicsToSerDeClassName,
mockedFormData,
"DifferentOutput",
outputSerdeClassName,
null,
parallelism,
"Output topics differ");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
testUpdateFunctionMissingArguments(
tenant,
namespace,
function,
null,
topicsToSerDeClassName,
mockedFormData,
"DifferentOutput",
outputSerdeClassName,
null,
parallelism,
null);
}

@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
Expand Down
Loading

0 comments on commit e5fbb89

Please sign in to comment.