Skip to content

Commit

Permalink
Enable pulsar function to send message to external pulsar cluster (ap…
Browse files Browse the repository at this point in the history
…ache#8434)

### Motivation

Enable pulsar function to send message to external pulsar cluster

### Modifications

*Describe the modifications you've done.*
  • Loading branch information
nlu90 authored Nov 21, 2020
1 parent 68759ff commit 66231e3
Show file tree
Hide file tree
Showing 31 changed files with 362 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.ExternalPulsarConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
Expand Down Expand Up @@ -318,6 +319,8 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected String customRuntimeOptions;
@Parameter(names = "--dead-letter-topic", description = "The topic where messages that are not processed successfully are sent to")
protected String deadLetterTopic;
@Parameter(names = "--external-pulsars", description = "The map of external pulsar cluster name to its configuration (as a JSON string)")
protected String externalPulsars;
protected FunctionConfig functionConfig;
protected String userCodeFile;

Expand Down Expand Up @@ -396,6 +399,11 @@ void processArguments() throws Exception {
if (null != output) {
functionConfig.setOutput(output);
}
if (null != externalPulsars) {
Type type = new TypeToken<Map<String, ExternalPulsarConfig>>() {
}.getType();
functionConfig.setExternalPulsars(new Gson().fromJson(externalPulsars, type));
}
if (null != producerConfig) {
Type type = new TypeToken<ProducerConfig>() {}.getType();
functionConfig.setProducerConfig(new Gson().fromJson(producerConfig, type));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;
package org.apache.pulsar.common.functions;

import lombok.Builder;
import lombok.Data;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.common.functions;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

/**
* Configuration of extra pulsar clusters to sent output message.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class ExternalPulsarConfig {
private String name;
private String serviceURL;
private AuthenticationConfig authConfig;
private ProducerConfig producerConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public enum Runtime {
private String batchBuilder;
private Boolean forwardSourceMessageProperty;
private Map<String, Object> userConfig;
private Map<String, ExternalPulsarConfig> externalPulsars;
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
// encapsulates how the secret is fetched by the underlying
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,18 @@ public interface Context {
*/
<O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;

/**
* New output message using schema for serializing to the topic in the cluster
*
* @parem clusterName the name of the cluster for topic
* @param topicName The name of the topic for output message
* @param schema provide a way to convert between serialized data and domain objects
* @param <O>
* @return the message builder instance
* @throws PulsarClientException
*/
<O> TypedMessageBuilder<O> newOutputMessage(String clusterName, String topicName, Schema<O> schema) throws PulsarClientException;

/**
* Create a ConsumerBuilder with the schema.
*
Expand All @@ -312,4 +324,4 @@ public interface Context {
* @throws PulsarClientException
*/
<O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -46,6 +47,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.functions.ExternalPulsarConfig;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
Expand All @@ -59,8 +61,8 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.ProducerConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
Expand All @@ -78,12 +80,8 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
// Per Message related
private Record<?> record;

private PulsarClient client;
private Map<String, Producer<?>> publishProducers;
private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;
private ProducerBuilderImpl<?> producerBuilder;

private final TopicSchema topicSchema;
private String defaultPulsarCluster;
private Map<String, PulsarCluster> externalPulsarClusters;

private final SecretsProvider secretsProvider;
private final Map<String, Object> secretsMap;
Expand Down Expand Up @@ -117,27 +115,25 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
StateManager stateManager) {
this.config = config;
this.logger = logger;
this.client = client;
this.topicSchema = new TopicSchema(client);
this.statsManager = statsManager;

this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
boolean useThreadLocalProducers = false;
if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) {
this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
}
if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
this.externalPulsarClusters = new HashMap<>();
if (!config.getFunctionDetails().getExternalPulsarsMap().isEmpty()) {
Map<String, ExternalPulsarConfig> externalPulsarConfig = new Gson().fromJson(config.getFunctionDetails().getExternalPulsarsMap(),
new TypeToken<Map<String, ExternalPulsarConfig>>() {
}.getType());
for (Map.Entry<String, ExternalPulsarConfig> entry : externalPulsarConfig.entrySet()) {
try {
this.externalPulsarClusters.put(entry.getKey(),
new PulsarCluster(InstanceUtils.createPulsarClient(entry.getValue().getServiceURL(), entry.getValue().getAuthConfig()),
ProducerConfigUtils.convert(entry.getValue().getProducerConfig())));
} catch (PulsarClientException ex) {
throw new RuntimeException("failed to create pulsar client for external cluster: " + entry.getKey(), ex);
}
}
useThreadLocalProducers = config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers();
}
if (useThreadLocalProducers) {
tlPublishProducers = new ThreadLocal<>();
} else {
publishProducers = new HashMap<>();
}
this.defaultPulsarCluster = "default-" + UUID.randomUUID();
this.externalPulsarClusters.put(defaultPulsarCluster, new PulsarCluster(client, config.getFunctionDetails().getSink().getProducerSpec()));

if (config.getFunctionDetails().getUserConfig().isEmpty()) {
userConfigs = new HashMap<>();
Expand Down Expand Up @@ -392,25 +388,34 @@ public <O> CompletableFuture<Void> publish(String topicName, O object) {
@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName) {
return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
return publish(topicName, object, (Schema<O>) externalPulsarClusters.get(defaultPulsarCluster).getTopicSchema().getSchema(topicName, object, schemaOrSerdeClassName, false));
}

@Override
public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
return newOutputMessage(defaultPulsarCluster, topicName, schema);
}

@Override
public <O> TypedMessageBuilder<O> newOutputMessage(String pulsarName, String topicName, Schema<O> schema) throws PulsarClientException {
MessageBuilderImpl<O> messageBuilder = new MessageBuilderImpl<>();
TypedMessageBuilder<O> typedMessageBuilder = getProducer(topicName, schema).newMessage();
TypedMessageBuilder<O> typedMessageBuilder = getProducer(pulsarName, topicName, schema).newMessage();
messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
return messageBuilder;
}

@Override
public <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException {
return this.client.newConsumer(schema);
return this.externalPulsarClusters.get(defaultPulsarCluster).getClient().newConsumer(schema);
}

public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
return publish(defaultPulsarCluster, topicName, object, schema);
}

public <O> CompletableFuture<Void> publish(String pulsarName, String topicName, O object, Schema<O> schema) {
try {
return newOutputMessage(topicName, schema).value(object).sendAsync().thenApply(msgId -> null);
return newOutputMessage(pulsarName, topicName, schema).value(object).sendAsync().thenApply(msgId -> null);
} catch (PulsarClientException e) {
logger.error("Failed to create Producer while doing user publish", e);
return FutureUtil.failedFuture(e);
Expand All @@ -432,22 +437,23 @@ public void recordMetric(String metricName, double value) {
}
}

private <O> Producer<O> getProducer(String topicName, Schema<O> schema) throws PulsarClientException {
private <O> Producer<O> getProducer(String pulsarName, String topicName, Schema<O> schema) throws PulsarClientException {
Producer<O> producer;
if (tlPublishProducers != null) {
Map<String, Producer<?>> producerMap = tlPublishProducers.get();
PulsarCluster pulsar = externalPulsarClusters.get(pulsarName);
if (pulsar.getTlPublishProducers() != null) {
Map<String, Producer<?>> producerMap = pulsar.getTlPublishProducers().get();
if (producerMap == null) {
producerMap = new HashMap<>();
tlPublishProducers.set(producerMap);
pulsar.getTlPublishProducers().set(producerMap);
}
producer = (Producer<O>) producerMap.get(topicName);
} else {
producer = (Producer<O>) publishProducers.get(topicName);
producer = (Producer<O>) pulsar.getPublishProducers().get(topicName);
}

if (producer == null) {

Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone())
Producer<O> newProducer = ((ProducerBuilderImpl<O>) pulsar.getProducerBuilder().clone())
.schema(schema)
.blockIfQueueFull(true)
.enableBatching(true)
Expand All @@ -468,10 +474,10 @@ private <O> Producer<O> getProducer(String topicName, Schema<O> schema) throws P
this.config.getInstanceId()))
.create();

if (tlPublishProducers != null) {
tlPublishProducers.get().put(topicName, newProducer);
if (pulsar.getTlPublishProducers() != null) {
pulsar.getTlPublishProducers().get().put(topicName, newProducer);
} else {
Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);
Producer<O> existingProducer = (Producer<O>) pulsar.getPublishProducers().putIfAbsent(topicName, newProducer);

if (existingProducer != null) {
// The value in the map was not updated after the concurrent put
Expand Down Expand Up @@ -621,15 +627,18 @@ public void setUnderlyingBuilder(TypedMessageBuilder<O> underlyingBuilder) {
public void close() {
List<CompletableFuture> futures = new LinkedList<>();

if (publishProducers != null) {
for (Producer<?> producer : publishProducers.values()) {
futures.add(producer.closeAsync());
for (Map.Entry<String, PulsarCluster> pulsarEntry : externalPulsarClusters.entrySet()) {
PulsarCluster pulsar = pulsarEntry.getValue();
if (pulsar.getPublishProducers() != null) {
for (Producer<?> producer : pulsar.getPublishProducers().values()) {
futures.add(producer.closeAsync());
}
}
}

if (tlPublishProducers != null) {
for (Producer<?> producer : tlPublishProducers.get().values()) {
futures.add(producer.closeAsync());
if (pulsar.getTlPublishProducers() != null) {
for (Producer<?> producer : pulsar.getTlPublishProducers().get().values()) {
futures.add(producer.closeAsync());
}
}
}

Expand All @@ -639,4 +648,4 @@ public void close() {
logger.warn("Failed to close producers", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

import lombok.experimental.UtilityClass;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.sink.PulsarSink;
Expand Down Expand Up @@ -145,4 +150,25 @@ public static Map<String, String> getProperties(Function.FunctionDetails.Compone
}
return properties;
}

public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig) throws PulsarClientException {
ClientBuilder clientBuilder = null;
if (isNotBlank(pulsarServiceUrl)) {
clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
if (authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&& isNotBlank(authConfig.getClientAuthenticationParameters())) {
clientBuilder.authentication(authConfig.getClientAuthenticationPlugin(),
authConfig.getClientAuthenticationParameters());
}
clientBuilder.enableTls(authConfig.isUseTls());
clientBuilder.allowTlsInsecureConnection(authConfig.isTlsAllowInsecureConnection());
clientBuilder.enableTlsHostnameVerification(authConfig.isTlsHostnameVerificationEnable());
clientBuilder.tlsTrustCertsFilePath(authConfig.getTlsTrustCertsFilePath());
}
clientBuilder.ioThreads(Runtime.getRuntime().availableProcessors());
return clientBuilder.build();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

// input topic consumer & output topic producer
private final PulsarClientImpl client;
//private final Map<String, PulsarClient> pulsarClientMap;

private LogAppender logAppender;

Expand Down
Loading

0 comments on commit 66231e3

Please sign in to comment.