Skip to content

Commit

Permalink
Add support for HTTP/2 draft 12.
Browse files Browse the repository at this point in the history
Motivation:

Draft 12 has just arrived and has quite a few changes. Need to update in
order to keep current with the spec.

Modifications:

This is a rewrite of the original (draft 10) code. There are only 2
handlers now: preface and connection. The connection handler is now
callback based rather than frame based (there are no frame classes
anymore). AbstractHttp2ConnectionHandler is the base class for any
HTTP/2 handlers. All of the stream priority logic now resides in the
outbound flow controller, and its interface exposes methods for
adding/updating priority for streams.

Upgraded to hpack 0.7.0, which is used by draft12. Also removed
draft10 code and moved draft12 code to the ../http2 package
(no draft subpackage).

Result:

Addition of a HTTP/2 draft 12 support.
  • Loading branch information
nmittler authored and Norman Maurer committed May 6, 2014
1 parent b1a51f0 commit ca7c53d
Show file tree
Hide file tree
Showing 126 changed files with 9,492 additions and 8,646 deletions.
2 changes: 1 addition & 1 deletion codec-http2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hpack</artifactId>
<version>0.6.0</version>
<version>0.7.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,341 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.netty.handler.codec.http2;

import static io.netty.handler.codec.http2.Http2Exception.format;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import io.netty.handler.codec.http2.Http2Stream.State;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/**
* Simple implementation of {@link Http2Connection}.
*/
public class DefaultHttp2Connection implements Http2Connection {

private final Map<Integer, Http2Stream> streamMap = new HashMap<Integer, Http2Stream>();
private final Set<Http2Stream> activeStreams = new LinkedHashSet<Http2Stream>();
private final DefaultEndpoint localEndpoint;
private final DefaultEndpoint remoteEndpoint;
private boolean goAwaySent;
private boolean goAwayReceived;
private boolean server;

public DefaultHttp2Connection(boolean server, boolean allowCompressedData) {
this.server = server;
localEndpoint = new DefaultEndpoint(server, allowCompressedData);
remoteEndpoint = new DefaultEndpoint(!server, false);
}

@Override
public boolean isServer() {
return server;
}

@Override
public Http2Stream requireStream(int streamId) throws Http2Exception {
Http2Stream stream = stream(streamId);
if (stream == null) {
throw protocolError("Stream does not exist %d", streamId);
}
return stream;
}

@Override
public Http2Stream stream(int streamId) {
return streamMap.get(streamId);
}

@Override
public int numActiveStreams() {
return activeStreams.size();
}

@Override
public Set<Http2Stream> activeStreams() {
return Collections.unmodifiableSet(activeStreams);
}

@Override
public Endpoint local() {
return localEndpoint;
}

@Override
public Endpoint remote() {
return remoteEndpoint;
}

@Override
public void goAwaySent() {
goAwaySent = true;
}

@Override
public void goAwayReceived() {
goAwayReceived = true;
}

@Override
public boolean isGoAwaySent() {
return goAwaySent;
}

@Override
public boolean isGoAwayReceived() {
return goAwayReceived;
}

@Override
public boolean isGoAway() {
return isGoAwaySent() || isGoAwayReceived();
}

/**
* Simple stream implementation. Streams can be compared to each other by priority.
*/
private final class DefaultStream implements Http2Stream {
private final int id;
private State state = State.IDLE;

DefaultStream(int id) {
this.id = id;
}

@Override
public int id() {
return id;
}

@Override
public State state() {
return state;
}

@Override
public Http2Stream verifyState(Http2Error error, State... allowedStates) throws Http2Exception {
for (State allowedState : allowedStates) {
if (state == allowedState) {
return this;
}
}
throw format(error, "Stream %d in unexpected state: %s", id, state);
}

@Override
public Http2Stream openForPush() throws Http2Exception {
switch (state) {
case RESERVED_LOCAL:
state = State.HALF_CLOSED_REMOTE;
break;
case RESERVED_REMOTE:
state = State.HALF_CLOSED_LOCAL;
break;
default:
throw protocolError("Attempting to open non-reserved stream for push");
}
return this;
}

@Override
public Http2Stream close() {
if (state == State.CLOSED) {
return this;
}

state = State.CLOSED;
activeStreams.remove(this);
streamMap.remove(id);
return this;
}

@Override
public Http2Stream closeLocalSide() {
switch (state) {
case OPEN:
state = State.HALF_CLOSED_LOCAL;
break;
case HALF_CLOSED_LOCAL:
break;
default:
close();
break;
}
return this;
}

@Override
public Http2Stream closeRemoteSide() {
switch (state) {
case OPEN:
state = State.HALF_CLOSED_REMOTE;
break;
case HALF_CLOSED_REMOTE:
break;
default:
close();
break;
}
return this;
}

@Override
public boolean remoteSideOpen() {
return state == HALF_CLOSED_LOCAL || state == OPEN || state == RESERVED_REMOTE;
}

@Override
public boolean localSideOpen() {
return state == HALF_CLOSED_REMOTE || state == OPEN || state == RESERVED_LOCAL;
}
}

/**
* Simple endpoint implementation.
*/
private final class DefaultEndpoint implements Endpoint {
private int nextStreamId;
private int lastStreamCreated;
private int maxStreams = Integer.MAX_VALUE;
private boolean pushToAllowed = true;
private boolean allowCompressedData;

DefaultEndpoint(boolean serverEndpoint, boolean allowCompressedData) {
// Determine the starting stream ID for this endpoint. Zero is reserved for the
// connection and 1 is reserved for responding to an upgrade from HTTP 1.1.
// Client-initiated streams use odd identifiers and server-initiated streams use
// even.
nextStreamId = serverEndpoint ? 2 : 3;
this.allowCompressedData = allowCompressedData;
}

@Override
public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
checkNewStreamAllowed(streamId);

// Create and initialize the stream.
DefaultStream stream = new DefaultStream(streamId);
if (halfClosed) {
stream.state = isLocal() ? State.HALF_CLOSED_LOCAL : State.HALF_CLOSED_REMOTE;
} else {
stream.state = State.OPEN;
}

// Update the next and last stream IDs.
nextStreamId += 2;
lastStreamCreated = streamId;

// Register the stream and mark it as active.
streamMap.put(streamId, stream);
activeStreams.add(stream);
return stream;
}

@Override
public DefaultStream reservePushStream(int streamId, Http2Stream parent)
throws Http2Exception {
if (parent == null) {
throw protocolError("Parent stream missing");
}
if (isLocal() ? !parent.localSideOpen() : !parent.remoteSideOpen()) {
throw protocolError("Stream %d is not open for sending push promise", parent.id());
}
if (!opposite().allowPushTo()) {
throw protocolError("Server push not allowed to opposite endpoint.");
}

// Create and initialize the stream.
DefaultStream stream = new DefaultStream(streamId);
stream.state = isLocal() ? State.RESERVED_LOCAL : State.RESERVED_REMOTE;

// Update the next and last stream IDs.
nextStreamId += 2;
lastStreamCreated = streamId;

// Register the stream.
streamMap.put(streamId, stream);
return stream;
}

@Override
public void allowPushTo(boolean allow) {
pushToAllowed = allow;
}

@Override
public boolean allowPushTo() {
return pushToAllowed;
}

@Override
public int maxStreams() {
return maxStreams;
}

@Override
public void maxStreams(int maxStreams) {
this.maxStreams = maxStreams;
}

@Override
public boolean allowCompressedData() {
return allowCompressedData;
}

@Override
public void allowCompressedData(boolean allow) {
allowCompressedData = allow;
}

@Override
public int lastStreamCreated() {
return lastStreamCreated;
}

@Override
public Endpoint opposite() {
return isLocal() ? remoteEndpoint : localEndpoint;
}

private void checkNewStreamAllowed(int streamId) throws Http2Exception {
if (isGoAway()) {
throw protocolError("Cannot create a stream since the connection is going away");
}
if (nextStreamId < 0) {
throw protocolError("No more streams can be created on this connection");
}
if (streamId != nextStreamId) {
throw protocolError("Incorrect next stream ID requested: %d", streamId);
}
if (streamMap.size() + 1 > maxStreams) {
throw protocolError("Maximum streams exceeded for this endpoint.");
}
}

private boolean isLocal() {
return this == localEndpoint;
}
}
}
Loading

0 comments on commit ca7c53d

Please sign in to comment.