Skip to content

Commit

Permalink
Fix a bug where SpdySession.getActiveStreams() returns incorrect set
Browse files Browse the repository at this point in the history
Related issue: netty#2743

Motivation:

When there are more than one stream with the same priority, the set
returned by SpdySession.getActiveStream() will not include all of them,
because it uses TreeSet and only compares the priority of streams. If
two different streams have the same priority, one of them will be
discarded by TreeSet.

Modification:

- Rename getActiveStreams() to activeStreams()
- Replace PriorityComparator with StreamComparator

Result:

Two different streams with the same priority are compared correctly.
  • Loading branch information
trustin committed Aug 6, 2014
1 parent 6ecca27 commit 168e81e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@
import java.util.Comparator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;

final class SpdySession {

private final AtomicInteger activeLocalStreams = new AtomicInteger();
private final AtomicInteger activeRemoteStreams = new AtomicInteger();
private final Map<Integer, StreamState> activeStreams = PlatformDependent.newConcurrentHashMap();

private final StreamComparator streamComparator = new StreamComparator();
private final AtomicInteger sendWindowSize;
private final AtomicInteger receiveWindowSize;

Expand All @@ -60,10 +59,10 @@ boolean isActiveStream(int streamId) {
}

// Stream-IDs should be iterated in priority order
Set<Integer> getActiveStreams() {
TreeSet<Integer> streamIds = new TreeSet<Integer>(new PriorityComparator());
streamIds.addAll(activeStreams.keySet());
return streamIds;
Map<Integer, StreamState> activeStreams() {
Map<Integer, StreamState> streams = new TreeMap<Integer, StreamState>(streamComparator);
streams.putAll(activeStreams);
return streams;
}

void acceptStream(
Expand Down Expand Up @@ -208,8 +207,8 @@ boolean putPendingWrite(int streamId, PendingWrite pendingWrite) {

PendingWrite getPendingWrite(int streamId) {
if (streamId == SPDY_SESSION_STREAM_ID) {
for (Integer id : getActiveStreams()) {
StreamState state = activeStreams.get(id);
for (Map.Entry<Integer, StreamState> e: activeStreams().entrySet()) {
StreamState state = e.getValue();
if (state.getSendWindowSize() > 0) {
PendingWrite pendingWrite = state.getPendingWrite();
if (pendingWrite != null) {
Expand Down Expand Up @@ -321,15 +320,23 @@ void clearPendingWrites(Throwable cause) {
}
}

private final class PriorityComparator implements Comparator<Integer>, Serializable {
private final class StreamComparator implements Comparator<Integer>, Serializable {

private static final long serialVersionUID = 1161471649740544848L;

StreamComparator() { }

@Override
public int compare(Integer id1, Integer id2) {
StreamState state1 = activeStreams.get(id1);
StreamState state2 = activeStreams.get(id2);
return state1.getPriority() - state2.getPriority();

int result = state1.getPriority() - state2.getPriority();
if (result != 0) {
return result;
}

return id1 - id2;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
for (Integer streamId: spdySession.getActiveStreams()) {
for (Integer streamId: spdySession.activeStreams().keySet()) {
removeStream(streamId, ctx.newSucceededFuture());
}
ctx.fireChannelInactive();
Expand Down

0 comments on commit 168e81e

Please sign in to comment.