Skip to content

Commit

Permalink
expose new message with different schema (apache#5517)
Browse files Browse the repository at this point in the history
Master Issue: apache#5141 

Expose new message with different schema interface, which not required same parameterized type with the producer.
Since the producer and messages sent by it may have different inner types, it's better to have a type agnostic producer interceptor with a checkin method.
  • Loading branch information
yittg authored and sijie committed Nov 5, 2019
1 parent 7386130 commit 669b916
Show file tree
Hide file tree
Showing 17 changed files with 379 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
*/
package org.apache.pulsar.client.api;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -53,67 +58,72 @@ protected void cleanup() throws Exception {

@Test
public void testProducerInterceptor() throws PulsarClientException {
ProducerInterceptor<String> interceptor1 = new ProducerInterceptor<String>() {
@Override
public void close() {
Map<MessageId, List<String>> ackCallback = new HashMap<>();

abstract class BaseInterceptor implements
org.apache.pulsar.client.api.interceptor.ProducerInterceptor {
private static final String set = "set";
private String tag;
private BaseInterceptor(String tag) {
this.tag = tag;
}

@Override
public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
log.info("Before send message: {}", new String(msg.getData()));
java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> properties = msg.getMessageBuilder().getPropertiesList();
for (int i = 0; i < properties.size(); i++) {
if ("key".equals(properties.get(i).getKey())) {
msg.getMessageBuilder().setProperties(i, PulsarApi.KeyValue.newBuilder().setKey("key").setValue("after").build());
}
}
return msg;
}
public void close() {}

@Override
public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId msgId, Throwable cause) {
message.getProperties();
Assert.assertEquals("complete", message.getProperty("key"));
log.info("Send acknowledgement message: {}, msgId: {}", new String(message.getData()), msgId, cause);
public Message beforeSend(Producer producer, Message message) {
MessageImpl msg = (MessageImpl) message;
msg.getMessageBuilder()
.addProperties(KeyValue.newBuilder().setKey(tag).setValue(set));
return message;
}
};

ProducerInterceptor<String> interceptor2 = new ProducerInterceptor<String>() {
@Override
public void close() {

public void onSendAcknowledgement(Producer producer, Message message,
MessageId msgId, Throwable exception) {
if (!set.equals(message.getProperties().get(tag))) {
return;
}
ackCallback.computeIfAbsent(msgId, k -> new ArrayList<>()).add(tag);
}
}

BaseInterceptor interceptor1 = new BaseInterceptor("int1") {
@Override
public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
log.info("Before send message: {}", new String(msg.getData()));
java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> properties = msg.getMessageBuilder().getPropertiesList();
for (int i = 0; i < properties.size(); i++) {
if ("key".equals(properties.get(i).getKey())) {
msg.getMessageBuilder().setProperties(i, PulsarApi.KeyValue.newBuilder().setKey("key").setValue("complete").build());
}
}
return msg;
public boolean eligible(Message message) {
return true;
}

};
BaseInterceptor interceptor2 = new BaseInterceptor("int2") {
@Override
public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId msgId, Throwable cause) {
message.getProperties();
Assert.assertEquals("complete", message.getProperty("key"));
log.info("Send acknowledgement message: {}, msgId: {}", new String(message.getData()), msgId, cause);
public boolean eligible(Message message) {
return SchemaType.STRING.equals(
((MessageImpl)message).getSchema().getSchemaInfo().getType());
}
};
BaseInterceptor interceptor3 = new BaseInterceptor("int3") {
@Override
public boolean eligible(Message message) {
return SchemaType.INT32.equals(
((MessageImpl)message).getSchema().getSchemaInfo().getType());
}
};

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.intercept(interceptor1, interceptor2)
.intercept(interceptor1, interceptor2, interceptor3)
.create();

MessageId messageId = producer.newMessage().property("key", "before").value("Hello Pulsar!").send();
MessageId messageId = producer.newMessage().property("STR", "Y")
.value("Hello Pulsar!").send();
Assert.assertEquals(ackCallback.get(messageId),
Arrays.asList(interceptor1.tag, interceptor2.tag));
log.info("Send result messageId: {}", messageId);
MessageId messageId2 = producer.newMessage(Schema.INT32).property("INT", "Y")
.value(18).send();
Assert.assertEquals(ackCallback.get(messageId2),
Arrays.asList(interceptor1.tag, interceptor3.tag));
log.info("Send result messageId: {}", messageId2);
producer.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
Expand Down Expand Up @@ -231,13 +230,13 @@ public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Ex
byte[] contentV2 = v2Writer.write(dataV2);
try (Producer<byte[]> p = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
.topic(topic).create();
Consumer<V2Data> c = pulsarClient.newConsumer(v2Schema)
.topic(topic)
.subscriptionName("sub1").subscribe()) {
Consumer<V2Data> c = pulsarClient.newConsumer(v2Schema)
.topic(topic)
.subscriptionName("sub1").subscribe()) {
Assert.expectThrows(SchemaSerializationException.class, () -> p.send(contentV1));

((ProducerBase<byte[]>)p).newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
.value(contentV1).send();
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
.value(contentV1).send();
p.send(contentV2);
Message<V2Data> msg1 = c.receive();
V2Data msg1Value = msg1.getValue();
Expand All @@ -250,10 +249,10 @@ public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Ex
Assert.assertEquals(msg2.getSchemaVersion(), new LongSchemaVersion(1).bytes());

try {
((ProducerBase<byte[]>)p).newMessage(Schema.BYTES).value(contentV1).send();
p.newMessage(Schema.BYTES).value(contentV1).send();
if (schemaValidationEnforced) {
Assert.fail("Shouldn't be able to send to a schema'd topic with no schema"
+ " if SchemaValidationEnabled is enabled");
+ " if SchemaValidationEnabled is enabled");
}
Message<V2Data> msg3 = c.receive();
Assert.assertEquals(msg3.getSchemaVersion(), SchemaVersion.Empty.bytes());
Expand All @@ -262,12 +261,44 @@ public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Ex
Assert.assertTrue(e instanceof IncompatibleSchemaException);
} else {
Assert.fail("Shouldn't throw IncompatibleSchemaException"
+ " if SchemaValidationEnforced is disabled");
+ " if SchemaValidationEnforced is disabled");
}
}
}
}

@Test
public void newProducerForMessageOnTopicWithDifferentSchemaType() throws Exception {
String topic = "my-property/my-ns/schema-test";
V1Data data1 = new V1Data(2);
V2Data data2 = new V2Data(3, 5);
V1Data data3 = new V1Data(8);
try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic).create();
Consumer<V2Data> c = pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
.topic(topic)
.subscriptionName("sub1").subscribe()) {
p.newMessage().value(data1).send();
p.newMessage(Schema.AVRO(V2Data.class)).value(data2).send();
p.newMessage(Schema.AVRO(V1Data.class)).value(data3).send();
Message<V2Data> msg1 = c.receive();
V2Data msg1Value = msg1.getValue();
Assert.assertEquals(data1.i, msg1Value.i);
Assert.assertNull(msg1Value.j);
Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes());

