Skip to content

Commit

Permalink
Merge branch 'http-2-go-away-handling' into 5.x
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 28, 2024
2 parents 7580dc2 + ebc7cdd commit d23e0b8
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 226 deletions.
1 change: 0 additions & 1 deletion src/Connection/Http2Connection.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<?php declare(strict_types=1);
/** @noinspection PhpUnhandledExceptionInspection */

namespace Amp\Http\Client\Connection;

Expand Down
106 changes: 54 additions & 52 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ final class Http2ConnectionProcessor implements Http2Processor

private bool $hasWriteError = false;

private int|null $shutdown = null;
private ?int $shutdown = null;

private readonly Queue $frameQueue;

Expand Down Expand Up @@ -1073,6 +1073,8 @@ public function isClosed(): bool

private function runReadFiber(): void
{
$parser = new Http2Parser($this, $this->hpack);

try {
$this->frameQueue->push(Http2Parser::PREFACE);

Expand All @@ -1090,19 +1092,7 @@ private function runReadFiber(): void
self::DEFAULT_MAX_FRAME_SIZE
)
)->await();
} catch (\Throwable $exception) {
$this->shutdown(new SocketException(
"The HTTP/2 connection closed" . ($this->shutdown !== null ? ' unexpectedly' : ''),
$this->shutdown ?? Http2Parser::GRACEFUL_SHUTDOWN,
$exception,
), 0);

return;
}

$parser = new Http2Parser($this, $this->hpack);

try {
while (null !== $chunk = $this->socket->read()) {
$parser->push($chunk);
}
Expand All @@ -1113,13 +1103,24 @@ private function runReadFiber(): void
Http2Parser::INTERNAL_ERROR,
$exception,
));

return;
} finally {
$parser->cancel();
}

$this->shutdown();
$this->cancelPongWatcher(false);

if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;

foreach ($onClose as $callback) {
EventLoop::queue($callback);
}
}

$this->frameQueue->complete();

$this->shutdown();
}
}

private function writeFrame(
Expand All @@ -1128,7 +1129,7 @@ private function writeFrame(
int $stream = 0,
string $data = ''
): Future {
if ($this->shutdown !== null) {
if ($this->frameQueue->isComplete()) {
return Future::complete();
}

Expand Down Expand Up @@ -1367,7 +1368,16 @@ private function releaseStream(int $streamId, ?\Throwable $exception, bool $unpr
}
}

if (!$this->streams && !$this->socket->isClosed() && $this->socket instanceof ResourceStream) {
if ($this->streams) {
return;
}

if ($this->shutdown !== null) {
$this->socket->close();
return;
}

if ($this->socket instanceof ResourceStream) {
$this->socket->unreference();
}
}
Expand Down Expand Up @@ -1406,8 +1416,7 @@ private function cancelPongWatcher(bool $receivedPong): void

/**
* @param HttpException|null $reason Shutdown reason.
* @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no
* streams have been opened.
* @param int|null $lastId ID of last processed frame if available.
*/
private function shutdown(?HttpException $reason = null, ?int $lastId = null): void
{
Expand All @@ -1427,47 +1436,40 @@ private function shutdown(?HttpException $reason = null, ?int $lastId = null): v
$this->settings = null;
}

$exception = $reason;
foreach ($this->streams as $id => $stream) {
$unprocessed = $lastId !== null && $id > $lastId;

$this->releaseStream($id, $exception, $unprocessed);
}

$previous = $reason->getPrevious();
$previous = $previous instanceof Http2ConnectionException ? $previous : null;

$code = $previous?->getCode() ?? Http2Parser::GRACEFUL_SHUTDOWN;

$message = match ($code) {
Http2Parser::PROTOCOL_ERROR,
Http2Parser::FLOW_CONTROL_ERROR,
Http2Parser::FRAME_SIZE_ERROR,
Http2Parser::COMPRESSION_ERROR,
Http2Parser::SETTINGS_TIMEOUT,
Http2Parser::ENHANCE_YOUR_CALM => $previous?->getMessage(),
default => null,
};

$this->writeFrame(Http2Parser::GOAWAY, data: \pack('N', $code) . $message)
->finally($this->socket->close(...))
->ignore();

$this->cancelPongWatcher(false);

if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;
$this->shutdown = $code;

foreach ($onClose as $callback) {
EventLoop::queue($callback);
if ($lastId === null) {
$message = match ($code) {
Http2Parser::PROTOCOL_ERROR,
Http2Parser::FLOW_CONTROL_ERROR,
Http2Parser::FRAME_SIZE_ERROR,
Http2Parser::COMPRESSION_ERROR,
Http2Parser::SETTINGS_TIMEOUT,
Http2Parser::ENHANCE_YOUR_CALM => $previous?->getMessage(),
default => null,
};

$this->writeFrame(Http2Parser::GOAWAY, data: \pack('NN', 0, $code) . $message)->ignore();

foreach ($this->streams as $id => $stream) {
$this->releaseStream($id, $reason, unprocessed: false);
}

return;
}

$this->shutdown = $code;
$this->frameQueue->complete();
foreach ($this->streams as $id => $stream) {
if ($id <= $lastId) {
continue;
}

\assert(empty($this->streams), 'Streams array not empty after shutdown');
$this->releaseStream($id, $reason, unprocessed: true);
}
}

/**
Expand Down
3 changes: 1 addition & 2 deletions src/Connection/Internal/Http2Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ final class Http2Stream
/** @var int Bytes received on the stream. */
public int $received = 0;

public int $bufferSize;
public int $bufferSize = 0;

public string $requestBodyBuffer = '';

Expand Down Expand Up @@ -68,7 +68,6 @@ public function __construct(
$this->pendingResponse = new DeferredFuture();
$this->requestBodyCompletion = new DeferredFuture();
$this->body = new Queue();
$this->bufferSize = 0;

// Trailers future may never be exposed to the user if the request fails, so ignore.
$this->trailers = new DeferredFuture();
Expand Down
19 changes: 16 additions & 3 deletions test/Connection/ConnectionLimitingPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Amp\Http\Client\Trailers;
use Amp\PHPUnit\AsyncTestCase;
use Amp\Socket\InternetAddress;
use PHPUnit\Framework\MockObject\MockObject;
use Revolt\EventLoop;
use function Amp\async;
use function Amp\delay;
Expand Down Expand Up @@ -89,17 +90,29 @@ public function testConnectionBecomingAvailableWhileConnecting(): void
$factory->expects(self::exactly(2))
->method('create')
->willReturnCallback(function () use ($connection): Connection {
delay(0.5);
static $count = 0;
if (!$count++) {
return $connection;
}

delay(0.25);
$connection = $this->createMockConnection(new Request('http://localhost'));
$connection->expects(self::never())
->method('getStream');

return $connection;
});

$connection->expects(self::exactly(2))
->method('getStream');

$pool = ConnectionLimitingPool::byAuthority(2, $factory);

$client = (new HttpClientBuilder)
->usingPool($pool)
->build();

$this->setTimeout(0.75);
$this->setTimeout(0.5);

Future\await([
async(fn () => $client->request(new Request('http://localhost'))),
Expand Down Expand Up @@ -145,7 +158,7 @@ public function testConnectionNotClosedWhileInUse(): void
}
}

private function createMockConnection(Request $request): Connection
private function createMockConnection(Request $request): Connection&MockObject
{
$response = new Response('1.1', 200, null, [], new ReadableBuffer, $request, Future::complete(new Trailers([])));

Expand Down
Loading

0 comments on commit d23e0b8

Please sign in to comment.