Skip to content

Commit

Permalink
Defer known-length payload writes when within a segment of completion.
Browse files Browse the repository at this point in the history
This allows both really small bodies and very large buffered bodies to be written as a single frame.
  • Loading branch information
JakeWharton committed Feb 5, 2016
1 parent 2d2adbf commit cc70a10
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.util.Random;
import okhttp3.RequestBody;
import okio.Buffer;
import okio.BufferedSink;
import okio.ByteString;
Expand All @@ -35,6 +36,7 @@
import static okhttp3.internal.ws.WebSocketProtocol.PAYLOAD_SHORT_MAX;
import static okhttp3.internal.ws.WebSocketProtocol.toggleMask;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public final class WebSocketWriterTest {
Expand All @@ -61,7 +63,7 @@ public final class WebSocketWriterTest {
private final WebSocketWriter clientWriter = new WebSocketWriter(true, data, random);

@Test public void serverTextMessage() throws IOException {
BufferedSink sink = Okio.buffer(serverWriter.newMessageSink(OPCODE_TEXT));
BufferedSink sink = Okio.buffer(serverWriter.newMessageSink(OPCODE_TEXT, -1));

sink.writeUtf8("Hel").flush();
assertData("010348656c");
Expand All @@ -73,8 +75,65 @@ public final class WebSocketWriterTest {
assertData("8000");
}

@Test public void serverSmallBufferedPayloadWrittenAsOneFrame() throws IOException {
int length = 5;
byte[] bytes = binaryData(length);

RequestBody body = RequestBody.create(null, bytes);
BufferedSink sink = Okio.buffer(serverWriter.newMessageSink(OPCODE_TEXT, length));
body.writeTo(sink);
sink.close();

assertData("8105");
assertData(bytes);
assertTrue(data.exhausted());
}

@Test public void serverLargeBufferedPayloadWrittenAsOneFrame() throws IOException {
int length = 12345;
byte[] bytes = binaryData(length);

RequestBody body = RequestBody.create(null, bytes);
BufferedSink sink = Okio.buffer(serverWriter.newMessageSink(OPCODE_TEXT, length));
body.writeTo(sink);
sink.close();

assertData("817e");
assertData(String.format("%04x", length));
assertData(bytes);
assertTrue(data.exhausted());
}

@Test public void serverLargeNonBufferedPayloadWrittenAsMultipleFrames() throws IOException {
int length = 100_000;
Buffer bytes = new Buffer().write(binaryData(length));

BufferedSink sink = Okio.buffer(serverWriter.newMessageSink(OPCODE_TEXT, length));
Buffer body = bytes.clone();
sink.write(body.readByteString(20_000));
sink.write(body.readByteString(20_000));
sink.write(body.readByteString(20_000));
sink.write(body.readByteString(20_000));
sink.write(body.readByteString(20_000));
sink.close();

assertData("017e4800");
assertData(bytes.readByteArray(18_432));
assertData("007e5000");
assertData(bytes.readByteArray(20_480));
assertData("007e5000");
assertData(bytes.readByteArray(20_480));
assertData("007e5000");
assertData(bytes.readByteArray(20_480));
assertData("007e4800");
assertData(bytes.readByteArray(18_432));
assertData("807e06a0");
assertData(bytes.readByteArray(1_696));
assertTrue(data.exhausted());
}

@Test public void closeFlushes() throws IOException {
BufferedSink sink = Okio.buffer(serverWriter.newMessageSink(OPCODE_TEXT));
BufferedSink sink = Okio.buffer(serverWriter.newMessageSink(OPCODE_TEXT, -1));

sink.writeUtf8("Hel").flush();
assertData("010348656c");
Expand All @@ -84,7 +143,7 @@ public final class WebSocketWriterTest {
}

@Test public void noWritesAfterClose() throws IOException {
Sink sink = serverWriter.newMessageSink(OPCODE_TEXT);
Sink sink = serverWriter.newMessageSink(OPCODE_TEXT, -1);

sink.close();
assertData("8100");
Expand All @@ -100,7 +159,7 @@ public final class WebSocketWriterTest {
}

@Test public void clientTextMessage() throws IOException {
BufferedSink sink = Okio.buffer(clientWriter.newMessageSink(OPCODE_TEXT));
BufferedSink sink = Okio.buffer(clientWriter.newMessageSink(OPCODE_TEXT, -1));

sink.writeUtf8("Hel").flush();
assertData("018360b420bb28d14c");
Expand All @@ -113,7 +172,7 @@ public final class WebSocketWriterTest {
}

@Test public void serverBinaryMessage() throws IOException {
BufferedSink sink = Okio.buffer(serverWriter.newMessageSink(OPCODE_BINARY));
BufferedSink sink = Okio.buffer(serverWriter.newMessageSink(OPCODE_BINARY, -1));

sink.write(binaryData(50)).flush();
assertData("0232");
Expand All @@ -128,7 +187,7 @@ public final class WebSocketWriterTest {
}

@Test public void serverMessageLengthShort() throws IOException {
Sink sink = serverWriter.newMessageSink(OPCODE_BINARY);
Sink sink = serverWriter.newMessageSink(OPCODE_BINARY, -1);

// Create a payload which will overflow the normal payload byte size.
Buffer payload = new Buffer();
Expand All @@ -148,7 +207,7 @@ public final class WebSocketWriterTest {
}

@Test public void serverMessageLengthLong() throws IOException {
Sink sink = serverWriter.newMessageSink(OPCODE_BINARY);
Sink sink = serverWriter.newMessageSink(OPCODE_BINARY, -1);

// Create a payload which will overflow the normal and short payload byte size.
Buffer payload = new Buffer();
Expand All @@ -175,7 +234,7 @@ public final class WebSocketWriterTest {

random.setSeed(0); // Reset the seed so real data matches.

BufferedSink sink = Okio.buffer(clientWriter.newMessageSink(OPCODE_BINARY));
BufferedSink sink = Okio.buffer(clientWriter.newMessageSink(OPCODE_BINARY, -1));

byte[] part1 = binaryData(50);
sink.write(part1).flush();
Expand Down Expand Up @@ -314,9 +373,9 @@ public final class WebSocketWriterTest {
}

@Test public void twoMessageSinksThrows() {
clientWriter.newMessageSink(OPCODE_TEXT);
clientWriter.newMessageSink(OPCODE_TEXT, -1);
try {
clientWriter.newMessageSink(OPCODE_TEXT);
clientWriter.newMessageSink(OPCODE_TEXT, -1);
fail();
} catch (IllegalStateException e) {
assertEquals("Another message writer is active. Did you call close()?", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public boolean readMessage() {
+ ". Must use WebSocket.TEXT or WebSocket.BINARY.");
}

BufferedSink sink = Okio.buffer(writer.newMessageSink(formatOpcode));
BufferedSink sink = Okio.buffer(writer.newMessageSink(formatOpcode, message.contentLength()));
try {
message.writeTo(sink);
sink.close();
Expand Down
11 changes: 9 additions & 2 deletions okhttp-ws/src/main/java/okhttp3/internal/ws/WebSocketWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,15 @@ private void writeControlFrameSynchronized(int opcode, Buffer payload) throws IO
* Stream a message payload as a series of frames. This allows control frames to be interleaved
* between parts of the message.
*/
public Sink newMessageSink(int formatOpcode) {
public Sink newMessageSink(int formatOpcode, long contentLength) {
if (activeWriter) {
throw new IllegalStateException("Another message writer is active. Did you call close()?");
}
activeWriter = true;

// Reset FrameSink state for a new writer.
frameSink.formatOpcode = formatOpcode;
frameSink.contentLength = contentLength;
frameSink.isFirstFrame = true;
frameSink.closed = false;

Expand Down Expand Up @@ -226,6 +227,7 @@ private void writeMaskedSynchronized(BufferedSource source, long byteCount) thro

private final class FrameSink implements Sink {
private int formatOpcode;
private long contentLength;
private boolean isFirstFrame;
private boolean closed;

Expand All @@ -234,8 +236,13 @@ private final class FrameSink implements Sink {

buffer.write(source, byteCount);

// Determine if this is a buffered write which we can defer until close() flushes.
boolean deferWrite = isFirstFrame
&& contentLength != -1
&& buffer.size() > contentLength - 2048 /* segment size */;

long emitCount = buffer.completeSegmentByteCount();
if (emitCount > 0) {
if (emitCount > 0 && !deferWrite) {
synchronized (WebSocketWriter.this) {
writeMessageFrameSynchronized(formatOpcode, emitCount, isFirstFrame, false /* final */);
}
Expand Down

0 comments on commit cc70a10

Please sign in to comment.