Skip to content

Commit

Permalink
Revert "Enable pulsar function to send message to external pulsar clu… (
Browse files Browse the repository at this point in the history
apache#10843)

This reverts commit 66231e3.

### Motivation

Reverting since the external cluster configuration was not not needed as there are better ways to interact with external pulsar clusters.  A user can simply instantiate a Pulsar client, producer, or consumer in a function to interact with another pulsar cluster.  This feature is also a security risk as it does not security handle authentication data and it will be written as plain text in the internal function's metadata topic.
  • Loading branch information
merlimat authored Jun 7, 2021
1 parent 02d5eb3 commit ed10d82
Show file tree
Hide file tree
Showing 52 changed files with 220 additions and 342 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.pulsar.common.functions;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Configuration to aggregate various authentication params.
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class AuthenticationConfig {
private String clientAuthenticationPlugin;
private String clientAuthenticationParameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public enum Runtime {
private String batchBuilder;
private Boolean forwardSourceMessageProperty;
private Map<String, Object> userConfig;
private Map<String, ExternalPulsarConfig> externalPulsars;
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
// encapsulates how the secret is fetched by the underlying
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.ExternalPulsarConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
Expand Down Expand Up @@ -322,8 +321,6 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected String customRuntimeOptions;
@Parameter(names = "--dead-letter-topic", description = "The topic where messages that are not processed successfully are sent to")
protected String deadLetterTopic;
@Parameter(names = "--external-pulsars", description = "The map of external pulsar cluster name to its configuration (as a JSON string)")
protected String externalPulsars;
protected FunctionConfig functionConfig;
protected String userCodeFile;

Expand Down Expand Up @@ -402,11 +399,6 @@ void processArguments() throws Exception {
if (null != output) {
functionConfig.setOutput(output);
}
if (null != externalPulsars) {
Type type = new TypeToken<Map<String, ExternalPulsarConfig>>() {
}.getType();
functionConfig.setExternalPulsars(new Gson().fromJson(externalPulsars, type));
}
if (null != producerConfig) {
Type type = new TypeToken<ProducerConfig>() {}.getType();
functionConfig.setProducerConfig(new Gson().fromJson(producerConfig, type));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static TopicName get(String domain, String tenant, String namespace, Stri
}

public static TopicName get(String domain, String tenant, String cluster, String namespace,
String topic) {
String topic) {
String name = domain + "://" + tenant + '/' + cluster + '/' + namespace + '/' + topic;
return TopicName.get(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,6 @@ public interface Context {
*/
PulsarAdmin getPulsarAdmin();

/**
* Get the pulsar admin client by cluster name.
*
* @param clusterName The name of the cluster name for pulsar admin client
* @return The instance of pulsar admin client
*/
PulsarAdmin getPulsarAdmin(String clusterName);

/**
* Record a user defined metric.
*
Expand Down Expand Up @@ -320,18 +312,6 @@ public interface Context {
*/
<O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;

/**
* New output message using schema for serializing to the topic in the cluster
*
* @param clusterName the name of the cluster for topic
* @param topicName The name of the topic for output message
* @param schema provide a way to convert between serialized data and domain objects
* @param <O>
* @return the message builder instance
* @throws PulsarClientException
*/
<O> TypedMessageBuilder<O> newOutputMessage(String clusterName, String topicName, Schema<O> schema) throws PulsarClientException;

/**
* Create a ConsumerBuilder with the schema.
*
Expand All @@ -341,4 +321,4 @@ public interface Context {
* @throws PulsarClientException
*/
<O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
*/
package org.apache.pulsar.functions.api;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

package org.apache.pulsar.common.functions;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

/**
* Configuration of extra pulsar clusters to sent output message.
* Configuration to aggregate various authentication params.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class ExternalPulsarConfig {
private String name;
private String serviceURL;
private String webServiceURL;
private AuthenticationConfig authConfig;
private ProducerConfig producerConfig;
public class AuthenticationConfig {
private String clientAuthenticationPlugin;
private String clientAuthenticationParameters;
private String tlsTrustCertsFilePath;
private boolean useTls;
private boolean tlsAllowInsecureConnection;
private boolean tlsHostnameVerificationEnable;
}
Loading

0 comments on commit ed10d82

Please sign in to comment.