Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continue storage deletion when some fail #4454

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3525,11 +3525,10 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r
if sum([len(names) > 0, all]) != 1:
raise click.UsageError('Either --all or a name must be specified.')
if all:
storages = sky.storage_ls()
if not storages:
names = global_user_state.get_storage_names()
if not names:
click.echo('No storage(s) to delete.')
return
names = [s['name'] for s in storages]
else:
names = _get_glob_storages(names)
if names:
Expand All @@ -3543,7 +3542,15 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r
abort=True,
show_default=True)

subprocess_utils.run_in_parallel(sky.storage_delete, names)
def delete_storage(name: str):
andylizf marked this conversation as resolved.
Show resolved Hide resolved
try:
sky.storage_delete(name)
except Exception as e: # pylint: disable=broad-except
andylizf marked this conversation as resolved.
Show resolved Hide resolved
click.secho(f'Error deleting storage {name}: {e}', fg='red')

subprocess_utils.run_in_parallel(delete_storage,
names,
continue_on_error=True)


@cli.group(cls=_NaturalOrderGroup)
Expand Down
13 changes: 8 additions & 5 deletions sky/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,8 +915,11 @@ def storage_delete(name: str) -> None:
handle = global_user_state.get_handle_from_storage_name(name)
if handle is None:
raise ValueError(f'Storage name {name!r} not found.')
else:
storage_object = data.Storage(name=handle.storage_name,
source=handle.source,
sync_on_reconstruction=False)
storage_object.delete()

assert handle.storage_name == name, (
f'In global_user_state, storage name {name!r} does not match '
f'handle.storage_name {handle.storage_name!r}')
storage_object = data.Storage(name=handle.storage_name,
source=handle.source,
sync_on_reconstruction=False)
storage_object.delete()
7 changes: 6 additions & 1 deletion sky/global_user_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,8 +826,13 @@ def get_storage_names_start_with(starts_with: str) -> List[str]:
return [row[0] for row in rows]


def get_storage_names() -> List[str]:
rows = _DB.cursor.execute('SELECT name FROM storage')
return [row[0] for row in rows]
andylizf marked this conversation as resolved.
Show resolved Hide resolved


def get_storage() -> List[Dict[str, Any]]:
rows = _DB.cursor.execute('select * from storage')
rows = _DB.cursor.execute('SELECT * FROM storage')
records = []
for name, launched_at, handle, last_use, status in rows:
# TODO: use namedtuple instead of dict
Expand Down
36 changes: 27 additions & 9 deletions sky/utils/subprocess_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,47 @@ def get_parallel_threads(cloud_str: Optional[str] = None) -> int:
return max(4, cpu_count - 1) * _get_thread_multiplier(cloud_str)


# TODO(andyl): Why does this function return a list of results? Why not yield them instead?
def run_in_parallel(func: Callable,
args: Iterable[Any],
num_threads: Optional[int] = None) -> List[Any]:
num_threads: Optional[int] = None,
continue_on_error: bool = False) -> List[Any]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this seems to change the return value type from Value to Union[Value, Exception], which looks a little strange to me. Why do we need this anyway? Aren't we already adding the try-except in the func?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I had thought about not adding a function to catch and log these exceptions, and instead handling all exceptions in run_in_parallel. In that case, it made sense for this function to take an argument like continue_on_error.
But then I realized it violated the abstraction concept and would make it harder to print custom logs. So I ended up wrapping the function and logging it.

"""Run a function in parallel on a list of arguments.

The function 'func' should raise a CommandError if the command fails.

Args:
func: The function to run in parallel
args: Iterable of arguments to pass to func
num_threads: Number of threads to use. If None, uses
get_parallel_threads()
continue_on_error: If True, continues execution when errors occur
If False (default), raises the first error immediately

Returns:
A list of the return values of the function func, in the same order as the
arguments.
arguments. If continue_on_error=True, failed operations will have
their exceptions in the result list.
"""
# Reference: https://stackoverflow.com/questions/25790279/python-multiprocessing-early-termination # pylint: disable=line-too-long
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not removing this reference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean why remove the reference? I think it's trivial now, and the reference doesn't add much value to the code.

processes = num_threads if num_threads is not None else get_parallel_threads(
)
processes = (num_threads
if num_threads is not None else get_parallel_threads())
with pool.ThreadPool(processes=processes) as p:
# Run the function in parallel on the arguments, keeping the order.
return list(p.imap(func, args))
ordered_iterators = p.imap(func, args)

# TODO(andyl): Is this list(ordered_iterators) clear? Maybe we should
# merge two cases, and move this logic deeper to
# `except e: results.append(e) if continue_on_error else raise e`
if not continue_on_error:
return list(ordered_iterators)
else:
results: List[Union[Any, Exception]] = []
while True:
try:
result = next(ordered_iterators)
results.append(result)
except StopIteration:
break
except Exception as e: # pylint: disable=broad-except
results.append(e)
return results


def handle_returncode(returncode: int,
Expand Down
Loading