[workflow] S3 support for workflow (ray-project#16993)
* up

* up

* up

* format

* up

* fix comment

* up

* update

* update

* move dep

* bump pytest version

* use lazy_fixture explicitly

* format
fishbone authored Jul 14, 2021
1 parent 645d8fc commit dc0f948
Showing 11 changed files with 593 additions and 144 deletions.
1 change: 1 addition & 0 deletions .flake8
@@ -8,6 +8,7 @@ exclude =
python/build/
python/.eggs/
python/ray/_private/thirdparty/*
python/ray/experimental/workflow/tests/mock_server.py
max-line-length = 79
inline-quotes = "
ignore =
4 changes: 2 additions & 2 deletions python/ray/experimental/workflow/api.py
@@ -70,7 +70,7 @@ def run(entry_workflow: "Workflow",
type of the workflow will be ObjectRef[T] if the output step is
a workflow step returning type T.
storage: The external storage URL or a custom storage class. If not
specified, ``$(pwd)/workflow_data`` will be used.
specified, ``/tmp/ray/workflow_data`` will be used.
workflow_id: A unique identifier that can be used to resume the
workflow. If not specified, a random id will be generated.
@@ -101,7 +101,7 @@ def resume(workflow_id: str,
Args:
workflow_id: The id of the workflow to resume.
storage: The external storage URL or a custom storage class. If not
specified, ``$(pwd)/workflow_data`` will be used.
specified, ``/tmp/ray/workflow_data`` will be used.
Returns:
An object reference that can be used to retrieve the workflow result.
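For illustration (not part of this diff): the storage argument documented above accepts either a storage URL or a custom storage class. A minimal sketch, assuming the @workflow.step decorator and .step() call style from the surrounding codebase; the step body, bucket name, and workflow id below are made up:

import ray
from ray.experimental import workflow

# Hypothetical example: the step, bucket name, and workflow id are made up;
# only the run(entry_workflow, storage=..., workflow_id=...) signature comes
# from this diff.
@workflow.step
def hello() -> str:
    return "hello"

ray.init()
output_ref = workflow.run(
    hello.step(),
    storage="s3://my-bucket/workflow_data?region_name=us-west-2",
    workflow_id="hello_workflow")
print(ray.get(output_ref))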
54 changes: 47 additions & 7 deletions python/ray/experimental/workflow/storage/__init__.py
@@ -1,8 +1,12 @@
import os

import logging
import urllib.parse as parse
from ray.experimental.workflow.storage.base import Storage
from ray.experimental.workflow.storage.base import DataLoadError, DataSaveError
from ray.experimental.workflow.storage.filesystem import FilesystemStorageImpl
from ray.experimental.workflow.storage.s3 import S3StorageImpl

logger = logging.getLogger(__name__)


def create_storage(storage_url: str) -> Storage:
@@ -11,21 +15,57 @@ def create_storage(storage_url: str) -> Storage:
Args:
storage_url: A URL that indicates the storage type and root path.
Currently only two types of storage are supported: local fs and s3.
For local fs, a path is needed; it can be either a URI with the
file:// scheme or just a local path, e.g.:
file:///local_path
local_path
For s3, a bucket and a path are required. Other parameters, such as
credentials or the region, can be passed as well, e.g.:
s3://bucket/path?region_name=str&endpoint_url=str&aws_access_key_id=str&
aws_secret_access_key=str&aws_session_token=str
All parameters are optional and have the same meaning as in boto3.client
Returns:
A storage instance.
"""
# TODO(suquark): in the future we need to support general URLs for
# different storages, e.g. "s3://xxxx/xxx". Currently we just use
# 'pathlib.Path' for convenience.
return FilesystemStorageImpl(storage_url)
parsed_url = parse.urlparse(storage_url)
if parsed_url.scheme == "file" or parsed_url.scheme == "":
return FilesystemStorageImpl(parsed_url.path)
elif parsed_url.scheme == "s3":
bucket = parsed_url.netloc
s3_path = parsed_url.path
if not s3_path:
raise ValueError(f"Invalid s3 path: {s3_path}")
params = dict({
tuple(param.split("=", 1))
for param in str(parsed_url.query).split("&")
})
return S3StorageImpl(bucket, s3_path, **params)
else:
raise ValueError(f"Invalid url: {storage_url}")


# the default storage is a local filesystem storage with a hidden directory
_global_storage = create_storage(os.path.join(os.path.curdir, "workflow_data"))
_global_storage = None


def get_global_storage() -> Storage:
global _global_storage
if _global_storage is None:
storage_url = os.environ.get("RAY_WORKFLOW_STORAGE")
if storage_url is None:
# We should use get_temp_dir_path, but for ray client, we don't
# have this one. We need a flag to tell whether it's a client
# or a driver to use the right dir.
# For now, just use /tmp/ray/workflow_data
logger.warning(
"Using default local dir: `/tmp/ray/workflow_data`. "
"This should only be used for testing purposes.")
storage_url = "file:///tmp/ray/workflow_data"
_global_storage = create_storage(storage_url)
return _global_storage
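As a usage note (illustrative only): because get_global_storage reads RAY_WORKFLOW_STORAGE lazily, the default can be overridden before the first workflow call; the bucket name below is made up:

import os

# Hypothetical override; it must be set before get_global_storage() is first
# called, otherwise the local /tmp/ray/workflow_data default is used.
os.environ["RAY_WORKFLOW_STORAGE"] = (
    "s3://my-bucket/workflow_data?region_name=us-west-2")

from ray.experimental.workflow.storage import get_global_storage
storage = get_global_storage()  # an S3StorageImpl under this assumption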


@@ -34,5 +74,5 @@ def set_global_storage(storage: Storage) -> None:
_global_storage = storage


__all__ = ("Storage", "create_storage", "get_global_storage",
__all__ = ("Storage", "get_global_storage", "create_storage",
"set_global_storage", "DataLoadError", "DataSaveError")
49 changes: 26 additions & 23 deletions python/ray/experimental/workflow/storage/base.py
@@ -39,8 +39,8 @@ class Storage(metaclass=abc.ABCMeta):
"""

@abstractmethod
def load_step_input_metadata(self, workflow_id: str,
step_id: StepID) -> Dict[str, Any]:
async def load_step_input_metadata(self, workflow_id: str,
step_id: StepID) -> Dict[str, Any]:
"""Load the input metadata of a step.
Args:
@@ -55,8 +55,8 @@ def load_step_input_metadata(self, workflow_id: str,
"""

@abstractmethod
def save_step_input_metadata(self, workflow_id: str, step_id: StepID,
metadata: Dict[str, Any]) -> None:
async def save_step_input_metadata(self, workflow_id: str, step_id: StepID,
metadata: Dict[str, Any]) -> None:
"""Save the input metadata of a step.
Args:
@@ -69,8 +69,8 @@ def save_step_input_metadata(self, workflow_id: str, step_id: StepID,
"""

@abstractmethod
def load_step_output_metadata(self, workflow_id: str,
step_id: StepID) -> Dict[str, Any]:
async def load_step_output_metadata(self, workflow_id: str,
step_id: StepID) -> Dict[str, Any]:
"""Load the output metadata of a step.
Args:
@@ -85,8 +85,9 @@ def load_step_output_metadata(self, workflow_id: str,
"""

@abstractmethod
def save_step_output_metadata(self, workflow_id: str, step_id: StepID,
metadata: Dict[str, Any]) -> None:
async def save_step_output_metadata(self, workflow_id: str,
step_id: StepID,
metadata: Dict[str, Any]) -> None:
"""Save the output metadata of a step.
Args:
@@ -99,7 +100,7 @@ def save_step_output_metadata(self, workflow_id: str, step_id: StepID,
"""

@abstractmethod
def load_step_output(self, workflow_id: str, step_id: StepID) -> Any:
async def load_step_output(self, workflow_id: str, step_id: StepID) -> Any:
"""Load the output of the workflow step from checkpoint.
Args:
@@ -114,8 +115,8 @@ def load_step_output(self, workflow_id: str, step_id: StepID) -> Any:
"""

@abstractmethod
def save_step_output(self, workflow_id: str, step_id: StepID,
output: Any) -> None:
async def save_step_output(self, workflow_id: str, step_id: StepID,
output: Any) -> None:
"""Save the output of a workflow step.
Args:
@@ -127,8 +128,8 @@ def save_step_output(self, workflow_id: str, step_id: StepID,
"""

@abstractmethod
def load_step_func_body(self, workflow_id: str,
step_id: StepID) -> Callable:
async def load_step_func_body(self, workflow_id: str,
step_id: StepID) -> Callable:
"""Load the function body of the workflow step.
Args:
@@ -143,8 +144,8 @@ def load_step_func_body(self, workflow_id: str,
"""

@abstractmethod
def save_step_func_body(self, workflow_id: str, step_id: StepID,
func_body: Callable) -> None:
async def save_step_func_body(self, workflow_id: str, step_id: StepID,
func_body: Callable) -> None:
"""Save the function body of the workflow step.
Args:
@@ -157,7 +158,8 @@ def save_step_func_body(self, workflow_id: str, step_id: StepID,
"""

@abstractmethod
def load_step_args(self, workflow_id: str, step_id: StepID) -> ArgsType:
async def load_step_args(self, workflow_id: str,
step_id: StepID) -> ArgsType:
"""Load the input arguments of the workflow step. This must be
done under a serialization context, otherwise the arguments would
not be reconstructed successfully.
@@ -174,8 +176,8 @@ def load_step_args(self, workflow_id: str, step_id: StepID) -> ArgsType:
"""

@abstractmethod
def save_step_args(self, workflow_id: str, step_id: StepID,
args: ArgsType) -> None:
async def save_step_args(self, workflow_id: str, step_id: StepID,
args: ArgsType) -> None:
"""Save the function body of the workflow step.
Args:
@@ -188,8 +190,8 @@ def save_step_args(self, workflow_id: str, step_id: StepID,
"""

@abstractmethod
def load_object_ref(self, workflow_id: str,
object_id: str) -> ray.ObjectRef:
async def load_object_ref(self, workflow_id: str,
object_id: str) -> ray.ObjectRef:
"""Load the input object ref.
Args:
@@ -204,8 +206,8 @@ def load_object_ref(self, workflow_id: str,
"""

@abstractmethod
def save_object_ref(self, workflow_id: str,
obj_ref: ray.ObjectRef) -> None:
async def save_object_ref(self, workflow_id: str,
obj_ref: ray.ObjectRef) -> None:
"""Save the input object ref.
Args:
@@ -217,7 +219,8 @@ def save_object_ref(self,
"""

@abstractmethod
def get_step_status(self, workflow_id: str, step_id: StepID) -> StepStatus:
async def get_step_status(self, workflow_id: str,
step_id: StepID) -> StepStatus:
"""Check the status of a step in the storage.
Args:
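The diff turns every Storage method into a coroutine. A minimal, hypothetical in-memory sketch of two of those methods (not part of this PR; StepID is simplified to str) might look like:

from typing import Any, Dict, Tuple

class InMemoryStorage:
    """Hypothetical illustration of the async Storage interface above."""

    def __init__(self):
        self._input_metadata: Dict[Tuple[str, str], Dict[str, Any]] = {}

    async def save_step_input_metadata(self, workflow_id: str, step_id: str,
                                       metadata: Dict[str, Any]) -> None:
        self._input_metadata[(workflow_id, step_id)] = metadata

    async def load_step_input_metadata(self, workflow_id: str,
                                       step_id: str) -> Dict[str, Any]:
        return self._input_metadata[(workflow_id, step_id)]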
48 changes: 26 additions & 22 deletions python/ray/experimental/workflow/storage/filesystem.py
@@ -133,95 +133,98 @@ class FilesystemStorageImpl(Storage):
def __init__(self, workflow_root_dir: str):
self._workflow_root_dir = pathlib.Path(workflow_root_dir)

def load_step_input_metadata(self, workflow_id: str,
step_id: StepID) -> Dict[str, Any]:
async def load_step_input_metadata(self, workflow_id: str,
step_id: StepID) -> Dict[str, Any]:
step_dir = self._workflow_root_dir / workflow_id / STEPS_DIR / step_id
try:
with _open_atomic(step_dir / STEP_INPUTS_METADATA) as f:
return json.load(f)
except Exception as e:
raise DataLoadError from e

def save_step_input_metadata(self, workflow_id: str, step_id: StepID,
metadata: Dict[str, Any]) -> None:
async def save_step_input_metadata(self, workflow_id: str, step_id: StepID,
metadata: Dict[str, Any]) -> None:
step_dir = self._workflow_root_dir / workflow_id / STEPS_DIR / step_id
try:
with _open_atomic(step_dir / STEP_INPUTS_METADATA, "w") as f:
json.dump(metadata, f)
except Exception as e:
raise DataSaveError from e

def load_step_output_metadata(self, workflow_id: str,
step_id: StepID) -> Dict[str, Any]:
async def load_step_output_metadata(self, workflow_id: str,
step_id: StepID) -> Dict[str, Any]:
step_dir = self._workflow_root_dir / workflow_id / STEPS_DIR / step_id
try:
with _open_atomic(step_dir / STEP_OUTPUTS_METADATA) as f:
return json.load(f)
except Exception as e:
raise DataLoadError from e

def save_step_output_metadata(self, workflow_id: str, step_id: StepID,
metadata: Dict[str, Any]) -> None:
async def save_step_output_metadata(self, workflow_id: str,
step_id: StepID,
metadata: Dict[str, Any]) -> None:
step_dir = self._workflow_root_dir / workflow_id / STEPS_DIR / step_id
try:
with _open_atomic(step_dir / STEP_OUTPUTS_METADATA, "w") as f:
json.dump(metadata, f)
except Exception as e:
raise DataSaveError from e

def load_step_output(self, workflow_id: str, step_id: StepID) -> Any:
async def load_step_output(self, workflow_id: str, step_id: StepID) -> Any:
step_dir = self._workflow_root_dir / workflow_id / STEPS_DIR / step_id
try:
with _open_atomic(step_dir / STEP_OUTPUT, "rb") as f:
return ray.cloudpickle.load(f)
except Exception as e:
raise DataLoadError from e

def save_step_output(self, workflow_id: str, step_id: StepID,
output: Any) -> None:
async def save_step_output(self, workflow_id: str, step_id: StepID,
output: Any) -> None:
step_dir = self._workflow_root_dir / workflow_id / STEPS_DIR / step_id
try:
with _open_atomic(step_dir / STEP_OUTPUT, "wb") as f:
ray.cloudpickle.dump(output, f)
except Exception as e:
raise DataSaveError from e

def load_step_func_body(self, workflow_id: str,
step_id: StepID) -> Callable:
async def load_step_func_body(self, workflow_id: str,
step_id: StepID) -> Callable:
step_dir = self._workflow_root_dir / workflow_id / STEPS_DIR / step_id
try:
with _open_atomic(step_dir / STEP_FUNC_BODY, "rb") as f:
return ray.cloudpickle.load(f)
except Exception as e:
raise DataLoadError from e

def save_step_func_body(self, workflow_id: str, step_id: StepID,
func_body: Callable) -> None:
async def save_step_func_body(self, workflow_id: str, step_id: StepID,
func_body: Callable) -> None:
step_dir = self._workflow_root_dir / workflow_id / STEPS_DIR / step_id
try:
with _open_atomic(step_dir / STEP_FUNC_BODY, "wb") as f:
ray.cloudpickle.dump(func_body, f)
except Exception as e:
raise DataSaveError from e

def load_step_args(self, workflow_id: str, step_id: StepID) -> ArgsType:
async def load_step_args(self, workflow_id: str,
step_id: StepID) -> ArgsType:
step_dir = self._workflow_root_dir / workflow_id / STEPS_DIR / step_id
try:
with _open_atomic(step_dir / STEP_ARGS, "rb") as f:
return ray.cloudpickle.load(f)
except Exception as e:
raise DataLoadError from e

def save_step_args(self, workflow_id: str, step_id: StepID,
args: ArgsType) -> None:
async def save_step_args(self, workflow_id: str, step_id: StepID,
args: ArgsType) -> None:
step_dir = self._workflow_root_dir / workflow_id / STEPS_DIR / step_id
try:
with _open_atomic(step_dir / STEP_ARGS, "wb") as f:
ray.cloudpickle.dump(args, f)
except Exception as e:
raise DataSaveError from e

def load_object_ref(self, workflow_id: str, object_id) -> ray.ObjectRef:
async def load_object_ref(self, workflow_id: str,
object_id) -> ray.ObjectRef:
objects_dir = self._workflow_root_dir / workflow_id / OBJECTS_DIR
try:
with _open_atomic(objects_dir / object_id, "rb") as f:
@@ -230,8 +233,8 @@ def load_object_ref(self, workflow_id: str, object_id) -> ray.ObjectRef:
except Exception as e:
raise DataLoadError from e

def save_object_ref(self, workflow_id: str,
obj_ref: ray.ObjectRef) -> None:
async def save_object_ref(self, workflow_id: str,
obj_ref: ray.ObjectRef) -> None:
objects_dir = self._workflow_root_dir / workflow_id / OBJECTS_DIR
try:
obj = ray.get(obj_ref)
@@ -240,7 +243,8 @@ def save_object_ref(self,
except Exception as e:
raise DataSaveError from e

def get_step_status(self, workflow_id: str, step_id: StepID) -> StepStatus:
async def get_step_status(self, workflow_id: str,
step_id: StepID) -> StepStatus:
step_dir = self._workflow_root_dir / workflow_id / STEPS_DIR / step_id
return StepStatus(
output_object_exists=(step_dir / STEP_OUTPUT).exists(),
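Since the filesystem implementation is now async as well, callers await its methods. An illustrative sketch (not part of the diff) with made-up workflow and step ids:

import asyncio
from ray.experimental.workflow.storage.filesystem import FilesystemStorageImpl

async def demo():
    # Ids and the metadata payload are made up for illustration.
    storage = FilesystemStorageImpl("/tmp/ray/workflow_data")
    await storage.save_step_input_metadata(
        "demo_workflow", "step_1", {"name": "step_1"})
    return await storage.load_step_input_metadata("demo_workflow", "step_1")

print(asyncio.run(demo()))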