Skip to content

Commit

Permalink
handle the crypto stream separately in the packet packer
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Oct 1, 2018
1 parent f686214 commit 25847cf
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 136 deletions.
1 change: 1 addition & 0 deletions crypto_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type cryptoStream interface {
io.Reader
io.Writer
handleStreamFrame(*wire.StreamFrame) error
hasData() bool
popStreamFrame(protocol.ByteCount) (*wire.StreamFrame, bool)
closeForShutdown(error)
setReadOffset(protocol.ByteCount)
Expand Down
30 changes: 5 additions & 25 deletions framer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ type framer struct {
cryptoStream cryptoStream
version protocol.VersionNumber

streamQueueMutex sync.Mutex
activeStreams map[protocol.StreamID]struct{}
streamQueue []protocol.StreamID
hasCryptoStreamData bool
streamQueueMutex sync.Mutex
activeStreams map[protocol.StreamID]struct{}
streamQueue []protocol.StreamID

controlFrameMutex sync.Mutex
controlFrames []wire.Frame
Expand Down Expand Up @@ -57,13 +56,9 @@ func (f *framer) AppendControlFrames(frames []wire.Frame, maxLen protocol.ByteCo
return frames, length
}

// AddActiveStream adds a stream that has data to write.
// It should not be used for the crypto stream.
func (f *framer) AddActiveStream(id protocol.StreamID) {
if id == f.version.CryptoStreamID() { // the crypto stream is handled separately
f.streamQueueMutex.Lock()
f.hasCryptoStreamData = true
f.streamQueueMutex.Unlock()
return
}
f.streamQueueMutex.Lock()
if _, ok := f.activeStreams[id]; !ok {
f.streamQueue = append(f.streamQueue, id)
Expand All @@ -72,21 +67,6 @@ func (f *framer) AddActiveStream(id protocol.StreamID) {
f.streamQueueMutex.Unlock()
}

func (f *framer) HasCryptoStreamData() bool {
f.streamQueueMutex.Lock()
hasCryptoStreamData := f.hasCryptoStreamData
f.streamQueueMutex.Unlock()
return hasCryptoStreamData
}

func (f *framer) PopCryptoStreamFrame(maxLen protocol.ByteCount) *wire.StreamFrame {
f.streamQueueMutex.Lock()
frame, hasMoreData := f.cryptoStream.popStreamFrame(maxLen)
f.hasCryptoStreamData = hasMoreData
f.streamQueueMutex.Unlock()
return frame
}

func (f *framer) AppendStreamFrames(frames []wire.Frame, maxLen protocol.ByteCount) []wire.Frame {
var length protocol.ByteCount
f.streamQueueMutex.Lock()
Expand Down
32 changes: 0 additions & 32 deletions framer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,38 +72,6 @@ var _ = Describe("Stream Framer", func() {
})
})

Context("handling the crypto stream", func() {
It("says if it has crypto stream data", func() {
Expect(framer.HasCryptoStreamData()).To(BeFalse())
framer.AddActiveStream(framer.version.CryptoStreamID())
Expect(framer.HasCryptoStreamData()).To(BeTrue())
})

It("says that it doesn't have crypto stream data after popping all data", func() {
streamID := framer.version.CryptoStreamID()
f := &wire.StreamFrame{
StreamID: streamID,
Data: []byte("foobar"),
}
cryptoStream.EXPECT().popStreamFrame(protocol.ByteCount(1000)).Return(f, false)
framer.AddActiveStream(streamID)
Expect(framer.PopCryptoStreamFrame(1000)).To(Equal(f))
Expect(framer.HasCryptoStreamData()).To(BeFalse())
})

It("says that it has more crypto stream data if not all data was popped", func() {
streamID := framer.version.CryptoStreamID()
f := &wire.StreamFrame{
StreamID: streamID,
Data: []byte("foobar"),
}
cryptoStream.EXPECT().popStreamFrame(protocol.ByteCount(1000)).Return(f, true)
framer.AddActiveStream(streamID)
Expect(framer.PopCryptoStreamFrame(1000)).To(Equal(f))
Expect(framer.HasCryptoStreamData()).To(BeTrue())
})
})

Context("popping STREAM frames", func() {
It("returns nil when popping an empty framer", func() {
Expect(framer.AppendStreamFrames(nil, 1000)).To(BeEmpty())
Expand Down
12 changes: 12 additions & 0 deletions mock_crypto_stream_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 0 additions & 24 deletions mock_frame_source_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions mock_send_stream_internal_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions mock_stream_internal_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 16 additions & 9 deletions packet_packer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ type sealingManager interface {
}

type frameSource interface {
HasCryptoStreamData() bool
PopCryptoStreamFrame(protocol.ByteCount) *wire.StreamFrame
AppendStreamFrames([]wire.Frame, protocol.ByteCount) []wire.Frame
AppendControlFrames([]wire.Frame, protocol.ByteCount) ([]wire.Frame, protocol.ByteCount)
}
Expand Down Expand Up @@ -98,6 +96,7 @@ type packetPacker struct {

packetNumberGenerator *packetNumberGenerator
getPacketNumberLen func(protocol.PacketNumber) protocol.PacketNumberLen
cryptoStream cryptoStream
framer frameSource
acks ackFrameSource

Expand All @@ -117,13 +116,15 @@ func newPacketPacker(
remoteAddr net.Addr, // only used for determining the max packet size
token []byte,
divNonce []byte,
cryptoStream cryptoStream,
cryptoSetup sealingManager,
framer frameSource,
acks ackFrameSource,
perspective protocol.Perspective,
version protocol.VersionNumber,
) *packetPacker {
return &packetPacker{
cryptoStream: cryptoStream,
cryptoSetup: cryptoSetup,
divNonce: divNonce,
token: token,
Expand Down Expand Up @@ -306,14 +307,17 @@ func (p *packetPacker) packHandshakeRetransmission(packet *ackhandler.Packet) (*
// PackPacket packs a new packet
// the other controlFrames are sent in the next packet, but might be queued and sent in the next packet if the packet would overflow MaxPacketSize otherwise
func (p *packetPacker) PackPacket() (*packedPacket, error) {
hasCryptoStreamFrame := p.framer.HasCryptoStreamData()
packet, err := p.maybePackCryptoPacket()
if err != nil {
return nil, err
}
if packet != nil {
return packet, nil
}
// if this is the first packet to be send, make sure it contains stream data
if !p.hasSentPacket && !hasCryptoStreamFrame {
if !p.hasSentPacket && packet == nil {
return nil, nil
}
if hasCryptoStreamFrame {
return p.packCryptoPacket()
}

encLevel, sealer := p.cryptoSetup.GetSealer()

Expand Down Expand Up @@ -357,15 +361,18 @@ func (p *packetPacker) PackPacket() (*packedPacket, error) {
}, nil
}

func (p *packetPacker) packCryptoPacket() (*packedPacket, error) {
func (p *packetPacker) maybePackCryptoPacket() (*packedPacket, error) {
if !p.cryptoStream.hasData() {
return nil, nil
}
encLevel, sealer := p.cryptoSetup.GetSealerForCryptoStream()
header := p.getHeader(encLevel)
headerLength, err := header.GetLength(p.version)
if err != nil {
return nil, err
}
maxLen := p.maxPacketSize - protocol.ByteCount(sealer.Overhead()) - protocol.NonForwardSecurePacketSizeReduction - headerLength
sf := p.framer.PopCryptoStreamFrame(maxLen)
sf, _ := p.cryptoStream.popStreamFrame(maxLen)
sf.DataLenPresent = false
frames := []wire.Frame{sf}
raw, err := p.writeAndSealPacket(header, frames, sealer)
Expand Down
Loading

0 comments on commit 25847cf

Please sign in to comment.