Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added unit test to demonstrate that @malarzm's atomicset feature seems to fix the data corruption problem #3

Closed
wants to merge 10 commits into from
2 changes: 2 additions & 0 deletions docs/en/reference/annotations-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,8 @@ Alias of `@Index`_, with the ``unique`` option set by default.
/** @String @UniqueIndex */
private $email;

.. _annotations_reference_version:

@Version
--------

Expand Down
23 changes: 22 additions & 1 deletion docs/en/reference/collection-strategies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,34 @@ strategy, but will first numerically reindex the collection to ensure that it is
stored as a BSON array.

``pushAll``
------------
-----------

The ``pushAll`` strategy uses MongoDB's `$pushAll`_ operator to insert
elements into the array. MongoDB does not allow elements to be added and removed
from an array in a single operation, so this strategy relies on multiple update
queries to remove and insert elements (in that order).

``atomicSet``
-------------

The ``atomicSet`` strategy uses MongoDB's `$set`_ operator to update the entire
collection with a single update query. Unlike with ``set`` strategy there will
be only one query for updating both parent document and collection itself. This
strategy can be especially useful when dealing with high concurrency and
:ref:`versioned documents <annotations_reference_version>`.

.. note::

Only top level document can contain collection with ``atomicSet`` or
``atomicSetArray``

``atomicSetArray``
------------------

The ``atomicSetArray`` strategy works exactly like ``atomicSet`` strategy, but
will first numerically reindex the collection to ensure that it is stored as a
BSON array.

.. _`$addToSet`: http://docs.mongodb.org/manual/reference/operator/addToSet/
.. _`$pushAll`: http://docs.mongodb.org/manual/reference/operator/pushAll/
.. _`$set`: http://docs.mongodb.org/manual/reference/operator/set/
Expand Down
13 changes: 13 additions & 0 deletions lib/Doctrine/ODM/MongoDB/Mapping/ClassMetadataInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,11 @@ public function mapField(array $mapping)
if (isset($mapping['reference']) && ! empty($mapping['simple']) && ! isset($mapping['targetDocument'])) {
throw MappingException::simpleReferenceRequiresTargetDocument($this->name, $mapping['fieldName']);
}

if ($this->isEmbeddedDocument && isset($mapping['strategy']) &&
($mapping['strategy'] === 'atomicSet' || $mapping['strategy'] === 'atomicSetArray')) {
throw MappingException::atomicCollectionStrategyNotAllowed($mapping['strategy'], $this->name, $mapping['fieldName']);
}

