Skip to content

Commit

Permalink
Added an explicit field in the function details for componenttype (ap…
Browse files Browse the repository at this point in the history
…ache#4250)

* Added an explicit field in the function details for componenttype

* Fixed unittests

* Updated the defn of python pb file

* Added licence

* Took feedback into account

* Added unittest
  • Loading branch information
srkukarni authored May 15, 2019
1 parent 6b2eaa8 commit ad4c9f3
Show file tree
Hide file tree
Showing 22 changed files with 500 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
Expand Down Expand Up @@ -91,11 +91,11 @@ class ContextImpl implements Context, SinkContext, SourceContext {
userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric";
}

private final ComponentType componentType;
private final Function.FunctionDetails.ComponentType componentType;

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
ComponentType componentType, ComponentStatsManager statsManager) {
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
import static org.apache.pulsar.functions.utils.ComponentType.SINK;
import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;

import lombok.experimental.UtilityClass;

Expand All @@ -31,7 +28,6 @@
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.Reflections;

import net.jodah.typetools.TypeResolver;
Expand Down Expand Up @@ -89,23 +85,26 @@ private static <T> T createInstance(String className, ClassLoader clsLoader, Cla
}
}

public ComponentType calculateSubjectType(Function.FunctionDetails functionDetails) {
public Function.FunctionDetails.ComponentType calculateSubjectType(Function.FunctionDetails functionDetails) {
if (functionDetails.getComponentType() != Function.FunctionDetails.ComponentType.UNKNOWN) {
return functionDetails.getComponentType();
}
Function.SourceSpec sourceSpec = functionDetails.getSource();
Function.SinkSpec sinkSpec = functionDetails.getSink();
if (sourceSpec.getInputSpecsCount() == 0) {
return SOURCE;
return Function.FunctionDetails.ComponentType.SOURCE;
}
// Now its between sink and function

if (!isEmpty(sinkSpec.getBuiltin())) {
// if its built in, its a sink
return SINK;
return Function.FunctionDetails.ComponentType.SINK;
}

if (isEmpty(sinkSpec.getClassName()) || sinkSpec.getClassName().equals(PulsarSink.class.getName())) {
return FUNCTION;
return Function.FunctionDetails.ComponentType.FUNCTION;
}
return SINK;
return Function.FunctionDetails.ComponentType.SINK;
}

public static String getDefaultSubscriptionName(String tenant, String namespace, String name) {
Expand All @@ -119,7 +118,7 @@ public static String getDefaultSubscriptionName(Function.FunctionDetails functio
functionDetails.getName());
}

public static Map<String, String> getProperties(ComponentType componentType,
public static Map<String, String> getProperties(Function.FunctionDetails.ComponentType componentType,
String fullyQualifiedName, int instanceId) {
Map<String, String> properties = new HashMap<>();
switch (componentType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
Expand Down Expand Up @@ -129,7 +128,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

private InstanceCache instanceCache;

private final ComponentType componentType;
private final org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType componentType;

private final Map<String, String> properties;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@

import com.google.common.collect.EvictingQueue;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentType;
import org.apache.pulsar.functions.proto.Function;

import java.io.IOException;
import java.io.StringWriter;
Expand Down Expand Up @@ -58,7 +57,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService,
ComponentType componentType) {
Function.FunctionDetails.ComponentType componentType) {
switch (componentType) {
case FUNCTION:
return new FunctionStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService);
Expand Down
Loading

0 comments on commit ad4c9f3

Please sign in to comment.