Commit 43a2032

Fix part of oppia#11475: Move datastoreio stub into JobBase (oppia#12947)
* Add JobRunResult type for Beam Jobs

* Update job_run_result.py

* transition to ndb_io

* Move datastoreio_stub into JobBase

* add unit tests

* tidy things up

* add test to shards

* tidy up
brianrodri authored Jun 2, 2021
1 parent f753ff5 commit 43a2032
Showing 14 changed files with 549 additions and 264 deletions.
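
Note: stub_io.DatastoreioStub itself is outside this diff. Inferring only from the call sites below (ReadFromDatastore, WriteToDatastore, DeleteFromDatastore, and the context() manager used in the new tests), its surface is roughly the following sketch; the method bodies here are placeholders, not the real implementation:

import contextlib


class DatastoreioStub(object):
    """Sketch of the stub's assumed interface, inferred from this diff."""

    @contextlib.contextmanager
    def context(self):
        """Activates the stub so its PTransforms may touch the datastore."""
        yield

    def ReadFromDatastore(self, query):
        """Returns a PTransform that emits entities matching the query."""
        raise NotImplementedError('Placeholder; see jobs/io/stub_io.py.')

    def WriteToDatastore(self, project_id):
        """Returns a PTransform that writes incoming Beam entities."""
        raise NotImplementedError('Placeholder; see jobs/io/stub_io.py.')

    def DeleteFromDatastore(self, project_id):
        """Returns a PTransform that deletes incoming Beam keys."""
        raise NotImplementedError('Placeholder; see jobs/io/stub_io.py.')
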
jobs/base_jobs.py (4 changes: 2 additions & 2 deletions)

@@ -57,7 +57,7 @@
 from __future__ import absolute_import # pylint: disable=import-only-modules
 from __future__ import unicode_literals # pylint: disable=import-only-modules
 
-from jobs import job_options
+from jobs.io import stub_io
 import python_utils
 
 
@@ -162,7 +162,7 @@ def __init__(self, pipeline):
             pipeline: beam.Pipeline. The pipeline that manages the job.
         """
         self.pipeline = pipeline
-        self.job_options = self.pipeline.options.view_as(job_options.JobOptions)
+        self.datastoreio_stub = stub_io.DatastoreioStub()
 
     def run(self):
         """Runs PTransforms with self.pipeline to compute/process PValues.
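
With the stub living on JobBase, a subclass can use the datastore without touching JobOptions at all. A minimal sketch (the CountAllModelsJob name is hypothetical; GetModels comes from the new jobs/io/ndb_io.py below):

import apache_beam as beam

from core.platform import models
from jobs import base_jobs
from jobs.io import ndb_io

datastore_services = models.Registry.import_datastore_services()


class CountAllModelsJob(base_jobs.JobBase):
    """Hypothetical job that counts every model in the datastore."""

    def run(self):
        # The stub is created by JobBase.__init__, so there is no longer a
        # None-check or a JobOptions lookup before using datastoreio.
        return (
            self.pipeline
            | 'Get all models' >> ndb_io.GetModels(
                datastore_services.query_everything(), self.datastoreio_stub)
            | 'Count all models' >> beam.combiners.Count.Globally()
        )
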
jobs/base_validation_jobs.py (12 changes: 3 additions & 9 deletions)

@@ -24,6 +24,7 @@
 from core.platform import models
 from jobs import base_jobs
 from jobs import job_utils
+from jobs.io import ndb_io
 from jobs.transforms import base_validation
 from jobs.transforms import base_validation_registry
 from jobs.types import base_validation_errors
@@ -81,17 +82,10 @@ def run(self):
             ValueError. When the `datastoreio` option, which provides the
                 PTransforms for performing datastore IO operations, is None.
         """
-        datastoreio = self.job_options.datastoreio
-        if datastoreio is None:
-            raise ValueError('JobOptions.datastoreio must not be None')
-
-        query_everything = job_utils.get_beam_query_from_ndb_query(
-            datastore_services.query_everything())
-
         existing_models, deleted_models = (
             self.pipeline
-            | 'Get all models' >> (
-                datastoreio.ReadFromDatastore(query_everything))
+            | 'Get all models' >> ndb_io.GetModels(
+                datastore_services.query_everything(), self.datastoreio_stub)
             | 'Partition by model.deleted' >> (
                 beam.Partition(lambda model, _: int(model.deleted), 2))
         )
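
As context for the beam.Partition step: it splits one PCollection into a tuple of N PCollections according to the integer returned by the callable, so int(model.deleted) routes live models to index 0 and deleted models to index 1. A self-contained toy version of the same idiom (dicts stand in for NDB models):

import apache_beam as beam
from apache_beam.testing.util import assert_that, equal_to

with beam.Pipeline() as pipeline:
    existing, deleted = (
        pipeline
        | beam.Create([
            {'id': 'a', 'deleted': False},
            {'id': 'b', 'deleted': True},
        ])
        # Beam passes the number of partitions as the second argument; the
        # returned index selects which output PCollection gets the element.
        | beam.Partition(lambda model, _: int(model['deleted']), 2)
    )
    assert_that(deleted, equal_to([{'id': 'b', 'deleted': True}]))
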
jobs/base_validation_jobs_test.py (13 changes: 0 additions & 13 deletions)

@@ -22,15 +22,11 @@
 from core.platform import models
 import feconf
 from jobs import base_validation_jobs
-from jobs import job_options
 from jobs import job_test_utils
 from jobs.transforms import base_validation
 from jobs.types import base_validation_errors
 from jobs.types import model_property
 
-from apache_beam import runners
-from apache_beam.testing import test_pipeline
-
 (auth_models, base_models, user_models) = models.Registry.import_models(
     [models.NAMES.auth, models.NAMES.base_model, models.NAMES.user])
 
@@ -44,15 +40,6 @@ class AuditAllStorageModelsJobTests(job_test_utils.JobTestBase):
     def test_empty_storage(self):
         self.assert_job_output_is_empty()
 
-    def test_run_with_empty_model_getter(self):
-        pipeline = test_pipeline.TestPipeline(
-            runner=runners.DirectRunner(),
-            options=job_options.JobOptions(datastoreio=None))
-
-        self.assertRaisesRegexp(
-            ValueError, 'JobOptions.datastoreio must not be None',
-            base_validation_jobs.AuditAllStorageModelsJob(pipeline).run)
-
     def test_base_validation(self):
         base_model_with_invalid_id = self.create_model(
             base_models.BaseModel, id='123@?!*', deleted=False)
jobs/io/ndb_io.py (new file: 128 additions)

@@ -0,0 +1,128 @@
# coding: utf-8
#
# Copyright 2021 The Oppia Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Provides an Apache Beam API for operating on NDB models."""

from __future__ import absolute_import # pylint: disable=import-only-modules
from __future__ import unicode_literals # pylint: disable=import-only-modules

import feconf
from jobs import job_utils

import apache_beam as beam


class GetModels(beam.PTransform):
    """Reads NDB models from the datastore using a query.

    TODO(#11475): Stop using datastoreio_stub after we're able to use
    Cloud NDB.
    """

    def __init__(self, query, datastoreio_stub, label=None):
        """Initializes the GetModels PTransform.

        Args:
            query: datastore_services.Query. The query used to fetch models.
            datastoreio_stub: stub_io.DatastoreioStub. The stub instance
                responsible for handling datastoreio operations.
            label: str|None. The label of the PTransform.
        """
        super(GetModels, self).__init__(label=label)
        self.datastoreio = datastoreio_stub
        self.query = query

    def expand(self, pbegin):
        """Returns a PCollection containing the queried models.

        Args:
            pbegin: PValue. The initial PValue of the pipeline, used to anchor
                the models to its underlying pipeline.

        Returns:
            PCollection. The PCollection of models.
        """
        return (
            pbegin
            | 'Reading %r from the datastore' % self.query >> (
                self.datastoreio.ReadFromDatastore(
                    job_utils.get_beam_query_from_ndb_query(self.query)))
            | 'Transforming %r into NDB models' % self.query >> (
                beam.Map(job_utils.get_ndb_model_from_beam_entity))
        )
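
For readers new to Beam: when a PTransform is applied directly to a pipeline, as `self.pipeline | ndb_io.GetModels(...)` is in the tests below, expand() receives a PBegin, which is what lets GetModels behave as a source. A toy transform showing the same pattern:

import apache_beam as beam
from apache_beam.testing.util import assert_that, equal_to


class CountTo(beam.PTransform):
    """Toy source-style PTransform that emits the integers 0..n-1."""

    def __init__(self, n, label=None):
        super(CountTo, self).__init__(label=label)
        self.n = n

    def expand(self, pbegin):
        # pbegin is a PBegin when the transform is applied directly to a
        # pipeline, exactly like GetModels above; beam.Create anchors the
        # new elements onto it.
        return pbegin | beam.Create(list(range(self.n)))


with beam.Pipeline() as pipeline:
    assert_that(pipeline | CountTo(3), equal_to([0, 1, 2]))
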


class PutModels(beam.PTransform):
    """Writes NDB models to the datastore."""

    def __init__(self, datastoreio_stub, label=None):
        """Initializes the PutModels PTransform.

        Args:
            datastoreio_stub: stub_io.DatastoreioStub. The stub instance
                responsible for handling datastoreio operations.
            label: str|None. The label of the PTransform.
        """
        super(PutModels, self).__init__(label=label)
        self.datastoreio = datastoreio_stub

    def expand(self, model_pcoll):
        """Writes the given models to the datastore.

        Args:
            model_pcoll: PCollection. A PCollection of NDB models.

        Returns:
            PCollection. An empty PCollection.
        """
        return (
            model_pcoll
            | 'Transforming the NDB models into Apache Beam entities' >> (
                beam.Map(job_utils.get_beam_entity_from_ndb_model))
            | 'Writing the NDB models to the datastore' >> (
                self.datastoreio.WriteToDatastore(feconf.OPPIA_PROJECT_ID))
        )
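
A sketch of how a job might write models back with PutModels (the job name and transform chain are illustrative; note that, as the tests below do via put_multi, timestamps should be set before writing, since PutModels stores the entities exactly as given):

import apache_beam as beam

from core.platform import models
from jobs import base_jobs
from jobs.io import ndb_io

datastore_services = models.Registry.import_datastore_services()


class RefreshTimestampsJob(base_jobs.JobBase):
    """Hypothetical job that rewrites every model with fresh timestamps."""

    def _with_fresh_timestamps(self, model):
        # Mirrors the tests' put_multi() helper, one model at a time.
        datastore_services.update_timestamps_multi(
            [model], update_last_updated_time=True)
        return model

    def run(self):
        return (
            self.pipeline
            | 'Get all models' >> ndb_io.GetModels(
                datastore_services.query_everything(), self.datastoreio_stub)
            | 'Refresh timestamps' >> beam.Map(self._with_fresh_timestamps)
            | 'Write models back' >> ndb_io.PutModels(self.datastoreio_stub)
        )
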


class DeleteModels(beam.PTransform):
    """Deletes NDB models from the datastore."""

    def __init__(self, datastoreio_stub, label=None):
        """Initializes the DeleteModels PTransform.

        Args:
            datastoreio_stub: stub_io.DatastoreioStub. The stub instance
                responsible for handling datastoreio operations.
            label: str|None. The label of the PTransform.
        """
        super(DeleteModels, self).__init__(label=label)
        self.datastoreio = datastoreio_stub

    def expand(self, model_key_pcoll):
        """Deletes the given models from the datastore.

        Args:
            model_key_pcoll: PCollection. The PCollection of NDB keys to
                delete.

        Returns:
            PCollection. An empty PCollection.
        """
        return (
            model_key_pcoll
            | 'Transforming the NDB keys into Apache Beam keys' >> (
                beam.Map(job_utils.get_beam_key_from_ndb_key))
            | 'Deleting the NDB keys from the datastore' >> (
                self.datastoreio.DeleteFromDatastore(feconf.OPPIA_PROJECT_ID))
        )
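
And the same shape for deletion, chaining GetModels into DeleteModels (again a hypothetical job; the key extraction mirrors the test below that builds model.key values):

import apache_beam as beam

from core.platform import models
from jobs import base_jobs
from jobs.io import ndb_io

datastore_services = models.Registry.import_datastore_services()


class PruneDeletedModelsJob(base_jobs.JobBase):
    """Hypothetical job that hard-deletes models already marked deleted."""

    def run(self):
        return (
            self.pipeline
            | 'Get all models' >> ndb_io.GetModels(
                datastore_services.query_everything(), self.datastoreio_stub)
            | 'Keep only marked-deleted models' >> beam.Filter(
                lambda model: model.deleted)
            | 'Extract their NDB keys' >> beam.Map(lambda model: model.key)
            | 'Delete them' >> ndb_io.DeleteModels(self.datastoreio_stub)
        )
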
jobs/io/ndb_io_test.py (new file: 119 additions)

@@ -0,0 +1,119 @@
# coding: utf-8
#
# Copyright 2021 The Oppia Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Provides an Apache Beam API for operating on NDB models."""

from __future__ import absolute_import # pylint: disable=import-only-modules
from __future__ import unicode_literals # pylint: disable=import-only-modules

from core.platform import models
from jobs import job_test_utils
from jobs.io import ndb_io
from jobs.io import stub_io

import apache_beam as beam

(base_models,) = models.Registry.import_models([models.NAMES.base_model])

datastore_services = models.Registry.import_datastore_services()


class NdbIoTests(job_test_utils.PipelinedTestBase):

    def setUp(self):
        super(NdbIoTests, self).setUp()
        self.datastoreio_stub = stub_io.DatastoreioStub()

    def tearDown(self):
        datastore_services.delete_multi(
            datastore_services.query_everything().iter(keys_only=True))
        super(NdbIoTests, self).tearDown()

    def get_everything(self):
        """Returns all models in the datastore.

        Returns:
            list(Model). All of the models in the datastore.
        """
        return list(datastore_services.query_everything().iter())

    def put_multi(self, model_list, update_last_updated_time=False):
        """Puts the given models into the datastore.

        Args:
            model_list: list(Model). The models to put into the datastore.
            update_last_updated_time: bool. Whether to update the last updated
                time before putting the model into storage.
        """
        datastore_services.update_timestamps_multi(
            model_list, update_last_updated_time=update_last_updated_time)
        datastore_services.put_multi(model_list)

    def test_read_from_datastore(self):
        model_list = [
            self.create_model(base_models.BaseModel, id='a'),
            self.create_model(base_models.BaseModel, id='b'),
            self.create_model(base_models.BaseModel, id='c'),
        ]
        self.put_multi(model_list)

        self.assertItemsEqual(self.get_everything(), model_list)

        with self.datastoreio_stub.context():
            model_pcoll = (
                self.pipeline
                | ndb_io.GetModels(
                    datastore_services.query_everything(),
                    self.datastoreio_stub)
            )
            self.assert_pcoll_equal(model_pcoll, model_list)

    def test_write_to_datastore(self):
        model_list = [
            self.create_model(base_models.BaseModel, id='a'),
            self.create_model(base_models.BaseModel, id='b'),
            self.create_model(base_models.BaseModel, id='c'),
        ]

        self.assertItemsEqual(self.get_everything(), [])

        with self.datastoreio_stub.context():
            self.assert_pcoll_empty(
                self.pipeline
                | beam.Create(model_list)
                | ndb_io.PutModels(self.datastoreio_stub)
            )

        self.assertItemsEqual(self.get_everything(), model_list)

    def test_delete_from_datastore(self):
        model_list = [
            self.create_model(base_models.BaseModel, id='a'),
            self.create_model(base_models.BaseModel, id='b'),
            self.create_model(base_models.BaseModel, id='c'),
        ]
        self.put_multi(model_list)

        self.assertItemsEqual(self.get_everything(), model_list)

        with self.datastoreio_stub.context():
            self.assert_pcoll_empty(
                self.pipeline
                | beam.Create([model.key for model in model_list])
                | ndb_io.DeleteModels(self.datastoreio_stub)
            )

        self.assertItemsEqual(self.get_everything(), [])
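
One property these tests rely on implicitly is that the two job_utils converters used by ndb_io are inverses of each other. A hedged sketch of a test that would pin that down explicitly (it assumes a `from jobs import job_utils` import, which this file does not currently have):

    def test_beam_entity_round_trip(self):
        """Sketch: converting to a Beam entity and back preserves a model."""
        model = self.create_model(base_models.BaseModel, id='round_trip')
        self.assertEqual(
            job_utils.get_ndb_model_from_beam_entity(
                job_utils.get_beam_entity_from_ndb_model(model)),
            model)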