Skip to content

Commit

Permalink
fixed loop termination combinator persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Nov 23, 2023
1 parent f7c8e18 commit c0ae0a1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
22 changes: 22 additions & 0 deletions streamflow/workflow/combinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,25 @@ async def _product(self) -> AsyncIterable[MutableMapping[str, Token]]:
}
for k in self.output_items
}

@classmethod
async def _load(
cls,
context: StreamFlowContext,
row: MutableMapping[str, Any],
loading_context: DatabaseLoadingContext,
) -> LoopTerminationCombinator:
combinator = cls(
name=row["name"],
workflow=await loading_context.load_workflow(context, row["workflow"]),
)
for item in row["output_items"]:
combinator.add_output_item(item)
return combinator

async def _save_additional_params(self, context: StreamFlowContext):
# self.token_values is not saved because it is always empty at the beginning of execution
return {
**await super()._save_additional_params(context),
**{"output_items": self.output_items},
}
11 changes: 6 additions & 5 deletions tests/test_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,13 @@ async def test_loop_termination_combinator(context: StreamFlowContext):
await workflow.save(context)

name = utils.random_name()
combinator = LoopTerminationCombinator(
name=name + "-loop-termination-combinator", workflow=workflow
)
combinator.add_output_item("test")
combinator.add_output_item("another")
step = workflow.create_step(
cls=CombinatorStep,
name=name + "-combinator",
combinator=LoopTerminationCombinator(
name=utils.random_name(), workflow=workflow
),
cls=CombinatorStep, name=name + "-loop-termination", combinator=combinator
)
await save_load_and_test(step, context)

Expand Down

0 comments on commit c0ae0a1

Please sign in to comment.