Skip to content

Commit

Permalink
Initial test harness for h2spec
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukasa authored and pgjones committed Aug 2, 2019
1 parent ab5b891 commit d588e7c
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 23 deletions.
96 changes: 73 additions & 23 deletions examples/asyncio/asyncio-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
This example demonstrates handling requests with bodies, as well as handling
those without. In particular, it demonstrates the fact that DataReceived may
be called multiple times, and that applications must handle that possibility.
Please note that this example does not handle flow control, and so only works
properly for relatively small requests. Please see other examples to understand
how flow control should work.
"""
import asyncio
import io
Expand All @@ -23,10 +19,11 @@
from h2.config import H2Configuration
from h2.connection import H2Connection
from h2.events import (
ConnectionTerminated, DataReceived, RequestReceived, StreamEnded
ConnectionTerminated, DataReceived, RequestReceived, StreamEnded,
StreamReset
)
from h2.errors import ErrorCodes
from h2.exceptions import ProtocolError
from h2.exceptions import ProtocolError, StreamClosedError


RequestData = collections.namedtuple('RequestData', ['headers', 'data'])
Expand All @@ -38,12 +35,18 @@ def __init__(self):
self.conn = H2Connection(config=config)
self.transport = None
self.stream_data = {}
self.flow_control_futures = {}

def connection_made(self, transport: asyncio.Transport):
self.transport = transport
self.conn.initiate_connection()
self.transport.write(self.conn.data_to_send())

def connection_lost(self, exc):
for future in self.flow_control_futures.values():
future.cancel()
self.flow_control_futures = {}

def data_received(self, data: bytes):
try:
events = self.conn.receive_data(data)
Expand All @@ -61,18 +64,15 @@ def data_received(self, data: bytes):
self.stream_complete(event.stream_id)
elif isinstance(event, ConnectionTerminated):
self.transport.close()
elif isinstance(event, StreamReset):
self.stream_reset(event.stream_id)

self.transport.write(self.conn.data_to_send())

def request_received(self, headers: List[Tuple[str, str]], stream_id: int):
headers = collections.OrderedDict(headers)
method = headers[':method']

# We only support GET and POST.
if method not in ('GET', 'POST'):
self.return_405(headers, stream_id)
return

# Store off the request data.
request_data = RequestData(headers, io.BytesIO())
self.stream_data[stream_id] = request_data
Expand Down Expand Up @@ -101,18 +101,7 @@ def stream_complete(self, stream_id: int):
('server', 'asyncio-h2'),
)
self.conn.send_headers(stream_id, response_headers)
self.conn.send_data(stream_id, data, end_stream=True)

def return_405(self, headers: List[Tuple[str, str]], stream_id: int):
"""
We don't support the given method, so we want to return a 405 response.
"""
response_headers = (
(':status', '405'),
('content-length', '0'),
('server', 'asyncio-h2'),
)
self.conn.send_headers(stream_id, response_headers, end_stream=True)
asyncio.ensure_future(self.send_data(data, stream_id))

def receive_data(self, data: bytes, stream_id: int):
"""
Expand All @@ -128,6 +117,67 @@ def receive_data(self, data: bytes, stream_id: int):
else:
stream_data.data.write(data)

def stream_reset(self, stream_id):
"""
A stream reset was sent. Stop sending data.
"""
if stream_id in self.flow_control_futures:
future = self.flow_control_futures.pop(stream_id)
future.cancel()

async def send_data(self, data, stream_id):
"""
Send data according to the flow control rules.
"""
while data:
while not self.conn.local_flow_control_window(stream_id):
try:
await self.wait_for_flow_control(stream_id)
except asyncio.CancelledError:
return

chunk_size = min(
self.conn.local_flow_control_window(stream_id),
len(data),
self.conn.max_outbound_frame_size,
)

try:
self.conn.send_data(
stream_id,
data[:chunk_size],
end_stream=(chunk_size == len(data))
)
except (StreamClosedError, ProtocolError):
# The stream got closed and we didn't get told. We're done
# here.
break

self.transport.write(self.conn.data_to_send())
data = data[chunk_size:]

async def wait_for_flow_control(self, stream_id):
"""
Waits for a Future that fires when the flow control window is opened.
"""
f = asyncio.Future()
self.flow_control_futures[stream_id] = f
await f

def window_updated(self, stream_id, delta):
"""
A window update frame was received. Unblock some number of flow control
Futures.
"""
if stream_id and stream_id in self.flow_control_futures:
f = self.flow_control_futures.pop(stream_id)
f.set_result(delta)
elif not stream_id:
for f in self.flow_control_futures.values():
f.set_result(delta)

self.flow_control_futures = {}


ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.options |= (
Expand Down
19 changes: 19 additions & 0 deletions test/h2spectest.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env bash
# A test script that runs the example Python Twisted server and then runs
# h2spec against it. Prints the output of h2spec. This script does not expect
# to be run directly, but instead via `tox -e h2spec`.

set -x

# Kill all background jobs on exit.
trap 'kill $(jobs -p)' EXIT

pushd examples/asyncio
python asyncio-server.py &
popd

# Wait briefly to let the server start up
sleep 2

# Go go go!
h2spec -k -t -v -p 8443
7 changes: 7 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,10 @@ deps =
commands =
check-manifest
python setup.py check --metadata --restructuredtext --strict

[testenv:h2spec]
basepython=python3.6
deps = twisted[tls]==17.1.0
whitelist_externals = {toxinidir}/test/h2spectest.sh
commands =
{toxinidir}/test/h2spectest.sh

0 comments on commit d588e7c

Please sign in to comment.