if (isset($mapping['reference']) && $mapping['type'] === 'one') {
$mapping['association'] = self::REFERENCE_ONE;
Expand Down Expand Up @@ -1204,6 +1209,10 @@ public function mapOneEmbedded(array $mapping)
*/
public function mapManyEmbedded(array $mapping)
{
if ($this->isEmbeddedDocument && isset($mapping['strategy']) &&
($mapping['strategy'] === 'atomicSet' || $mapping['strategy'] === 'atomicSetArray')) {
throw MappingException::atomicCollectionStrategyNotAllowed($mapping['strategy'], $this->name, $mapping['fieldName']);
}
$mapping['embedded'] = true;
$mapping['type'] = 'many';
$this->mapField($mapping);
Expand All @@ -1228,6 +1237,10 @@ public function mapOneReference(array $mapping)
*/
public function mapManyReference(array $mapping)
{
if ($this->isEmbeddedDocument && isset($mapping['strategy']) &&
($mapping['strategy'] === 'atomicSet' || $mapping['strategy'] === 'atomicSetArray')) {
throw MappingException::atomicCollectionStrategyNotAllowed($mapping['strategy'], $this->name, $mapping['fieldName']);
}
$mapping['reference'] = true;
$mapping['type'] = 'many';
$this->mapField($mapping);
Expand Down
5 changes: 5 additions & 0 deletions lib/Doctrine/ODM/MongoDB/Mapping/MappingException.php
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,9 @@ public static function simpleReferenceRequiresTargetDocument($className, $fieldN
{
return new self("Target document must be specified for simple reference: $className::$fieldName");
}

public static function atomicCollectionStrategyNotAllowed($strategy, $className, $fieldName)
{
return new self("$strategy collection strategy can be used only in top level document, used in $className::$fieldName");
}
}
84 changes: 55 additions & 29 deletions lib/Doctrine/ODM/MongoDB/Persisters/CollectionPersister.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ public function __construct(DocumentManager $dm, PersistenceBuilder $pb, UnitOfW
$this->pb = $pb;
$this->uow = $uow;
}

/**
* INTERNAL:
* Prepares $unset query for PersistentCollection removal.
*
* @param \Doctrine\ODM\MongoDB\PersistentCollection $coll
* @return array
*/
public function prepareDeleteQuery(PersistentCollection $coll)
{
list($propertyPath) = $this->getPathAndParent($coll);
return array('$unset' => array($propertyPath => true));
}

/**
* Deletes a PersistentCollection instance completely from a document using $unset.
Expand All @@ -82,8 +95,11 @@ public function delete(PersistentCollection $coll, array $options)
if ($mapping['isInverseSide']) {
return; // ignore inverse side
}
list($propertyPath, $parent) = $this->getPathAndParent($coll);
$query = array('$unset' => array($propertyPath => true));
if ($mapping['strategy'] === "atomicSet" || $mapping['strategy'] === "atomicSetArray") {
throw new \UnexpectedValueException($mapping['strategy'] . ' delete collection strategy should have been handled by DocumentPersister. Please report a bug in issue tracker');
}
list(, $parent) = $this->getPathAndParent($coll);
$query = $this->prepareDeleteQuery($coll);
$this->executeQuery($parent, $query, $options);
}

Expand All @@ -103,9 +119,12 @@ public function update(PersistentCollection $coll, array $options)
}

switch ($mapping['strategy']) {
case 'atomicSet':
case 'atomicSetArray':
throw new \UnexpectedValueException($mapping['strategy'] . ' update collection strategy should have been handled by DocumentPersister. Please report a bug in issue tracker');

case 'set':
case 'setArray':
$coll->initialize();
$this->setCollection($coll, $options);
break;

Expand All @@ -120,22 +139,25 @@ public function update(PersistentCollection $coll, array $options)
throw new \UnexpectedValueException('Unsupported collection strategy: ' . $mapping['strategy']);
}
}

/**
* Sets a PersistentCollection instance.
*
* This method is intended to be used with the "set" or "setArray"
* strategies. The "setArray" strategy will ensure that the collection is
* set as a BSON array, which means the collection elements will be
* reindexed numerically before storage.
*
* @param PersistentCollection $coll
* @param array $options
* INTERNAL:
* Prepares $set query for PersistentCollection update.
*
* This method is intended to be used with the "set", "setArray", "atomicSet"
* and "atomicSetArray" strategies. The "setArray" and "atomicSetArray"
* strategy will ensure that the collection is set as a BSON array, which
* means the collection elements will be reindexed numerically before storage.
*
* @param \Doctrine\ODM\MongoDB\PersistentCollection $coll
* @return array
*/
private function setCollection(PersistentCollection $coll, array $options)
public function prepareSetQuery(PersistentCollection $coll)
{
$coll->initialize();

$mapping = $coll->getMapping();
list($propertyPath, $parent) = $this->getPathAndParent($coll);
list($propertyPath, ) = $this->getPathAndParent($coll);

$pb = $this->pb;

Expand All @@ -145,12 +167,28 @@ private function setCollection(PersistentCollection $coll, array $options)

$setData = $coll->map($callback)->toArray();

if ($mapping['strategy'] === 'setArray') {
if ($mapping['strategy'] === 'setArray' || $mapping['strategy'] === 'atomicSetArray') {
$setData = array_values($setData);
}

$query = array('$set' => array($propertyPath => $setData));
return array('$set' => array($propertyPath => $setData));
}

/**
* Sets a PersistentCollection instance.
*
* This method is intended to be used with the "set" or "setArray"
* strategies. The "setArray" strategy will ensure that the collection is
* set as a BSON array, which means the collection elements will be
* reindexed numerically before storage.
*
* @param PersistentCollection $coll
* @param array $options
*/
private function setCollection(PersistentCollection $coll, array $options)
{
list(, $parent) = $this->getPathAndParent($coll);
$query = $this->prepareSetQuery($coll);
$this->executeQuery($parent, $query, $options);
}

Expand Down Expand Up @@ -227,18 +265,6 @@ private function insertElements(PersistentCollection $coll, array $options)
$this->executeQuery($parent, $query, $options);
}

/**
* Gets the document database identifier value for the given document.
*
* @param object $document
* @param ClassMetadata $class
* @return mixed $id
*/
private function getDocumentId($document, ClassMetadata $class)
{
return $class->getDatabaseIdentifierValue($this->uow->getDocumentIdentifier($document));
}

/**
* Gets the parent information for a given PersistentCollection. It will
* retrieve the top-level persistent Document that the PersistentCollection
Expand Down
43 changes: 36 additions & 7 deletions lib/Doctrine/ODM/MongoDB/Persisters/DocumentPersister.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ class DocumentPersister
*/
private $uow;

/**
* The Hydrator instance
*
* @var HydratorInterface
*/
private $hydrator;

/**
* The ClassMetadata instance for the document type being persisted.
*
Expand Down Expand Up @@ -235,6 +228,11 @@ public function executeInserts(array $options = array())
}
$data[$versionMapping['name']] = $nextVersion;
}

$atomicCollectionQuery = $this->getAtomicCollectionUpdateQuery($document);
if (!empty($atomicCollectionQuery['$set'])) {
$data = array_merge_recursive($data, $atomicCollectionQuery['$set']);
}

$inserts[$oid] = $data;
}
Expand Down Expand Up @@ -282,6 +280,8 @@ public function executeUpserts(array $options = array())
}
$data['$set'][$versionMapping['name']] = $nextVersion;
}

$data = array_merge_recursive($data, $this->getAtomicCollectionUpdateQuery($document));

try {
$this->executeUpsert($data, $options);
Expand Down Expand Up @@ -375,6 +375,8 @@ public function update($document, array $options = array())
$this->class->reflFields[$this->class->versionField]->setValue($document, $nextVersion);
}
}

$update = array_merge_recursive($update, $this->getAtomicCollectionUpdateQuery($document));

/* We got here because the document has one or more related
* PersistentCollections to be committed later; however, if the
Expand Down Expand Up @@ -1230,4 +1232,31 @@ private function getClassDiscriminatorValues(ClassMetadata $metadata)
}
return $discriminatorValues;
}

private function getAtomicCollectionUpdateQuery($document)
{
$update = array();
$collections = $this->uow->getScheduledCollections($document);
foreach ($collections as $coll) {
/* @var $coll PersistentCollection */
$mapping = $coll->getMapping();
if ($mapping['strategy'] !== "atomicSet" && $mapping['strategy'] !== "atomicSetArray") {
continue;
}
$collPersister = $this->uow->getCollectionPersister();
if ($this->uow->isCollectionScheduledForUpdate($coll)) {
$update = array_merge_recursive($update, $collPersister->prepareSetQuery($coll));
$this->uow->unscheduleCollectionUpdate($coll);
/* Collection can be set for both deletion and update if
* PersistentCollection instance was changed. Since we're dealing
* with collection update in one query we won't need the $unset
*/
$this->uow->unscheduleCollectionDeletion($coll);
} else {
$update = array_merge_recursive($update, $collPersister->prepareDeleteQuery($coll));
$this->uow->unscheduleCollectionDeletion($coll);
}
}
return $update;
}
}
60 changes: 59 additions & 1 deletion lib/Doctrine/ODM/MongoDB/UnitOfWork.php
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,13 @@ public function commit($document = null, array $options = array())
foreach ($this->collectionDeletions as $collectionToDelete) {
$this->getCollectionPersister()->delete($collectionToDelete, $options);
}

