Skip to content

Commit

Permalink
Separate pulsar client API interfaces in different module (apache#3309)
Browse files Browse the repository at this point in the history
* Separate pulsar client API interfaces in different module

* Added provided protobuf dep

* Fixed build and tests

* Fixed import name

* Fixed AuthenticationTls class name

* Fixed AutoProduceSchema name
  • Loading branch information
merlimat authored Jan 9, 2019
1 parent aae3755 commit 918dc68
Show file tree
Hide file tree
Showing 70 changed files with 663 additions and 347 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoUtil;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -1248,7 +1249,7 @@ protected void handleGetSchema(CommandGetSchema commandGetSchema) {
"Topic not found or no-schema"));
} else {
ctx.writeAndFlush(Commands.newGetSchemaResponse(requestId,
new SchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version));
SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version));
}
}).exceptionally(ex -> {
ctx.writeAndFlush(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2647,8 +2647,7 @@ private String decryptMessage(TopicMessageImpl<byte[]> msg, String encryptionKey
String version = metadata.get("version");
assertEquals(version, "1.0");

org.apache.pulsar.common.api.proto.PulsarApi.CompressionType compressionType = encryptionCtx
.getCompressionType();
CompressionType compressionType = encryptionCtx.getCompressionType();
int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
byte[] encrParam = encryptionCtx.getParam();
String encAlgo = encryptionCtx.getAlgorithm();
Expand All @@ -2669,7 +2668,7 @@ private String decryptMessage(TopicMessageImpl<byte[]> msg, String encryptionKey
metadataBuilder.setSequenceId(123);
metadataBuilder.setPublishTime(12333453454L);
metadataBuilder.addEncryptionKeys(encKey);
metadataBuilder.setCompression(compressionType);
metadataBuilder.setCompression(CompressionCodecProvider.convertToWireProtocol(compressionType));
metadataBuilder.setUncompressedSize(uncompressedSize);
ByteBuf decryptedPayload = crypto.decrypt(metadataBuilder.build(), payloadBuf, reader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
Expand All @@ -37,8 +39,8 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
Expand All @@ -47,8 +49,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;

public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
private static final long testTimeout = 90000; // 1.5 min
private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImplTest.class);
Expand Down Expand Up @@ -260,7 +260,7 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.subscriptionTopicsMode(PulsarApi.CommandGetTopicsOfNamespace.Mode.NON_PERSISTENT)
.subscriptionTopicsMode(RegexSubscriptionMode.NonPersistentOnly)
.subscribe();

// 4. verify consumer get methods, to get right number of partitions and topics.
Expand Down Expand Up @@ -349,7 +349,7 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception {
.patternAutoDiscoveryPeriod(2)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionTopicsMode(PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.subscribe();

Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
<artifactSet>
<includes>
<include>org.apache.pulsar:pulsar-client</include>
<include>org.apache.pulsar:pulsar-client-api</include>
</includes>
</artifactSet>
<filters>
Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,12 @@
<artifactId>pulsar-client-api</artifactId>
<name>Pulsar Client :: API</name>

<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf3.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.util.Map;
import java.util.function.Supplier;

import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.internal.DefaultImplementation;

public final class AuthenticationFactory {

/**
* Create an authentication provider for token based authentication.
*
* @param token
* the client auth token
*/
public static Authentication token(String token) {
return DefaultImplementation.newAuthenticationToken(token);
}

/**
* Create an authentication provider for token based authentication.
*
* @param tokenSupplier
* a supplier of the client auth token
*/
public static Authentication token(Supplier<String> tokenSupplier) {
return DefaultImplementation.newAuthenticationToken(tokenSupplier);
}

/**
* Create an authentication provider for TLS based authentication.
*
* @param certFilePath
* the path to the TLS client public key
* @param keyFilePath
* the path to the TLS client private key
*/
public static Authentication TLS(String certFilePath, String keyFilePath) {
return DefaultImplementation.newAuthenticationTLS(certFilePath, keyFilePath);
}

/**
* Create an instance of the Authentication-Plugin
*
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParamsString
* string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
* @return instance of the Authentication-Plugin
* @throws UnsupportedAuthenticationException
*/
public static Authentication create(String authPluginClassName, String authParamsString)
throws UnsupportedAuthenticationException {
try {
return DefaultImplementation.createAuthentication(authPluginClassName, authParamsString);
} catch (Throwable t) {
throw new UnsupportedAuthenticationException(t);
}
}

/**
* Create an instance of the Authentication-Plugin
*
* @param authPluginClassName name of the Authentication-Plugin you want to use
* @param authParams map which represents parameters for the Authentication-Plugin
* @return instance of the Authentication-Plugin
* @throws UnsupportedAuthenticationException
*/
public static final Authentication create(String authPluginClassName, Map<String, String> authParams)
throws UnsupportedAuthenticationException {
try {
return DefaultImplementation.createAuthentication(authPluginClassName, authParams);
} catch (Throwable t) {
throw new UnsupportedAuthenticationException(t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;

/**
* {@link ConsumerBuilder} is used to configure and create instances of {@link Consumer}.
*
Expand Down Expand Up @@ -330,12 +328,13 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition);

/**
* Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both.
* Only used with pattern subscriptions.
* Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used
* with pattern subscriptions.
*
* @param mode Pattern subscription mode
* @param regexSubscriptionMode
* Pattern subscription mode
*/
ConsumerBuilder<T> subscriptionTopicsMode(Mode mode);
ConsumerBuilder<T> subscriptionTopicsMode(RegexSubscriptionMode regexSubscriptionMode);

/**
* Intercept {@link Consumer}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@

package org.apache.pulsar.client.api;

import org.apache.pulsar.common.api.EncryptionContext;

public enum ConsumerCryptoFailureAction {
FAIL, // This is the default option to fail consume until crypto succeeds
DISCARD, // Message is silently acknowledged and not delivered to the application
/**
*
*
* <pre>
* Deliver the encrypted message to the application. It's the application's responsibility to decrypt the message.
* If message is also compressed, decompression will fail. If message contain batch messages, client will not be
* able to retrieve individual messages in the batch.
* </pre>
*
*
* Delivered encrypted message contains {@link EncryptionContext} which contains encryption and compression
* information in it using which application can decrypt consumed message payload.
*
*
*/
CONSUME;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -156,7 +155,7 @@ public interface Message<T> {
/**
* {@link EncryptionContext} contains encryption and compression information in it using which application can
* decrypt consumed message with encrypted-payload.
*
*
* @return
*/
Optional<EncryptionContext> getEncryptionCtx();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
package org.apache.pulsar.client.api;

import java.io.IOException;

import java.io.Serializable;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;

import org.apache.pulsar.client.internal.DefaultImplementation;

/**
* Opaque unique identifier of a single message
Expand All @@ -47,14 +46,14 @@ public interface MessageId extends Comparable<MessageId>, Serializable {
* @return the de-serialized messageId object
*/
public static MessageId fromByteArray(byte[] data) throws IOException {
return MessageIdImpl.fromByteArray(data);
return DefaultImplementation.newMessageIdFromByteArray(data);
}

public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException {
return MessageIdImpl.fromByteArrayWithTopic(data, topicName);
public static MessageId fromByteArrayWithTopic(byte[] data, String topicName) throws IOException {
return DefaultImplementation.newMessageIdFromByteArrayWithTopic(data, topicName);
}

public static final MessageId earliest = new MessageIdImpl(-1, -1, -1);
public static final MessageId earliest = DefaultImplementation.newMessageId(-1, -1, -1);

public static final MessageId latest = new MessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1);
public static final MessageId latest = DefaultImplementation.newMessageId(Long.MAX_VALUE, Long.MAX_VALUE, -1);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;

/**
* Class that provides a client interface to Pulsar.
Expand All @@ -40,7 +40,7 @@ public interface PulsarClient extends Closeable {
* @since 2.0.0
*/
public static ClientBuilder builder() {
return new ClientBuilderImpl();
return DefaultImplementation.newClientBuilder();
}

/**
Expand Down
Loading

0 comments on commit 918dc68

Please sign in to comment.