Skip to content

Commit

Permalink
[FLINK-7515][network] allow actual 0-length content in NettyMessage#a…
Browse files Browse the repository at this point in the history
…llocateBuffer()

Previously, length "0" meant "unknown content length" but there are cases where
the actual length is 0 and so we use -1 for tagging the special case now.

[FLINK-7515][network] address PR comments

This closes apache#4592.
  • Loading branch information
Nico Kruber authored and tillrohrmann committed Nov 6, 2017
1 parent e285a41 commit f1c4eb6
Showing 1 changed file with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.nio.ByteBuffer;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -67,12 +68,53 @@ abstract class NettyMessage {

// ------------------------------------------------------------------------

/**
* Allocates a new (header and contents) buffer and adds some header information for the frame
* decoder.
*
* <p>Before sending the buffer, you must write the actual length after adding the contents as
* an integer to position <tt>0</tt>!
*
* @param allocator
* byte buffer allocator to use
* @param id
* {@link NettyMessage} subclass ID
*
* @return a newly allocated direct buffer with header data written for {@link
* NettyMessageDecoder}
*/
private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
return allocateBuffer(allocator, id, 0);
return allocateBuffer(allocator, id, -1);
}

/**
* Allocates a new (header and contents) buffer and adds some header information for the frame
* decoder.
*
* <p>If the <tt>length</tt> is unknown, you must write the actual length after adding the
* contents as an integer to position <tt>0</tt>!
*
* @param allocator
* byte buffer allocator to use
* @param id
* {@link NettyMessage} subclass ID
* @param length
* content length (or <tt>-1</tt> if unknown)
*
* @return a newly allocated direct buffer with header data written for {@link
* NettyMessageDecoder}
*/
private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer();
checkArgument(length <= Integer.MAX_VALUE - HEADER_LENGTH);

final ByteBuf buffer;
if (length != -1) {
buffer = allocator.directBuffer(HEADER_LENGTH + length);
} else {
// content length unknown -> start with the default initial size (rather than HEADER_LENGTH only):
buffer = allocator.directBuffer();
}

buffer.writeInt(HEADER_LENGTH + length);
buffer.writeInt(MAGIC_NUMBER);
buffer.writeByte(id);
Expand Down

0 comments on commit f1c4eb6

Please sign in to comment.