// This hack allows me to fire simulated concurrent PHP logic from a functional test allowing us to prove the atomicset operations work.
global $concurrentPHPRequestSimulatedLogic;
if (get_class($document) == 'Documents\UserVersioned' && is_callable($concurrentPHPRequestSimulatedLogic) && $document->getUsername() == 'ae7e3a92-2983-4cc0-97fb-f074ebd0cabd' && $document->version == 2) {
$concurrentPHPRequestSimulatedLogic();
}

// Collection updates (deleteRows, updateRows, insertRows)
foreach ($this->collectionUpdates as $collectionToUpdate) {
$this->getCollectionPersister()->update($collectionToUpdate, $options);
Expand Down Expand Up @@ -2698,6 +2705,22 @@ public function isCollectionScheduledForDeletion(PersistentCollection $coll)
{
return in_array($coll, $this->collectionDeletions, true);
}

/**
* INTERNAL:
* Unschedules a collection from being updated deleted when this UnitOfWork commits.
* Effectively this is used for atomicSet and atomicSetArray update strategies.
* The method doesn't remove $coll from $this->hasScheduledCollections because
* it is called only from DocumentPersister resolving that very document
*
* @param \Doctrine\ODM\MongoDB\PersistentCollection $coll
*/
public function unscheduleCollectionDeletion(PersistentCollection $coll)
{
if (($key = array_search($coll, $this->collectionDeletions, true)) !== false) {
unset($this->collectionDeletions[$key]);
}
}

/**
* INTERNAL:
Expand All @@ -2711,6 +2734,22 @@ public function scheduleCollectionUpdate(PersistentCollection $coll)
$this->scheduleCollectionOwner($coll);
}

/**
* INTERNAL:
* Unschedules a collection from being updated update when this UnitOfWork commits.
* Effectively this is used for atomicSet and atomicSetArray update strategies.
* The method doesn't remove $coll from $this->hasScheduledCollections because
* it is called only from DocumentPersister resolving that very document
*
* @param \Doctrine\ODM\MongoDB\PersistentCollection $coll
*/
public function unscheduleCollectionUpdate(PersistentCollection $coll)
{
if (($key = array_search($coll, $this->collectionUpdates, true)) !== false) {
unset($this->collectionUpdates[$key]);
}
}

/**
* Checks whether a PersistentCollection is scheduled for update.
*
Expand All @@ -2722,6 +2761,21 @@ public function isCollectionScheduledForUpdate(PersistentCollection $coll)
return in_array($coll, $this->collectionUpdates, true);
}

/**
* INTERNAL:
* Gets PersistentCollections that are scheduled to update and related to $document
*
* @param object $document
* @return array
*/
public function getScheduledCollections($document)
{
$oid = spl_object_hash($document);
return isset($this->hasScheduledCollections[$oid])
? $this->hasScheduledCollections[$oid]
: array();
}

/**
* Checks whether the document is related to a PersistentCollection
* scheduled for update or deletion.
Expand Down Expand Up @@ -2752,8 +2806,12 @@ private function scheduleCollectionOwner(PersistentCollection $coll)
list(, $document, ) = $this->getParentAssociation($document);
$class = $this->dm->getClassMetadata(get_class($document));
}
$oid = spl_object_hash($document);

$this->hasScheduledCollections[spl_object_hash($document)] = true;
if (!isset($this->hasScheduledCollections[$oid])) {
$this->hasScheduledCollections[$oid] = array();
}
$this->hasScheduledCollections[$oid][] = $coll;

if ( ! $this->isDocumentScheduled($document)) {
$this->scheduleForUpdate($document);
Expand Down
Loading