Skip to content

Commit

Permalink
After apache#3228, removed usages of deprecated client API (apache#3272)
Browse files Browse the repository at this point in the history
* After apache#3228, removed usages of deprecated client API

* Removed ProducerConfiguration from flink connector

* Use pulsar-client-1x in pulsar-spark

* Fixed log4j adapter

* Fixed flink examples

* Fixed storm integration tests

* Fixed storm example

* Wrapping message listeners in v1 lib wrapper

* Fixed unit tests

* Moved spark receiver test to test-containers to avoid shading issues

* Fixed PulsarAvroTableSinkTest

* Only run spark receiver test in integration tests
  • Loading branch information
merlimat authored Jan 5, 2019
1 parent 8ab80bb commit 4a85bb7
Show file tree
Hide file tree
Showing 55 changed files with 1,340 additions and 3,286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
import org.apache.pulsar.client.api.ProducerConfiguration;

/**
* Implements a streaming wordcount program on pulsar topics.
Expand Down Expand Up @@ -99,7 +98,6 @@ public static void main(String[] args) throws Exception {
serviceUrl,
outputTopic,
wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
new ProducerConfiguration(),
wordWithCount -> wordWithCount.word
)).setParallelism(parallelism);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.pulsar.client.api.ProducerConfiguration;

/**
* Implements a streaming wordcount program on pulsar topics.
Expand Down Expand Up @@ -108,7 +107,7 @@ public static void main(String[] args) throws Exception {
table.printSchema();
TableSink sink = null;
if (null != outputTopic) {
sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY, WordWithCount.class);
sink = new PulsarAvroTableSink(serviceUrl, outputTopic, ROUTING_KEY, WordWithCount.class);
} else {
// print the results with a csv file
sink = new CsvTableSink("./examples/file", "|");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.pulsar.client.api.ProducerConfiguration;

/**
* Implements a streaming wordcount program on pulsar topics.
Expand Down Expand Up @@ -109,7 +108,7 @@ public static void main(String[] args) throws Exception {
table.printSchema();
TableSink sink = null;
if (null != outputTopic) {
sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
sink = new PulsarJsonTableSink(serviceUrl, outputTopic, ROUTING_KEY);
} else {
// print the results with a csv file
sink = new CsvTableSink("./examples/file", "|");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ public void testDeleteNamespace() throws Exception {
final String topicName = "persistent://" + namespace + "/my-topic";
TopicName topic = TopicName.get(topicName);

Producer producer = pulsarClient.createProducer(topicName);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(topic);
// (2) Delete topic
Expand All @@ -1171,7 +1171,7 @@ public void testSubscribeRate() throws Exception {

admin.topics().createPartitionedTopic(topicName, 2);
pulsar.getConfiguration().setAuthorizationEnabled(false);
Consumer consumer = pulsarClient.newConsumer()
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("subscribe-rate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,20 @@
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -1168,19 +1169,22 @@ public void testPayloadCorruptionDetection() throws Exception {
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();

Message<byte[]> msg1 = MessageBuilder.create().setContent("message-1".getBytes()).build();
CompletableFuture<MessageId> future1 = producer.sendAsync(msg1);
CompletableFuture<MessageId> future1 = producer.newMessage().value("message-1".getBytes()).sendAsync();

// Stop the broker, and publishes messages. Messages are accumulated in the producer queue and they're checksums
// would have already been computed. If we change the message content at that point, it should result in a
// checksum validation error
stopBroker();

Message<byte[]> msg2 = MessageBuilder.create().setContent("message-2".getBytes()).build();
CompletableFuture<MessageId> future2 = producer.sendAsync(msg2);

// Taint msg2
msg2.getData()[msg2.getData().length - 1] = '3'; // new content would be 'message-3'
byte[] a2 = "message-2".getBytes();
TypedMessageBuilder<byte[]> msg2 = producer.newMessage().value(a2);


CompletableFuture<MessageId> future2 = msg2.sendAsync();

// corrupt the message, new content would be 'message-3'
((TypedMessageBuilderImpl<byte[]>) msg2).getContent().put(a2.length - 1, (byte) '3');

// Restart the broker to have the messages published
startBroker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -418,11 +417,10 @@ public Void call() throws Exception {
log.info("--- Starting Consumer --- " + url3);

// Produce a message that isn't replicated
producer1.produce(1, MessageBuilder.create().disableReplication());
producer1.produce(1, producer1.newMessage().disableReplication());

// Produce a message not replicated to r2
producer1.produce(1,
MessageBuilder.create().setReplicationClusters(Lists.newArrayList("r1", "r3")));
producer1.produce(1, producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3")));

// Produce a default replicated message
producer1.produce(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.testng.Assert.assertEquals;

import com.google.common.collect.Sets;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -35,12 +37,12 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand All @@ -50,9 +52,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

public class ReplicatorTestBase {
URL url1;
URL urlTls1;
Expand Down Expand Up @@ -309,12 +308,15 @@ void produce(int messages) throws Exception {

}

void produce(int messages, MessageBuilder<byte[]> messageBuilder) throws Exception {
TypedMessageBuilder<byte[]> newMessage() {
return producer.newMessage();
}

void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
log.info("Start sending messages");
for (int i = 0; i < messages; i++) {
final String m = new String("test-builder-" + i);
messageBuilder.setContent(m.getBytes());
producer.send(messageBuilder.build());
messageBuilder.value(m.getBytes()).send();
log.info("Sent message {}", m);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
Expand Down Expand Up @@ -418,11 +417,11 @@ public Void call() throws Exception {
log.info("--- Starting Consumer --- " + url3);

// Produce a message that isn't replicated
producer1.produce(1, MessageBuilder.create().disableReplication());
producer1.produce(1, producer1.newMessage().disableReplication());

// Produce a message not replicated to r2
producer1.produce(1,
MessageBuilder.create().setReplicationClusters(Lists.newArrayList("r1", "r3")));
producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3")));

// Produce a default replicated message
producer1.produce(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -305,16 +305,20 @@ void produce(int messages) throws Exception {

}

void produce(int messages, MessageBuilder<byte[]> messageBuilder) throws Exception {
void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
log.info("Start sending messages");
for (int i = 0; i < messages; i++) {
final String m = new String("test-builder-" + i);
messageBuilder.setContent(m.getBytes());
producer.send(messageBuilder.build());
messageBuilder.value(m.getBytes());
messageBuilder.send();
log.info("Sent message {}", m);
}
}

TypedMessageBuilder<byte[]> newMessage() {
return producer.newMessage();
}

void close() throws Exception {
client.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -334,9 +335,15 @@ public void testInvalidSequence() throws Exception {
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString())
.subscriptionName("my-subscriber-name").subscribe();

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString())
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

try {
Message<byte[]> msg = MessageBuilder.create().setContent("InvalidMessage".getBytes()).build();
consumer.acknowledge(msg);
TypedMessageBuilderImpl<byte[]> mb = (TypedMessageBuilderImpl<byte[]>) producer.newMessage()
.value("InvalidMessage".getBytes());
consumer.acknowledge(mb.getMessage());
} catch (PulsarClientException.InvalidMessageException e) {
// ok
}
Expand All @@ -357,10 +364,7 @@ public void testInvalidSequence() throws Exception {
// ok
}

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString())
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

producer.close();

try {
Expand Down
Loading

0 comments on commit 4a85bb7

Please sign in to comment.