Skip to content

Commit

Permalink
Map HTTP/2 Streams to Channels
Browse files Browse the repository at this point in the history
Motivation:

This allows using handlers for Streams in normal Netty-style. Frames are
read/written to the channel as messages, not directly as a
callback/method call. Handlers allow mixing and can ease HTTP/1 and
HTTP/2 interoperability by eventually supporting HTTP/1 handlers in
HTTP/2 and vise versa.

Modifications:

New handler Http2MultiplexCodec that converts from the current HTTP/2
API to a message-based API and child channels for streams.

Result:

The basics are done for server-side: new streams trigger creation of new
channels in much the same appearance to how new connections trigger new
channel creation. The basic frames HEADERS and DATA are handled, but
also GOAWAY and RST_STREAM.

Inbound flow control is implemented, but outbound is not. That will be
done later, along with not completing write promises on the child
channel until the write actually completes on the parent.

There is not yet support for outbound priority/weight, push promises,
and many other features.

There is a generic Object that may be set on stream frames. This also
paves the way for client-side support which needs a way to refer to
yet-to-be-created streams (due to how HEADERS allocates a stream id, and
the allocation order must be the same as transmission order).
  • Loading branch information
ejona86 authored and Scottmitch committed Mar 25, 2016
1 parent 5eab79a commit e24a5d8
Show file tree
Hide file tree
Showing 22 changed files with 2,361 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.netty.handler.codec.http2;

import static io.netty.util.internal.ObjectUtil.checkNotNull;

import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.OneTimeTask;

import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;

/**
* Child {@link Channel} of another channel, for use for modeling streams as channels.
*/
abstract class AbstractHttp2StreamChannel extends AbstractChannel {
/**
* Used by subclasses to queue a close channel within the read queue. When read, it will close
* the channel (using Unsafe) instead of notifying handlers of the message with {@code
* channelRead()}. Additional inbound messages must not arrive after this one.
*/
protected static final Object CLOSE_MESSAGE = new Object();
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
/**
* Number of bytes to consider non-payload messages, to determine when to stop reading. 9 is
* arbitrary, but also the minimum size of an HTTP/2 frame. Primarily is non-zero.
*/
private static final int ARBITRARY_MESSAGE_SIZE = 9;

static {
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
}

private final ChannelConfig config = new DefaultChannelConfig(this);
private final Queue<Object> inboundBuffer = new ArrayDeque<Object>(4);
private final Runnable fireChildReadCompleteTask = new Runnable() {
@Override
public void run() {
if (readInProgress) {
readInProgress = false;
unsafe().recvBufAllocHandle().readComplete();
pipeline().fireChannelReadComplete();
}
}
};

private boolean closed;
private boolean readInProgress;

public AbstractHttp2StreamChannel(Channel parent) {
super(parent);
}

@Override
public ChannelMetadata metadata() {
return METADATA;
}

@Override
public ChannelConfig config() {
return config;
}

@Override
public boolean isOpen() {
return !closed;
}

@Override
public boolean isActive() {
return !closed;
}

@Override
protected AbstractUnsafe newUnsafe() {
return new Unsafe();
}

@Override
protected boolean isCompatible(EventLoop loop) {
return true;
}

@Override
protected SocketAddress localAddress0() {
return parent().localAddress();
}

@Override
protected SocketAddress remoteAddress0() {
return parent().remoteAddress();
}

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}

@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}

@Override
protected void doClose() throws Exception {
closed = true;
while (!inboundBuffer.isEmpty()) {
ReferenceCountUtil.release(inboundBuffer.poll());
}
}

@Override
protected void doBeginRead() {
if (readInProgress) {
return;
}

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config());
if (inboundBuffer.isEmpty()) {
readInProgress = true;
return;
}

do {
Object m = inboundBuffer.poll();
if (m == null) {
break;
}
if (!doRead0(m, allocHandle)) {
// Channel closed, and already cleaned up.
return;
}
} while (allocHandle.continueReading());

allocHandle.readComplete();
pipeline().fireChannelReadComplete();
}