Message<V2Data> msg2 = c.receive();
Assert.assertEquals(data2, msg2.getValue());
Assert.assertEquals(msg2.getSchemaVersion(), new LongSchemaVersion(1).bytes());

Message<V2Data> msg3 = c.receive();
V2Data msg3Value = msg3.getValue();
Assert.assertEquals(data3.i, msg3Value.i);
Assert.assertNull(msg3Value.j);
Assert.assertEquals(msg3.getSchemaVersion(), new LongSchemaVersion(0).bytes());
}
}

@Test
public void newProducerForMessageSchemaOnTopicInitialWithNoSchema() throws Exception {
String topic = "my-property/my-ns/schema-test";
Expand All @@ -289,13 +320,11 @@ public void newProducerForMessageSchemaOnTopicInitialWithNoSchema() throws Excep
V2Data dataV2 = new V2Data(i, -i);
byte[] contentV1 = v1Writer.write(dataV1);
byte[] contentV2 = v2Writer.write(dataV2);
((ProducerBase<byte[]>) p).newMessage(Schema.AUTO_PRODUCE_BYTES(v1Schema))
.value(contentV1).send();
p.newMessage(Schema.AUTO_PRODUCE_BYTES(v1Schema)).value(contentV1).send();
Message<byte[]> msg1 = c.receive();
Assert.assertEquals(msg1.getSchemaVersion(), new LongSchemaVersion(0).bytes());
Assert.assertEquals(msg1.getData(), contentV1);
((ProducerBase<byte[]>) p).newMessage(Schema.AUTO_PRODUCE_BYTES(v2Schema))
.value(contentV2).send();
p.newMessage(Schema.AUTO_PRODUCE_BYTES(v2Schema)).value(contentV2).send();
Message<byte[]> msg2 = c.receive();
Assert.assertEquals(msg2.getSchemaVersion(), new LongSchemaVersion(1).bytes());
Assert.assertEquals(msg2.getData(), contentV2);
Expand Down Expand Up @@ -329,18 +358,18 @@ public void newProducerForMessageSchemaWithBatch() throws Exception {
for (int i = 0; i < total; ++i) {
if (i / batch % 2 == 0) {
byte[] content = v1DataAvroWriter.write(new V1Data(i));
((ProducerBase<byte[]>)p).newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
.value(content).sendAsync();
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
.value(content).sendAsync();
} else {
byte[] content = v2DataAvroWriter.write(new V2Data(i, i + total));
((ProducerBase<byte[]>)p).newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V2Data.class)))
.value(content).sendAsync();
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V2Data.class)))
.value(content).sendAsync();
}
if ((i + 1) % incompatible == 0) {
byte[] content = incompatibleDataAvroWriter.write(new IncompatibleData(-i, -i));
try {
((ProducerBase<byte[]>)p).newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(IncompatibleData.class)))
.value(content).send();
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(IncompatibleData.class)))
.value(content).send();
} catch (Exception e) {
Assert.assertTrue(e instanceof IncompatibleSchemaException, e.getMessage());
}
Expand Down Expand Up @@ -368,8 +397,8 @@ public void newProducerWithMultipleSchemaDisabled() throws Exception {
.topic(topic)
.enableMultiSchema(false).create()) {
Assert.assertThrows(InvalidMessageException.class,
() -> ((ProducerBase<byte[]>)p).newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
.value(v1DataAvroWriter.write(new V1Data(0))).send());
() -> p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
.value(v1DataAvroWriter.write(new V1Data(0))).send());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,19 @@ public interface Producer<T> extends Closeable {
*/
TypedMessageBuilder<T> newMessage();

/**
* Create a new message builder with schema, not required same parameterized type with the producer.
*
* @return a typed message builder that can be used to construct the message to be sent through this producer
* @see #newMessage()
*/
<V> TypedMessageBuilder<V> newMessage(Schema<V> schema);

/**
* Get the last sequence id that was published by this producer.
*
* <p>This represent either the automatically assigned or custom sequence id (set on the {@link MessageBuilder})
* <p>This represent either the automatically assigned
* or custom sequence id (set on the {@link TypedMessageBuilder})
* that was published and acknowledged by the broker.
*
* <p>After recreating a producer with the same producer name, this will return the last message that was
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,20 @@ public interface ProducerBuilder<T> extends Cloneable {
* the list of interceptors to intercept the producer created by this builder.
* @return the producer builder instance
*/
@Deprecated
ProducerBuilder<T> intercept(ProducerInterceptor<T> ... interceptors);

/**
* Add a set of {@link org.apache.pulsar.client.api.interceptor.ProducerInterceptor} to the producer.
*
* <p>Interceptors can be used to trace the publish and acknowledgments operation happening in a producer.
*
* @param interceptors
* the list of interceptors to intercept the producer created by this builder.
* @return the producer builder instance
*/
ProducerBuilder<T> intercept(org.apache.pulsar.client.api.interceptor.ProducerInterceptor... interceptors);

/**
* If enabled, partitioned producer will automatically discover new partitions at runtime. This is only applied on
* partitioned topics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* <p>ProducerInterceptor callbacks may be called from multiple threads. Interceptor
* implementation must ensure thread-safety, if needed.
*/
@Deprecated
public interface ProducerInterceptor<T> extends AutoCloseable {

/**
Expand Down
Loading

0 comments on commit 669b916

Please sign in to comment.