diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/AuthenticationConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/AuthenticationConfig.java index 7543635b0a5e9..017747458f3e9 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/AuthenticationConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/AuthenticationConfig.java @@ -18,14 +18,18 @@ */ package org.apache.pulsar.common.functions; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; /** * Configuration to aggregate various authentication params. */ @Data @Builder +@AllArgsConstructor +@NoArgsConstructor public class AuthenticationConfig { private String clientAuthenticationPlugin; private String clientAuthenticationParameters; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index 8a83b378a8765..527f9b854e43b 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -95,7 +95,6 @@ public enum Runtime { private String batchBuilder; private Boolean forwardSourceMessageProperty; private Map userConfig; - private Map externalPulsars; // This is a map of secretName(aka how the secret is going to be // accessed in the function via context) to an object that // encapsulates how the secret is fetched by the underlying diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 95e30022b267b..722aa6b1d821f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -52,7 +52,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.common.functions.ConsumerConfig; -import org.apache.pulsar.common.functions.ExternalPulsarConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.functions.Resources; @@ -322,8 +321,6 @@ abstract class FunctionDetailsCommand extends BaseCommand { protected String customRuntimeOptions; @Parameter(names = "--dead-letter-topic", description = "The topic where messages that are not processed successfully are sent to") protected String deadLetterTopic; - @Parameter(names = "--external-pulsars", description = "The map of external pulsar cluster name to its configuration (as a JSON string)") - protected String externalPulsars; protected FunctionConfig functionConfig; protected String userCodeFile; @@ -402,11 +399,6 @@ void processArguments() throws Exception { if (null != output) { functionConfig.setOutput(output); } - if (null != externalPulsars) { - Type type = new TypeToken>() { - }.getType(); - functionConfig.setExternalPulsars(new Gson().fromJson(externalPulsars, type)); - } if (null != producerConfig) { Type type = new TypeToken() {}.getType(); functionConfig.setProducerConfig(new Gson().fromJson(producerConfig, type)); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 8e30854d7c8a5..fd183dfcab76d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -78,7 +78,7 @@ public static TopicName get(String domain, String tenant, String namespace, Stri } public static TopicName get(String domain, String tenant, String cluster, String namespace, - String topic) { + String topic) { String name = domain + "://" + tenant + '/' + cluster + '/' + namespace + '/' + topic; return TopicName.get(name); } diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index 856b1b0ca44f3..5b5b5f185754c 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -271,14 +271,6 @@ public interface Context { */ PulsarAdmin getPulsarAdmin(); - /** - * Get the pulsar admin client by cluster name. - * - * @param clusterName The name of the cluster name for pulsar admin client - * @return The instance of pulsar admin client - */ - PulsarAdmin getPulsarAdmin(String clusterName); - /** * Record a user defined metric. * @@ -320,18 +312,6 @@ public interface Context { */ TypedMessageBuilder newOutputMessage(String topicName, Schema schema) throws PulsarClientException; - /** - * New output message using schema for serializing to the topic in the cluster - * - * @param clusterName the name of the cluster for topic - * @param topicName The name of the topic for output message - * @param schema provide a way to convert between serialized data and domain objects - * @param - * @return the message builder instance - * @throws PulsarClientException - */ - TypedMessageBuilder newOutputMessage(String clusterName, String topicName, Schema schema) throws PulsarClientException; - /** * Create a ConsumerBuilder with the schema. * @@ -341,4 +321,4 @@ public interface Context { * @throws PulsarClientException */ ConsumerBuilder newConsumerBuilder(Schema schema) throws PulsarClientException; -} +} \ No newline at end of file diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java index 79aab6c864a6d..1d7f23ac682bc 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java @@ -18,12 +18,11 @@ */ package org.apache.pulsar.functions.api; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Schema; - import java.util.Collections; import java.util.Map; import java.util.Optional; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ExternalPulsarConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java similarity index 64% rename from pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ExternalPulsarConfig.java rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java index d6be5df0479ec..a3461cc3e8b52 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ExternalPulsarConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java @@ -16,27 +16,21 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.pulsar.functions.instance; -package org.apache.pulsar.common.functions; - -import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; /** - * Configuration of extra pulsar clusters to sent output message. + * Configuration to aggregate various authentication params. */ @Data @Builder -@NoArgsConstructor -@AllArgsConstructor -@EqualsAndHashCode -public class ExternalPulsarConfig { - private String name; - private String serviceURL; - private String webServiceURL; - private AuthenticationConfig authConfig; - private ProducerConfig producerConfig; +public class AuthenticationConfig { + private String clientAuthenticationPlugin; + private String clientAuthenticationParameters; + private String tlsTrustCertsFilePath; + private boolean useTls; + private boolean tlsAllowInsecureConnection; + private boolean tlsHostnameVerificationEnable; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index eb9f541694572..fad49f2b35552 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -54,8 +53,6 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.ProducerBuilderImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; -import org.apache.pulsar.common.functions.ExternalPulsarConfig; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.api.Context; @@ -71,8 +68,8 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.secretsprovider.SecretsProvider; +import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.functions.utils.FunctionCommon; -import org.apache.pulsar.functions.utils.ProducerConfigUtils; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; import org.slf4j.Logger; @@ -83,7 +80,7 @@ /** * This class implements the Context interface exposed to the user. */ -@ToString +@ToString(exclude = {"pulsarAdmin"}) class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable { private InstanceConfig config; private Logger logger; @@ -91,10 +88,13 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable // Per Message related private Record record; - @VisibleForTesting - private String defaultPulsarCluster; - @VisibleForTesting - private Map externalPulsarClusters; + private final PulsarClient client; + private final PulsarAdmin pulsarAdmin; + private Map> publishProducers; + private ThreadLocal>> tlPublishProducers; + private ProducerBuilderImpl producerBuilder; + + private final TopicSchema topicSchema; private final SecretsProvider secretsProvider; private final Map secretsMap; @@ -135,26 +135,28 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, StateManager stateManager, PulsarAdmin pulsarAdmin) { this.config = config; this.logger = logger; + this.client = client; + this.pulsarAdmin = pulsarAdmin; + this.topicSchema = new TopicSchema(client); this.statsManager = statsManager; - this.externalPulsarClusters = new HashMap<>(); - if (!config.getFunctionDetails().getExternalPulsarsMap().isEmpty()) { - Map externalPulsarConfig = new Gson().fromJson(config.getFunctionDetails().getExternalPulsarsMap(), - new TypeToken>() { - }.getType()); - for (Map.Entry entry : externalPulsarConfig.entrySet()) { - try { - this.externalPulsarClusters.put(entry.getKey(), - new PulsarCluster(InstanceUtils.createPulsarClient(entry.getValue().getServiceURL(), entry.getValue().getAuthConfig()), - config.isExposePulsarAdminClientEnabled() ? InstanceUtils.createPulsarAdminClient(entry.getValue().getWebServiceURL(), entry.getValue().getAuthConfig()) : null, - ProducerConfigUtils.convert(entry.getValue().getProducerConfig()))); - } catch (PulsarClientException ex) { - throw new RuntimeException("failed to create pulsar client for external cluster: " + entry.getKey(), ex); - } + this.producerBuilder = (ProducerBuilderImpl) client.newProducer().blockIfQueueFull(true).enableBatching(true) + .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS); + boolean useThreadLocalProducers = false; + if (config.getFunctionDetails().getSink().getProducerSpec() != null) { + if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) { + this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages()); + } + if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) { + this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions()); } + useThreadLocalProducers = config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers(); + } + if (useThreadLocalProducers) { + tlPublishProducers = new ThreadLocal<>(); + } else { + publishProducers = new HashMap<>(); } - this.defaultPulsarCluster = "default-" + UUID.randomUUID(); - this.externalPulsarClusters.put(defaultPulsarCluster, new PulsarCluster(client, config.isExposePulsarAdminClientEnabled() ? pulsarAdmin : null, config.getFunctionDetails().getSink().getProducerSpec())); if (config.getFunctionDetails().getUserConfig().isEmpty()) { userConfigs = new HashMap<>(); @@ -339,19 +341,8 @@ public String getSecret(String secretName) { @Override public PulsarAdmin getPulsarAdmin() { - return getPulsarAdmin(defaultPulsarCluster); - } - - @Override - public PulsarAdmin getPulsarAdmin(String clusterName) { if (exposePulsarAdminClientEnabled) { - PulsarCluster pulsarCluster = externalPulsarClusters.get(clusterName); - if (pulsarCluster != null) { - return pulsarCluster.getAdminClient(); - } else { - throw new IllegalArgumentException("PulsarAdmin for cluster " + clusterName + " is not available, only " - + externalPulsarClusters.keySet()); - } + return pulsarAdmin; } else { throw new IllegalStateException("PulsarAdmin is not enabled in function worker"); } @@ -445,19 +436,14 @@ public CompletableFuture publish(String topicName, O object) { @SuppressWarnings("unchecked") @Override public CompletableFuture publish(String topicName, O object, String schemaOrSerdeClassName) { - return publish(topicName, object, (Schema) externalPulsarClusters.get(defaultPulsarCluster).getTopicSchema().getSchema(topicName, object, schemaOrSerdeClassName, false)); + return publish(topicName, object, (Schema) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false)); } @Override public TypedMessageBuilder newOutputMessage(String topicName, Schema schema) throws PulsarClientException { - return newOutputMessage(defaultPulsarCluster, topicName, schema); - } - - @Override - public TypedMessageBuilder newOutputMessage(String pulsarName, String topicName, Schema schema) throws PulsarClientException { MessageBuilderImpl messageBuilder = new MessageBuilderImpl<>(); TypedMessageBuilder typedMessageBuilder; - Producer producer = getProducer(pulsarName, topicName, schema); + Producer producer = getProducer(topicName, schema); if (schema != null) { typedMessageBuilder = producer.newMessage(schema); } else { @@ -469,7 +455,7 @@ public TypedMessageBuilder newOutputMessage(String pulsarName, String top @Override public ConsumerBuilder newConsumerBuilder(Schema schema) throws PulsarClientException { - return this.externalPulsarClusters.get(defaultPulsarCluster).getClient().newConsumer(schema); + return this.client.newConsumer(schema); } @Override @@ -478,12 +464,8 @@ public SubscriptionType getSubscriptionType() { } public CompletableFuture publish(String topicName, O object, Schema schema) { - return publish(defaultPulsarCluster, topicName, object, schema); - } - - public CompletableFuture publish(String pulsarName, String topicName, O object, Schema schema) { try { - return newOutputMessage(pulsarName, topicName, schema).value(object).sendAsync().thenApply(msgId -> null); + return newOutputMessage(topicName, schema).value(object).sendAsync().thenApply(msgId -> null); } catch (PulsarClientException e) { logger.error("Failed to create Producer while doing user publish", e); return FutureUtil.failedFuture(e); @@ -505,23 +487,22 @@ public void recordMetric(String metricName, double value) { } } - private Producer getProducer(String pulsarName, String topicName, Schema schema) throws PulsarClientException { + private Producer getProducer(String topicName, Schema schema) throws PulsarClientException { Producer producer; - PulsarCluster pulsar = externalPulsarClusters.get(pulsarName); - if (pulsar.getTlPublishProducers() != null) { - Map> producerMap = pulsar.getTlPublishProducers().get(); + if (tlPublishProducers != null) { + Map> producerMap = tlPublishProducers.get(); if (producerMap == null) { producerMap = new HashMap<>(); - pulsar.getTlPublishProducers().set(producerMap); + tlPublishProducers.set(producerMap); } producer = (Producer) producerMap.get(topicName); } else { - producer = (Producer) pulsar.getPublishProducers().get(topicName); + producer = (Producer) publishProducers.get(topicName); } if (producer == null) { - Producer newProducer = ((ProducerBuilderImpl) pulsar.getProducerBuilder().clone()) + Producer newProducer = ((ProducerBuilderImpl) producerBuilder.clone()) .schema(schema) .blockIfQueueFull(true) .enableBatching(true) @@ -542,10 +523,10 @@ private Producer getProducer(String pulsarName, String topicName, Schema< this.config.getInstanceId())) .create(); - if (pulsar.getTlPublishProducers() != null) { - pulsar.getTlPublishProducers().get().put(topicName, newProducer); + if (tlPublishProducers != null) { + tlPublishProducers.get().put(topicName, newProducer); } else { - Producer existingProducer = (Producer) pulsar.getPublishProducers().putIfAbsent(topicName, newProducer); + Producer existingProducer = (Producer) publishProducers.putIfAbsent(topicName, newProducer); if (existingProducer != null) { // The value in the map was not updated after the concurrent put @@ -695,23 +676,20 @@ public void setUnderlyingBuilder(TypedMessageBuilder underlyingBuilder) { public void close() { List futures = new LinkedList<>(); - for (Map.Entry pulsarEntry : externalPulsarClusters.entrySet()) { - PulsarCluster pulsar = pulsarEntry.getValue(); - if (pulsar.getPublishProducers() != null) { - for (Producer producer : pulsar.getPublishProducers().values()) { - futures.add(producer.closeAsync()); - } + if (publishProducers != null) { + for (Producer producer : publishProducers.values()) { + futures.add(producer.closeAsync()); } + } - if (pulsar.getTlPublishProducers() != null) { - for (Producer producer : pulsar.getTlPublishProducers().get().values()) { - futures.add(producer.closeAsync()); - } + if (tlPublishProducers != null) { + for (Producer producer : tlPublishProducers.get().values()) { + futures.add(producer.closeAsync()); } + } - if (exposePulsarAdminClientEnabled && pulsar.getAdminClient() != null) { - pulsar.getAdminClient().close(); - } + if (pulsarAdmin != null) { + pulsarAdmin.close(); } try { @@ -795,5 +773,4 @@ Consumer getConsumer(String topic, int partition) throws PulsarClientExceptio throw new PulsarClientException("Consumer for topic " + topic + " partition " + partition + " is not found"); } - } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java index aeeaf6c2be64c..40a2b1cf24ab6 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java @@ -19,7 +19,6 @@ package org.apache.pulsar.functions.instance; import com.google.common.annotations.VisibleForTesting; - import java.time.Clock; import java.util.concurrent.ThreadLocalRandom; import org.apache.pulsar.client.api.HashingScheme; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java index a7f88ce4f10c4..5501a316516aa 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SizeUnit; -import org.apache.pulsar.common.functions.AuthenticationConfig; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.sink.PulsarSink; @@ -157,7 +156,9 @@ public static Map getProperties(Function.FunctionDetails.Compone return properties; } - public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig) throws PulsarClientException { + + public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig) + throws PulsarClientException { return createPulsarClient(pulsarServiceUrl, authConfig, Optional.empty()); } @@ -188,7 +189,8 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) { return null; } - public static PulsarAdmin createPulsarAdminClient(String pulsarWebServiceUrl, AuthenticationConfig authConfig) throws PulsarClientException { + public static PulsarAdmin createPulsarAdminClient(String pulsarWebServiceUrl, AuthenticationConfig authConfig) + throws PulsarClientException { PulsarAdminBuilder pulsarAdminBuilder = null; if (isNotBlank(pulsarWebServiceUrl)) { pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 741a817493581..83b64dc6ad32c 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -31,7 +31,6 @@ import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; - import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java index 13ab7a42d259c..83b3689e80fa9 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java @@ -20,10 +20,8 @@ import java.util.Map; import java.util.Optional; - import lombok.AllArgsConstructor; import lombok.Data; - import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java index d822aa06c0a39..be5c479625240 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java @@ -19,17 +19,15 @@ package org.apache.pulsar.functions.instance.stats; import com.google.common.collect.EvictingQueue; -import io.prometheus.client.CollectorRegistry; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.proto.Function; - import java.io.IOException; import java.io.StringWriter; import java.util.Arrays; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.InstanceCommunication; @Slf4j public abstract class ComponentStatsManager implements AutoCloseable { @@ -70,8 +68,8 @@ public static ComponentStatsManager getStatsManager(FunctionCollectorRegistry co } public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry, - String[] metricsLabels, - ScheduledExecutorService scheduledExecutorService) { + String[] metricsLabels, + ScheduledExecutorService scheduledExecutorService) { this.collectorRegistry = collectorRegistry; this.metricsLabels = metricsLabels; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 80d82a12df60e..552aa97060031 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -18,9 +18,19 @@ */ package org.apache.pulsar.functions.sink; +import static org.apache.commons.lang.StringUtils.isEmpty; import com.google.common.annotations.VisibleForTesting; - import java.nio.charset.StandardCharsets; +import java.security.Security; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; import lombok.Builder; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -47,31 +57,18 @@ import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.FunctionResultRouter; import org.apache.pulsar.functions.instance.SinkRecord; import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.source.TopicSchema; -import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.utils.CryptoUtils; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.SinkContext; import org.bouncycastle.jce.provider.BouncyCastleProvider; -import java.security.Security; -import java.util.ArrayList; -import java.util.Base64; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import static org.apache.commons.lang.StringUtils.isEmpty; - @Slf4j public class PulsarSink implements Sink { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java index 2999a04bb59c6..40a53c3f3708d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java @@ -18,14 +18,13 @@ */ package org.apache.pulsar.functions.sink; +import java.util.Map; import lombok.Getter; import lombok.Setter; import lombok.ToString; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.ProducerConfig; -import java.util.Map; - @Getter @Setter @ToString diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java index 35ad82e8b97f9..e53e7fe9ace80 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java @@ -20,12 +20,10 @@ import java.util.Map; import java.util.Optional; - import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; - import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java index 5ca78e29a8de6..58557c9150e09 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java @@ -19,7 +19,6 @@ package org.apache.pulsar.functions.source; import java.util.Optional; - import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.functions.api.Record; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index 7ed6b6cd7245b..5b7aed2f0d4b5 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; - import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClient; diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index 9a228bd996fd8..5bb61fd7bd2af 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -189,11 +189,6 @@ public void testGetPulsarAdmin() throws Exception { assertEquals(context.getPulsarAdmin(), pulsarAdmin); } - @Test(expectedExceptions = IllegalArgumentException.class) - public void testGetPulsarAdminWithNonExistClusterName() { - assertNull(context.getPulsarAdmin("foo")); - } - @Test(expectedExceptions = IllegalStateException.class) public void testGetPulsarAdminWithExposePulsarAdminDisabled() { config.setExposePulsarAdminClientEnabled(false); diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishExternalFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishExternalFunction.java deleted file mode 100644 index 962322c4765d8..0000000000000 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishExternalFunction.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.functions.api.examples; - -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.Function; - -/** - * Example function that uses the built in publish function in the context - * to publish to a desired cluster & topic based on config. - */ -public class PublishExternalFunction implements Function { - @Override - public Void process(String input, Context context) { - // Ensure the desired cluster `external` config is provided when creating the function - String externalCluster = "external"; - String publishTopic = (String) context.getUserConfigValueOrDefault("external-topic", "default-external-topic"); - String output = String.format("%s!", input); - try { - context.newOutputMessage(externalCluster, publishTopic, Schema.STRING).value(output).sendAsync(); - } catch (PulsarClientException e) { - context.getLogger().error(e.toString()); - } - return null; - } -} diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index fb6c626cfeb89..d672b4bc2d2eb 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -52,7 +52,7 @@ import lombok.Builder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.io.SinkConfig; @@ -69,6 +69,7 @@ import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; +import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; import org.apache.pulsar.functions.secretsproviderconfigurator.NameAndConfigBasedSecretsProviderConfigurator; diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 77396eb017059..4b840a3daadfd 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -88,7 +88,6 @@ message FunctionDetails { bool retainOrdering = 21; bool retainKeyOrdering = 22; SubscriptionPosition subscriptionPosition = 23; - string externalPulsarsMap = 24; } message ConsumerSpec { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java index 929affd5738dd..5a0bd79e06020 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java @@ -20,7 +20,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.client.impl.auth.AuthenticationToken; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.Function; import java.util.Optional; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java index 08ea6ab934bf2..00a8ee9531510 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java @@ -19,7 +19,7 @@ package org.apache.pulsar.functions.auth; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.common.util.Reflections; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java index 4e222e6842270..214ebf7ca1e80 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1DeleteOptions; import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1PodSpec; import io.kubernetes.client.openapi.models.V1Secret; @@ -32,7 +33,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.client.impl.auth.AuthenticationToken; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.utils.Actions; import org.apache.pulsar.functions.utils.FunctionCommon; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java index 170e6060dd41d..1881b5556008d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java @@ -34,7 +34,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.nar.NarClassLoader; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceCache; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java index 0f034d096472d..f620951c8e9f1 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java @@ -20,7 +20,7 @@ package org.apache.pulsar.functions.runtime; import org.apache.pulsar.functions.auth.FunctionAuthProvider; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 8c359a90623f7..60ba6b339de3c 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -44,7 +44,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.go.GoInstanceConfig; import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index a463738ff9190..a483bd07fbd00 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -57,7 +57,7 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.proto.Function; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java index e1b7970158f4b..2a47da66ae742 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java @@ -36,7 +36,7 @@ import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.functions.auth.FunctionAuthProvider; import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.runtime.RuntimeCustomizer; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java index 3aaaa21258ed8..7d585bcd77b22 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java @@ -30,7 +30,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceCache; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -51,6 +51,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * A function container implemented using java thread. diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java index aaa000462b998..b8338fd392f93 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java @@ -27,11 +27,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.auth.FunctionAuthProvider; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.runtime.RuntimeCustomizer; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeUtils; +import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry; import org.apache.pulsar.functions.worker.ConnectorsManager; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java index ad4d40064c8af..e0a9c6472264d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java @@ -173,6 +173,7 @@ public void start() throws Exception { secretsProvider, collectorRegistry, functionClassLoader); + log.info("ThreadContainer starting function with instance config {}", instanceConfig); this.fnThread = new Thread(threadGroup, javaInstanceRunnable, String.format("%s-%s", diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java index d1f450c958141..c020bf9ce9ec9 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java @@ -26,10 +26,11 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.auth.FunctionAuthProvider; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceCache; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.InstanceUtils; @@ -47,6 +48,8 @@ import java.util.Optional; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + /** * Thread based function container factory implementation. */ diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java index 3f3afddba2140..924653fec1189 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java @@ -20,7 +20,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.client.impl.auth.AuthenticationToken; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.Function; import org.testng.Assert; import org.testng.annotations.Test; diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java index d7b2b4526c50a..38dd91426b46b 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java @@ -38,7 +38,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.client.impl.auth.AuthenticationToken; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.Function; import org.testng.Assert; import org.testng.annotations.Test; diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java index 176d5e901dda9..1b8946b64b41d 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java @@ -32,7 +32,7 @@ import org.apache.pulsar.functions.auth.FunctionAuthProvider; import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider; import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.runtime.RuntimeCustomizer; diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java index 8346e9d7e3f9f..6f6662da4583e 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java @@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SizeUnit; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.RestException; import org.apache.pulsar.functions.instance.InstanceUtils; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/CryptoUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/CryptoUtils.java index 1025113394bdb..82181141b9f04 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/CryptoUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/CryptoUtils.java @@ -19,14 +19,14 @@ package org.apache.pulsar.functions.utils; +import static org.apache.commons.lang.StringUtils.isEmpty; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Type; import java.util.Arrays; import java.util.Map; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; @@ -34,8 +34,6 @@ import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.functions.proto.Function; -import static org.apache.commons.lang.StringUtils.isEmpty; - public final class CryptoUtils { public static Function.CryptoSpec convert(CryptoConfig config) { @@ -94,7 +92,7 @@ public static CryptoKeyReader getCryptoKeyReaderInstance(String className, Map ctor = cryptoClass.getConstructor(java.util.Map.class); + Constructor ctor = cryptoClass.getConstructor(Map.class); return (CryptoKeyReader) ctor.newInstance(configs); } catch (NoSuchMethodException e) { throw new RuntimeException("Key reader class does not have constructor accepts map", e); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java index 1b4f45d7fea24..2e433adccb0f0 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java @@ -18,13 +18,28 @@ */ package org.apache.pulsar.functions.utils; +import static org.apache.commons.lang3.StringUtils.isEmpty; import com.google.protobuf.AbstractMessage.Builder; import com.google.protobuf.MessageOrBuilder; import com.google.protobuf.util.JsonFormat; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; import java.io.InputStream; -import java.nio.file.CopyOption; +import java.io.ObjectOutputStream; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.net.MalformedURLException; +import java.net.ServerSocket; +import java.net.URISyntaxException; +import java.net.URL; import java.nio.file.Files; import java.nio.file.StandardCopyOption; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -45,27 +60,6 @@ import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.net.MalformedURLException; -import java.net.ServerSocket; -import java.net.URISyntaxException; -import java.net.URL; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.util.Collection; -import java.util.Collections; -import java.util.UUID; - -import static org.apache.commons.lang3.StringUtils.isEmpty; - /** * Utils used for runtime. */ diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index c6a3cada08520..13089e43193a7 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -212,7 +212,24 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader sinkSpecBuilder.setTypeClassName(typeArgs[1].getName()); } if (functionConfig.getProducerConfig() != null) { - sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(functionConfig.getProducerConfig())); + ProducerConfig producerConf = functionConfig.getProducerConfig(); + Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder(); + if (producerConf.getMaxPendingMessages() != null) { + pbldr.setMaxPendingMessages(producerConf.getMaxPendingMessages()); + } + if (producerConf.getMaxPendingMessagesAcrossPartitions() != null) { + pbldr.setMaxPendingMessagesAcrossPartitions(producerConf.getMaxPendingMessagesAcrossPartitions()); + } + if (producerConf.getUseThreadLocalProducers() != null) { + pbldr.setUseThreadLocalProducers(producerConf.getUseThreadLocalProducers()); + } + if (producerConf.getCryptoConfig() != null) { + pbldr.setCryptoSpec(CryptoUtils.convert(producerConf.getCryptoConfig())); + } + if (producerConf.getBatchBuilder() != null) { + pbldr.setBatchBuilder(producerConf.getBatchBuilder()); + } + sinkSpecBuilder.setProducerSpec(pbldr.build()); } functionDetailsBuilder.setSink(sinkSpecBuilder); @@ -277,10 +294,6 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader functionDetailsBuilder.setSecretsMap(new Gson().toJson(functionConfig.getSecrets())); } - if (functionConfig.getExternalPulsars() != null && !functionConfig.getExternalPulsars().isEmpty()) { - functionDetailsBuilder.setExternalPulsarsMap(new Gson().toJson(functionConfig.getExternalPulsars())); - } - if (functionConfig.getAutoAck() != null) { functionDetailsBuilder.setAutoAck(functionConfig.getAutoAck()); } else { @@ -367,7 +380,22 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) functionConfig.setOutputSchemaType(functionDetails.getSink().getSchemaType()); } if (functionDetails.getSink().getProducerSpec() != null) { - functionConfig.setProducerConfig(ProducerConfigUtils.convertFromSpec(functionDetails.getSink().getProducerSpec())); + Function.ProducerSpec spec = functionDetails.getSink().getProducerSpec(); + ProducerConfig producerConfig = new ProducerConfig(); + if (spec.getMaxPendingMessages() != 0) { + producerConfig.setMaxPendingMessages(spec.getMaxPendingMessages()); + } + if (spec.getMaxPendingMessagesAcrossPartitions() != 0) { + producerConfig.setMaxPendingMessagesAcrossPartitions(spec.getMaxPendingMessagesAcrossPartitions()); + } + if (spec.hasCryptoSpec()) { + producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec())); + } + if (spec.getBatchBuilder() != null) { + producerConfig.setBatchBuilder(spec.getBatchBuilder()); + } + producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers()); + functionConfig.setProducerConfig(producerConfig); } if (!isEmpty(functionDetails.getLogTopic())) { functionConfig.setLogTopic(functionDetails.getLogTopic()); @@ -409,13 +437,6 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) functionConfig.setSecrets(secretsMap); } - if (isNotEmpty(functionDetails.getExternalPulsarsMap())) { - Type type = new TypeToken>() { - }.getType(); - Map externalPulsarsMap = new Gson().fromJson(functionDetails.getExternalPulsarsMap(), type); - functionConfig.setExternalPulsars(externalPulsarsMap); - } - if (functionDetails.hasResources()) { Resources resources = new Resources(); resources.setCpu(functionDetails.getResources().getCpu()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java deleted file mode 100644 index 8e73ff80bbb67..0000000000000 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.functions.utils; - -import org.apache.pulsar.common.functions.ProducerConfig; -import org.apache.pulsar.functions.proto.Function; - -public class ProducerConfigUtils { - public static Function.ProducerSpec convert(ProducerConfig conf) { - Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder(); - if (conf.getMaxPendingMessages() != null) { - pbldr.setMaxPendingMessages(conf.getMaxPendingMessages()); - } - if (conf.getMaxPendingMessagesAcrossPartitions() != null) { - pbldr.setMaxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions()); - } - if (conf.getUseThreadLocalProducers() != null) { - pbldr.setUseThreadLocalProducers(conf.getUseThreadLocalProducers()); - } - if (conf.getBatchBuilder() != null) { - pbldr.setBatchBuilder(conf.getBatchBuilder()); - } - - return pbldr.build(); - } - - public static ProducerConfig convertFromSpec(Function.ProducerSpec spec) { - ProducerConfig producerConfig = new ProducerConfig(); - if (spec.getMaxPendingMessages() != 0) { - producerConfig.setMaxPendingMessages(spec.getMaxPendingMessages()); - } - if (spec.getMaxPendingMessagesAcrossPartitions() != 0) { - producerConfig.setMaxPendingMessagesAcrossPartitions(spec.getMaxPendingMessagesAcrossPartitions()); - } - if (spec.getBatchBuilder() != null) { - producerConfig.setBatchBuilder(spec.getBatchBuilder()); - } - producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers()); - return producerConfig; - } -} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index f2c51474ff143..9049eb6f8ce9a 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.BatchSourceConfig; import org.apache.pulsar.common.io.ConnectorDefinition; @@ -51,6 +52,7 @@ import java.util.Map; import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType; @@ -145,7 +147,24 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource } if (sourceConfig.getProducerConfig() != null) { - sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig())); + ProducerConfig conf = sourceConfig.getProducerConfig(); + Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder(); + if (conf.getMaxPendingMessages() != null) { + pbldr.setMaxPendingMessages(conf.getMaxPendingMessages()); + } + if (conf.getMaxPendingMessagesAcrossPartitions() != null) { + pbldr.setMaxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions()); + } + if (conf.getUseThreadLocalProducers() != null) { + pbldr.setUseThreadLocalProducers(conf.getUseThreadLocalProducers()); + } + if (conf.getCryptoConfig() != null) { + pbldr.setCryptoSpec(CryptoUtils.convert(conf.getCryptoConfig())); + } + if (conf.getBatchBuilder() != null) { + pbldr.setBatchBuilder(conf.getBatchBuilder()); + } + sinkSpecBuilder.setProducerSpec(pbldr.build()); } sinkSpecBuilder.setForwardSourceMessageProperty(true); @@ -220,7 +239,22 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) { sourceConfig.setSerdeClassName(sinkSpec.getSerDeClassName()); } if (sinkSpec.getProducerSpec() != null) { - sourceConfig.setProducerConfig(ProducerConfigUtils.convertFromSpec(sinkSpec.getProducerSpec())); + Function.ProducerSpec spec = sinkSpec.getProducerSpec(); + ProducerConfig producerConfig = new ProducerConfig(); + if (spec.getMaxPendingMessages() != 0) { + producerConfig.setMaxPendingMessages(spec.getMaxPendingMessages()); + } + if (spec.getMaxPendingMessagesAcrossPartitions() != 0) { + producerConfig.setMaxPendingMessagesAcrossPartitions(spec.getMaxPendingMessagesAcrossPartitions()); + } + if (spec.hasCryptoSpec()) { + producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec())); + } + if (spec.getBatchBuilder() != null) { + producerConfig.setBatchBuilder(spec.getBatchBuilder()); + } + producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers()); + sourceConfig.setProducerConfig(producerConfig); } if (functionDetails.hasResources()) { Resources resources = new Resources(); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java index bcde225bf6e6d..e2e6e39e273ff 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java @@ -18,14 +18,13 @@ */ package org.apache.pulsar.functions.utils.io; +import java.nio.file.Path; +import java.util.List; import lombok.Builder; import lombok.Data; import org.apache.pulsar.common.io.ConfigFieldDefinition; import org.apache.pulsar.common.io.ConnectorDefinition; -import java.nio.file.Path; -import java.util.List; - @Builder @Data public class Connector { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java index 3d378e8ce3441..39cfc25a45aba 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java @@ -27,11 +27,14 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.*; - +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; - import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.io.ConfigFieldDefinition; import org.apache.pulsar.common.io.ConnectorDefinition; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 5b896f936f846..e074323bc970f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -48,7 +48,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 1739c91d739ee..afc75f5c12420 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -53,7 +53,7 @@ import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.common.functions.AuthenticationConfig; +import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.util.ObjectMapperFactory; diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchSource.java index ae58f06620d7f..7d43893be63bf 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchSource.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchSource.java @@ -19,13 +19,12 @@ package org.apache.pulsar.io.core; +import java.util.Map; +import java.util.function.Consumer; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.functions.api.Record; -import java.util.Map; -import java.util.function.Consumer; - /** * Interface for writing Batch sources * diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java index 0c76cbded38f1..5e4de64e8cc7b 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java @@ -20,7 +20,6 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; - import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.functions.api.StateStore; diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java index d3dbff50b0fe4..f3fc12d59bfdf 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java @@ -19,7 +19,6 @@ package org.apache.pulsar.io.core; import java.util.Map; - import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.functions.api.Record; diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java index cbc3fd877b8db..b283a090990ad 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java @@ -19,10 +19,9 @@ package org.apache.pulsar.io.core; import java.util.Collection; - -import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java index b5461d82c1a72..0578b5de55b93 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java @@ -19,7 +19,6 @@ package org.apache.pulsar.io.core; import java.util.Map; - import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.functions.api.Record;