Skip to content

Commit

Permalink
Re-implement DoubleByteBuf as a simple holder of a pair of buffers (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jan 19, 2018
1 parent cccc0bd commit 9f775b4
Show file tree
Hide file tree
Showing 15 changed files with 217 additions and 622 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.api.DoubleByteBuf;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -2133,7 +2133,7 @@ public ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
headers.writeInt(msgMetadataSize);
messageData.writeTo(outStream);
outStream.recycle();
return DoubleByteBuf.get(headers, payload);
return ByteBufPair.coalesce(ByteBufPair.get(headers, payload));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.File;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.PulsarDecoder;

import io.netty.channel.ChannelInitializer;
Expand Down Expand Up @@ -70,6 +71,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
SslContext sslCtx = builder.clientAuth(ClientAuth.OPTIONAL).build();
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
}

ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ServerCnx(brokerService));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
*/
package org.apache.pulsar.client.api;

import io.netty.buffer.ByteBuf;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;

import io.netty.buffer.ByteBuf;

/**
* A representation of a message in a topic in its raw form (i.e. as it is stored in a managed ledger).
* RawMessages hold a refcount to the contains ByteBuf, so they must be closed for the ByteBuf to be freed.
Expand All @@ -45,10 +46,10 @@ public interface RawMessage extends AutoCloseable {
ByteBuf getHeadersAndPayload();

/**
* Serialize a raw message to a ByteBuf. The caller is responsible for releasing
* the returned ByteBuf.
* Serialize a raw message to a ByteBufPair. The caller is responsible for releasing
* the returned ByteBufPair.
*/
ByteBuf serialize();
ByteBufPair serialize();

@Override
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@

import java.io.IOException;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.common.api.DoubleByteBuf;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class RawMessageImpl implements RawMessage {
private static final Logger log = LoggerFactory.getLogger(RawMessageImpl.class);

Expand Down Expand Up @@ -67,7 +65,7 @@ public void close() {
}

@Override
public ByteBuf serialize() {
public ByteBufPair serialize() {
// Format: [IdSize][Id][PayloadAndMetadataSize][PayloadAndMetadata]
int idSize = id.getSerializedSize();
int headerSize = 4 /* IdSize */ + idSize + 4 /* PayloadAndMetadataSize */;
Expand All @@ -85,7 +83,7 @@ public ByteBuf serialize() {
}
headers.writeInt(headersAndPayload.readableBytes());

return DoubleByteBuf.get(headers, headersAndPayload);
return ByteBufPair.get(headers, headersAndPayload);
}

static public RawMessage deserializeFrom(ByteBuf buffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.common.api.DoubleByteBuf;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -68,7 +68,7 @@ public static byte[] createMessageWrittenToLedger(String msg) throws Exception {
ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers);
headers.writeInt(msgMetadataSize);
messageMetadata.writeTo(outStream);
ByteBuf headersAndPayload = DoubleByteBuf.get(headers, data);
ByteBuf headersAndPayload = ByteBufPair.coalesce(ByteBufPair.get(headers, data));
byte[] byteMessage = headersAndPayload.nioBuffer().array();
headersAndPayload.release();
return byteMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.Commands.ChecksumType;
Expand Down Expand Up @@ -158,7 +159,7 @@ public void setup() throws Exception {
doReturn(Optional.empty()).when(zkDataCache).get(anyObject());
doReturn(zkDataCache).when(configCacheService).policiesCache();
doReturn(configCacheService).when(pulsar).getConfigurationCache();

LocalZooKeeperCacheService zkCache = mock(LocalZooKeeperCacheService.class);
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(any());
doReturn(zkDataCache).when(zkCache).policiesCache();
Expand Down Expand Up @@ -571,7 +572,7 @@ public void testSendCommand() throws Exception {
.setProducerName("prod-name").setSequenceId(0).build();
ByteBuf data = Unpooled.buffer(1024);

clientCommand = Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data);
clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();

Expand Down Expand Up @@ -1292,7 +1293,7 @@ public void testSendSuccessOnEncryptionRequiredTopic() throws Exception {
.build();
ByteBuf data = Unpooled.buffer(1024);

clientCommand = Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data);
clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();
assertTrue(getResponse() instanceof CommandSendReceipt);
Expand Down Expand Up @@ -1326,7 +1327,7 @@ public void testSendFailureOnEncryptionRequiredTopic() throws Exception {
.build();
ByteBuf data = Unpooled.buffer(1024);

clientCommand = Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data);
clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();
assertTrue(getResponse() instanceof CommandSendError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.Commands.ChecksumType;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -471,7 +472,7 @@ public void testCorruptMessageRemove() throws Exception {
Builder metadataBuilder = ((MessageImpl) msg).getMessageBuilder();
MessageMetadata msgMetadata = metadataBuilder.setProducerName("test").setSequenceId(1).setPublishTime(10L)
.build();
ByteBuf cmd = Commands.newSend(producerId, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
ByteBufPair cmd = Commands.newSend(producerId, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
// (a) create OpSendMsg with message-data : "message-1"
OpSendMsg op = OpSendMsg.create(((MessageImpl) msg), cmd, 1, null);
// a.verify: as message is not corrupt: no need to update checksum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@
package org.apache.pulsar.client.impl;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.testng.Assert;
import org.testng.annotations.Test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RawMessageSerDeserTest {
static final Logger log = LoggerFactory.getLogger(RawMessageSerDeserTest.class);

Expand All @@ -45,7 +44,7 @@ public void testSerializationAndDeserialization() throws Exception {
.setPartition(10).setBatchIndex(20).build();

RawMessage m = new RawMessageImpl(id, headersAndPayload);
ByteBuf serialized = m.serialize();
ByteBuf serialized = ByteBufPair.coalesce(m.serialize());
byte[] bytes = new byte[serialized.readableBytes()];
serialized.readBytes(bytes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -107,6 +108,8 @@ public void initChannel(SocketChannel ch) throws Exception {
SslContext sslCtx = builder.build();
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
}

ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ClientCnx(conf, eventLoopGroup));
}
Expand Down
Loading

0 comments on commit 9f775b4

Please sign in to comment.