Skip to content

Commit

Permalink
MDL-67385 libraries: Upgrde mongodb to version 1.5.1
Browse files Browse the repository at this point in the history
  • Loading branch information
rezaies committed Dec 16, 2019
1 parent 5a28e4e commit 02c64a4
Show file tree
Hide file tree
Showing 75 changed files with 2,445 additions and 1,067 deletions.
File renamed without changes.
7 changes: 5 additions & 2 deletions cache/stores/mongodb/MongoDB/BulkWriteResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
*/
class BulkWriteResult
{
/** @var WriteResult */
private $writeResult;

/** @var mixed[] */
private $insertedIds;

/** @var boolean */
private $isAcknowledged;

/**
* Constructor.
*
* @param WriteResult $writeResult
* @param mixed[] $insertedIds
*/
Expand Down
200 changes: 113 additions & 87 deletions cache/stores/mongodb/MongoDB/ChangeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

namespace MongoDB;

use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor;
use Iterator;
use MongoDB\Driver\CursorId;
use MongoDB\Driver\Exception\ConnectionException;
use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Exception\ServerException;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use IteratorIterator;
use Iterator;
use MongoDB\Model\ChangeStreamIterator;
use function call_user_func;
use function in_array;

/**
* Iterator for a change stream.
Expand All @@ -42,27 +42,39 @@ class ChangeStream implements Iterator
*/
const CURSOR_NOT_FOUND = 43;

private static $errorCodeCappedPositionLost = 136;
private static $errorCodeInterrupted = 11601;
private static $errorCodeCursorKilled = 237;
/** @var array */
private static $nonResumableErrorCodes = [
136, // CappedPositionLost
237, // CursorKilled
11601, // Interrupted
];

private $resumeToken;
/** @var callable */
private $resumeCallable;
private $csIt;

/** @var ChangeStreamIterator */
private $iterator;

/** @var integer */
private $key = 0;
private $hasAdvanced = false;

/**
* Constructor.
* Whether the change stream has advanced to its first result. This is used
* to determine whether $key should be incremented after an iteration event.
*
* @var boolean
*/
private $hasAdvanced = false;

/**
* @internal
* @param Cursor $cursor
* @param callable $resumeCallable
* @param ChangeStreamIterator $iterator
* @param callable $resumeCallable
*/
public function __construct(Cursor $cursor, callable $resumeCallable)
public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable)
{
$this->iterator = $iterator;
$this->resumeCallable = $resumeCallable;
$this->csIt = new IteratorIterator($cursor);
}

/**
Expand All @@ -71,15 +83,29 @@ public function __construct(Cursor $cursor, callable $resumeCallable)
*/
public function current()
{
return $this->csIt->current();
return $this->iterator->current();
}

/**
* @return \MongoDB\Driver\CursorId
* @return CursorId
*/
public function getCursorId()
{
return $this->csIt->getInnerIterator()->getId();
return $this->iterator->getInnerIterator()->getId();
}

/**
* Returns the resume token for the iterator's current position.
*
* Null may be returned if no change documents have been iterated and the
* server did not include a postBatchResumeToken in its aggregate or getMore
* command response.
*
* @return array|object|null
*/
public function getResumeToken()
{
return $this->iterator->getResumeToken();
}