@Override
protected final void doWrite(ChannelOutboundBuffer in) throws Exception {
if (closed) {
throw CLOSED_CHANNEL_EXCEPTION;
}

EventExecutor preferredExecutor = preferredEventExecutor();

// TODO: this is pretty broken; futures should only be completed after they are processed on
// the parent channel. However, it isn't currently possible due to ChannelOutboundBuffer's
// behavior which requires completing the current future before getting the next message. It
// should become easier once we have outbound flow control support.
// https://github.com/netty/netty/issues/4941
if (preferredExecutor.inEventLoop()) {
for (;;) {
Object msg = in.current();
if (msg == null) {
break;
}
try {
doWrite(ReferenceCountUtil.retain(msg));
} catch (Throwable t) {
// It would be nice to fail the future, but we can't do that if not on the event
// loop. So we instead opt for a solution that is consistent.
pipeline().fireExceptionCaught(t);
}
in.remove();
}
doWriteComplete();
} else {
// Use a copy because the original msgs will be recycled by AbstractChannel.
final Object[] msgsCopy = new Object[in.size()];
for (int i = 0; i < msgsCopy.length; i ++) {
msgsCopy[i] = ReferenceCountUtil.retain(in.current());
in.remove();
}

preferredExecutor.execute(new OneTimeTask() {
@Override
public void run() {
for (Object msg : msgsCopy) {
try {
doWrite(msg);
} catch (Throwable t) {
pipeline().fireExceptionCaught(t);
}
}
doWriteComplete();
}
});
}
}

/**
* Process a single write. Guaranteed to eventually be followed by a {@link #doWriteComplete()},
* which denotes the end of the batch of writes. May be called from any thread.
*/
protected abstract void doWrite(Object msg) throws Exception;

/**
* Process end of batch of {@link #doWrite()}s. May be called from any thread.
*/
protected abstract void doWriteComplete();

/**
* The ideal thread for events like {@link #doWrite()} to be processed on. May be used for
* efficient batching, but not required.
*/
protected abstract EventExecutor preferredEventExecutor();

/**
* {@code bytes}-count of bytes provided to {@link #fireChildRead} have been read. May be called
* from any thread. Must not throw an exception.
*/
protected abstract void bytesConsumed(int bytes);

/**
* Receive a read message. This does not notify handlers unless a read is in progress on the
* channel. May be called from any thread.
*/
protected void fireChildRead(final Object msg) {
if (eventLoop().inEventLoop()) {
fireChildRead0(msg);
} else {
eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
fireChildRead0(msg);
}
});
}
}

private void fireChildRead0(Object msg) {
if (closed) {
ReferenceCountUtil.release(msg);
return;
}
if (readInProgress) {
assert inboundBuffer.isEmpty();
// Check for null because inboundBuffer doesn't support null; we want to be consistent
// for what values are supported.
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
readInProgress = doRead0(checkNotNull(msg, "msg"), allocHandle);
if (!allocHandle.continueReading()) {
fireChildReadCompleteTask.run();
}
} else {
inboundBuffer.add(msg);
}
}

protected void fireChildReadComplete() {
if (eventLoop().inEventLoop()) {
fireChildReadCompleteTask.run();
} else {
eventLoop().execute(fireChildReadCompleteTask);
}
}

/**
* Returns whether reads should continue. The only reason reads shouldn't continue is that the
* channel was just closed.
*/
private boolean doRead0(Object msg, RecvByteBufAllocator.Handle allocHandle) {
if (msg == CLOSE_MESSAGE) {
allocHandle.readComplete();
pipeline().fireChannelReadComplete();
unsafe().close(voidPromise());
return false;
}
int numBytesToBeConsumed = 0;
if (msg instanceof Http2DataFrame) {
Http2DataFrame data = (Http2DataFrame) msg;
numBytesToBeConsumed = data.content().readableBytes() + data.padding();
allocHandle.lastBytesRead(numBytesToBeConsumed);
} else {
allocHandle.lastBytesRead(ARBITRARY_MESSAGE_SIZE);
}
allocHandle.incMessagesRead(1);
pipeline().fireChannelRead(msg);
if (numBytesToBeConsumed != 0) {
bytesConsumed(numBytesToBeConsumed);
}
return true;
}

private final class Unsafe extends AbstractUnsafe {
@Override
public void connect(final SocketAddress remoteAddress,
SocketAddress localAddress, final ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http2;

/**
* Abstract implementation of {@link Http2StreamFrame}.
*/
public abstract class AbstractHttp2StreamFrame implements Http2StreamFrame {
private Object stream;

@Override
public AbstractHttp2StreamFrame setStream(Object stream) {
this.stream = stream;
return this;
}

@Override
public Object stream() {
return stream;
}

/**
* Returns {@code true} if {@code o} has equal {@code stream} to this object.
*/
@Override
public boolean equals(Object o) {
if (!(o instanceof Http2StreamFrame)) {
return false;
}
Http2StreamFrame other = (Http2StreamFrame) o;
if (stream == null) {
return other.stream() == null;
}
return stream.equals(other.stream());
}

@Override
public int hashCode() {
if (stream == null) {
return 61432814;
}
return stream.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public Http2FrameListener frameListener() {
return listener;
}

// Visible for testing
Http2FrameListener internalFrameListener() {
return internalFrameListener;
}

@Override
public boolean prefaceReceived() {
return FrameReadListener.class == internalFrameListener.getClass();
Expand Down
Loading

0 comments on commit e24a5d8

Please sign in to comment.