Skip to content

Commit

Permalink
Split filter items to avoid BSON document too large if source repo
Browse files Browse the repository at this point in the history
contains > 345,000 units of same type

The issue was caused by long query (>16M) if source repo contains too many
units ( > 345,000) of same type.

The PR splits unit_ids in the filter to several chunks, then fetches data
separately, eventually combines all results.

Note that the PR only handle the case when the query does not need to sort.
It means the PR will not help if the query contains sorting. So the patch does
not fix the issue pulp#2220 completely.

ref pulp#2220
https://pulp.plan.io/issues/2220
  • Loading branch information
zxiong committed Jan 7, 2020
1 parent 65025d1 commit 5a9bdc7
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 20 deletions.
27 changes: 24 additions & 3 deletions server/pulp/server/managers/repo/unit_association_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

_VALID_DIRECTIONS = (SORT_ASCENDING, SORT_DESCENDING)

UNITS_BATCH_SIZE = 100000


class RepoUnitAssociationQueryManager(object):

Expand Down Expand Up @@ -115,9 +117,28 @@ def get_units(self, repo_id, criteria=None, as_generator=False):
# Use a generator expression here to keep from going back to the types
# collections once we've returned our limit of results.
# Be sure to skip cursors that would otherwise return an empty result set.
units_cursors = (self._associated_units_by_type_cursor(t, criteria,
associations_lookup[t].keys())
for t in association_unit_types if t in associations_lookup)
#
# If the repo contains too many units (>345K) of same type, the number of associated
# units will be too large and cause that the length of corresponding query exceeds the
# limit (16M), so split it into batches. But the way does not help if the criteria
# contains sorting.
if criteria.unit_sort:
units_cursors = (self._associated_units_by_type_cursor(t, criteria,
associations_lookup[t].keys())
for t in association_unit_types if t in associations_lookup)
else:
units_cursors_list = []
for t in association_unit_types:
if t in associations_lookup:
associations_unit_keys = associations_lookup[t].keys()
unit_keys_list = [associations_unit_keys[i:i + UNITS_BATCH_SIZE]
for i in xrange(0, len(associations_unit_keys),
UNITS_BATCH_SIZE)]
for sub_unit_keys in unit_keys_list:
units_cursors_list.append(
self._associated_units_by_type_cursor(
t, criteria, sub_unit_keys))
units_cursors = (units_cursor for units_cursor in units_cursors_list)

if not criteria.association_sort:
# If we're not sorting based on association fields, then set the
Expand Down
104 changes: 87 additions & 17 deletions server/test/unit/server/managers/repo/test_unit_association_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,23 +580,6 @@ def test_get_units_by_type_not_query(self):
for u in units:
self.assertTrue(u['metadata']['key_1'] != 'aardvark')

def test_criteria_str(self):
# Setup
c1 = UnitAssociationCriteria()
c2 = UnitAssociationCriteria(
type_ids=['a'], association_filters={'a': 'a'}, unit_filters={'b': 'b'},
association_sort=['c'], unit_sort=['d'], limit=1, skip=2, association_fields=['e'],
unit_fields=['f'], remove_duplicates=True)

# Test no exceptions are raised
str(c1)
str(c2)

def test_criteria_init(self):
# Test
c = UnitAssociationCriteria(type_ids='single')
self.assertEqual(['single'], c.type_ids)

def _assert_unit_integrity(self, unit):
"""
Makes sure all of the expected fields are present in the unit and that
Expand All @@ -618,3 +601,90 @@ def _assert_unit_integrity(self, unit):
self.assertTrue(unit['metadata']['md_1'] is not None)
self.assertTrue(unit['metadata']['md_2'] is not None)
self.assertTrue(unit['metadata']['md_3'] is not None)

def test_get_units_by_type_batches(self):
# Check if units are found as expectation by different batch size
association_query_manager.UNITS_BATCH_SIZE = 100000
units_1 = self.manager.get_units_by_type('repo-1', 'alpha')

association_query_manager.UNITS_BATCH_SIZE = 2
units_2 = self.manager.get_units_by_type('repo-1', 'alpha')

for item in units_1:
found = False
for item2 in units_2:
if item2['unit_id'] == item['unit_id'] and item['unit_id'] in self.units['alpha']:
found = True
continue
self.assertTrue(found)

self.assertEqual(len(self.units['alpha']), len(units_1))

# Revert
association_query_manager.UNITS_BATCH_SIZE = 100000

@mock.patch('pulp.server.managers.repo.unit_association_query.RepoUnitAssociationQueryManager.\
_associated_units_by_type_cursor')
def test_get_units_by_type_batch_size(self, _associated_units_by_type_cursor_mock):
# Test
cursor_obj = mock.MagicMock()
cursor_obj.__iter__ = mock.MagicMock(return_value=iter([]))
_associated_units_by_type_cursor_mock.return_value = cursor_obj

association_query_manager.UNITS_BATCH_SIZE = 2
self.manager.get_units_by_type('repo-1', 'alpha')

# Verify if 3 units are split to 2 batches
batch_num = math.ceil(
float(len(self.units['alpha'])) / association_query_manager.UNITS_BATCH_SIZE)
self.assertEqual(
_associated_units_by_type_cursor_mock.call_count, batch_num)

association_query_manager.UNITS_BATCH_SIZE = 3
self.manager.get_units_by_type('repo-1', 'alpha')

# Verify if units is not split when size of units is less than batch size
self.assertEqual(
_associated_units_by_type_cursor_mock.call_count, batch_num + 1)

# Revert
association_query_manager.UNITS_BATCH_SIZE = 100000

@mock.patch('pulp.server.managers.repo.unit_association_query.RepoUnitAssociationQueryManager.\
_associated_units_by_type_cursor')
def test_get_units_by_type_condition(self, _associated_units_by_type_cursor_mock):
# Test
association_query_manager.UNITS_BATCH_SIZE = 2
criteria = UnitAssociationCriteria(
unit_sort=[('created', association_manager.SORT_DESCENDING)])
self.manager.get_units_by_type('repo-1', 'alpha', criteria)

# Verify if finding unit without batches
self.assertEqual(_associated_units_by_type_cursor_mock.call_count, 1)

self.manager.get_units_by_type('repo-1', 'alpha')
# Verify if finding unit by batches
batch_num = math.ceil(
float(len(self.units['alpha'])) / association_query_manager.UNITS_BATCH_SIZE)
self.assertEqual(
_associated_units_by_type_cursor_mock.call_count, batch_num + 1)

# Revert
association_query_manager.UNITS_BATCH_SIZE = 100000

def test_criteria_str(self):
# Setup
c1 = UnitAssociationCriteria()
c2 = UnitAssociationCriteria(
type_ids=['a'], association_filters={'a': 'a'}, unit_filters={'b': 'b'},
association_sort=['c'], unit_sort=['d'], limit=1, skip=2, association_fields=['e'],
unit_fields=['f'], remove_duplicates=True)

# Test no exceptions are raised
str(c1)
str(c2)

def test_criteria_init(self):
# Test
c = UnitAssociationCriteria(type_ids='single')
self.assertEqual(['single'], c.type_ids)

0 comments on commit 5a9bdc7

Please sign in to comment.