/**
Expand All @@ -91,60 +117,40 @@ public function key()
if ($this->valid()) {
return $this->key;
}

return null;
}

/**
* @see http://php.net/iterator.next
* @return void
* @throws ResumeTokenException
*/
public function next()
{
try {
$this->csIt->next();
if ($this->valid()) {
if ($this->hasAdvanced) {
$this->key++;
}
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
/* If the cursorId is 0, the server has invalidated the cursor so we
* will never perform another getMore. This means that we cannot
* resume and we can therefore unset the resumeCallable, which will
* free any reference to Watch. This will also free the only
* reference to an implicit session, since any such reference
* belongs to Watch. */
if ((string) $this->getCursorId() === '0') {
$this->resumeCallable = null;
}
$this->iterator->next();
$this->onIteration($this->hasAdvanced);
} catch (RuntimeException $e) {
if ($this->isResumableError($e)) {
$this->resume();
}
$this->resumeOrThrow($e);
}
}

/**
* @see http://php.net/iterator.rewind
* @return void
* @throws ResumeTokenException
*/
public function rewind()
{
try {
$this->csIt->rewind();
if ($this->valid()) {
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
// As with next(), free the callable once we know it will never be used.
if ((string) $this->getCursorId() === '0') {
$this->resumeCallable = null;
}
$this->iterator->rewind();
/* Unlike next() and resume(), the decision to increment the key
* does not depend on whether the change stream has advanced. This
* ensures that multiple calls to rewind() do not alter state. */
$this->onIteration(false);
} catch (RuntimeException $e) {
if ($this->isResumableError($e)) {
$this->resume();
}
$this->resumeOrThrow($e);
}
}

Expand All @@ -154,75 +160,95 @@ public function rewind()
*/
public function valid()
{
return $this->csIt->valid();
return $this->iterator->valid();
}

/**
* Extracts the resume token (i.e. "_id" field) from the change document.
* Determines if an exception is a resumable error.
*
* @param array|document $document Change document
* @return mixed
* @throws InvalidArgumentException
* @throws ResumeTokenException if the resume token is not found or invalid
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error
* @param RuntimeException $exception
* @return boolean
*/
private function extractResumeToken($document)
private function isResumableError(RuntimeException $exception)
{
if ( ! is_array($document) && ! is_object($document)) {
throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
if ($exception instanceof ConnectionException) {
return true;
}

if ($document instanceof Serializable) {
return $this->extractResumeToken($document->bsonSerialize());
if (! $exception instanceof ServerException) {
return false;
}

$resumeToken = is_array($document)
? (isset($document['_id']) ? $document['_id'] : null)
: (isset($document->_id) ? $document->_id : null);

if ( ! isset($resumeToken)) {
throw ResumeTokenException::notFound();
if ($exception->hasErrorLabel('NonResumableChangeStreamError')) {
return false;
}

if ( ! is_array($resumeToken) && ! is_object($resumeToken)) {
throw ResumeTokenException::invalidType($resumeToken);
if (in_array($exception->getCode(), self::$nonResumableErrorCodes)) {
return false;
}

return $resumeToken;
return true;
}

/**
* Determines if an exception is a resumable error.
* Perform housekeeping after an iteration event.
*
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error
* @param RuntimeException $exception
* @return boolean
* @param boolean $incrementKey Increment $key if there is a current result
* @throws ResumeTokenException
*/
private function isResumableError(RuntimeException $exception)
private function onIteration($incrementKey)
{
if ($exception instanceof ConnectionException) {
return true;
/* If the cursorId is 0, the server has invalidated the cursor and we
* will never perform another getMore nor need to resume since any
* remaining results (up to and including the invalidate event) will
* have been received in the last response. Therefore, we can unset the
* resumeCallable. This will free any reference to Watch as well as the
* only reference to any implicit session created therein. */
if ((string) $this->getCursorId() === '0') {
$this->resumeCallable = null;
}

if ( ! $exception instanceof ServerException) {
return false;
/* Return early if there is not a current result. Avoid any attempt to
* increment the iterator's key. */
if (! $this->valid()) {
return;
}

if (in_array($exception->getCode(), [self::$errorCodeCappedPositionLost, self::$errorCodeCursorKilled, self::$errorCodeInterrupted])) {
return false;
if ($incrementKey) {
$this->key++;
}

return true;
$this->hasAdvanced = true;
}

/**
* Creates a new changeStream after a resumable server error.
* Recreates the ChangeStreamIterator after a resumable server error.
*
* @return void
*/
private function resume()
{
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
$this->csIt = $newChangeStream->csIt;
$this->csIt->rewind();
$this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced);
$this->iterator->rewind();

$this->onIteration($this->hasAdvanced);
}

/**
* Either resumes after a resumable error or re-throws the exception.
*
* @param RuntimeException $exception
* @throws RuntimeException
*/
private function resumeOrThrow(RuntimeException $exception)
{
if ($this->isResumableError($exception)) {
$this->resume();

return;
}

throw $exception;
}
}
Loading

0 comments on commit 02c64a4

Please sign in to comment.