Skip to content

Commit

Permalink
Add missing pipeline ops lock for initiate_pipeline_update and additi…
Browse files Browse the repository at this point in the history
…onal validation

PiperOrigin-RevId: 393408829
  • Loading branch information
goutham authored and tfx-copybara committed Aug 27, 2021
1 parent c2155c1 commit e5173ed
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 0 deletions.
1 change: 1 addition & 0 deletions tfx/orchestration/experimental/core/pipeline_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def stop_node(


@_to_status_not_ok_error
@_pipeline_ops_lock
def initiate_pipeline_update(
mlmd_handle: metadata.Metadata,
pipeline: pipeline_pb2.Pipeline) -> pstate.PipelineState:
Expand Down
18 changes: 18 additions & 0 deletions tfx/orchestration/experimental/core/pipeline_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,13 @@ def load_from_orchestrator_context(
def pipeline_uid(self) -> task_lib.PipelineUid:
return task_lib.PipelineUid.from_pipeline(self.pipeline)

@property
def pipeline_run_id(self) -> Optional[str]:
"""Returns pipeline_run_id in case of sync pipeline, `None` otherwise."""
if self.pipeline.execution_mode == pipeline_pb2.Pipeline.SYNC:
return self.pipeline.runtime_spec.pipeline_run_id.field_value.string_value
return None

def is_active(self) -> bool:
"""Returns `True` if pipeline is active."""
self._check_context()
Expand Down Expand Up @@ -290,6 +297,17 @@ def initiate_update(self, updated_pipeline: pipeline_pb2.Pipeline) -> None:
message=('Updating execution_mode of an active pipeline is not '
'supported'))

if self.pipeline.execution_mode == pipeline_pb2.Pipeline.SYNC:
updated_pipeline_run_id = (
updated_pipeline.runtime_spec.pipeline_run_id.field_value.string_value
)
if self.pipeline_run_id != updated_pipeline_run_id:
raise status_lib.StatusNotOkError(
code=status_lib.Code.INVALID_ARGUMENT,
message=(f'For sync pipeline, pipeline_run_id should match; found '
f'mismatch: {self.pipeline_run_id} (existing) vs. '
f'{updated_pipeline_run_id} (updated)'))

# TODO(b/194311197): We require that structure of the updated pipeline
# exactly matches the original. There is scope to relax this restriction.

Expand Down

0 comments on commit e5173ed

Please sign in to comment.