Skip to content

Commit

Permalink
Introduce Task.async_stream/3 and Task.async_stream/5 (elixir-lang#5367)
Browse files Browse the repository at this point in the history
  • Loading branch information
josevalim authored Oct 31, 2016
1 parent e72e562 commit 1a2f278
Show file tree
Hide file tree
Showing 5 changed files with 649 additions and 26 deletions.
71 changes: 71 additions & 0 deletions lib/elixir/lib/task.ex
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,77 @@ defmodule Task do
%Task{pid: pid, ref: ref, owner: owner}
end

@doc """
Returns a stream that runs the given `module`, `function` and `args`
concurrently on each item in `enumerable`.

Each item will be appended to the given `args` and processed by its
own task. The tasks will be linked to the current process similar to
`async/3`.

When streamed, each task will emit `{:ok, val}` upon successful
completion or `{:exit, val}` if the caller is trapping exits. Results
are emitted in the same order as the original `enumerable`.

The level of concurrency can be controlled via the `:max_concurrency`
option and defaults to `System.schedulers_online/0`. The timeout
can also be given as an option and defaults to `5000`; it is the
maximum amount of time to wait without a task reply.

Finally, consider using `Task.Supervisor.async_stream/6` to start tasks
under a supervisor. If you find yourself trapping exits to handle exits
inside the async stream, consider using `Task.Supervisor.async_stream_nolink/6`
to start tasks that are not linked to the current process.

## Options

  * `:max_concurrency` - sets the maximum number of tasks to run
    at the same time. Defaults to `System.schedulers_online/0`.
  * `:timeout` - the maximum amount of time to wait without
    receiving a task reply (across all running tasks).

## Example

Let's build a stream and then enumerate it:

    stream = Task.async_stream(collection, Mod, :expensive_fun, [])
    Enum.to_list(stream)

The concurrency can be increased or decreased using the `:max_concurrency`
option. For example, if the tasks are IO heavy, the value can be increased:

    max_concurrency = System.schedulers_online() * 2
    stream = Task.async_stream(collection, Mod, :expensive_fun, [], max_concurrency: max_concurrency)
    Enum.to_list(stream)
"""
@spec async_stream(Enumerable.t, module, atom, [term], Keyword.t) :: Enumerable.t
def async_stream(enumerable, module, function, args, options \\ [])
    when is_atom(module) and is_atom(function) and is_list(args) do
  build_stream(enumerable, {module, function, args}, options)
end

@doc """
Returns a stream that runs the given `function` concurrently on each
item in `enumerable`.

Every item of `enumerable` is handed to `function` inside its own
task; the tasks are linked to the current process, just like `async/1`.

See `async_stream/5` for a full discussion of options and examples.
"""
@spec async_stream(Enumerable.t, (term -> term), Keyword.t) :: Enumerable.t
def async_stream(enumerable, fun, options \\ []) when is_function(fun, 1),
  do: build_stream(enumerable, fun, options)

# Builds the lazy stream closure consumed by Task.Supervised.stream/6.
# The spawner links each task to the owner process (hence {:link, pid}).
defp build_stream(enumerable, fun, options) do
  fn acc, reducer ->
    spawner = fn owner, mfa ->
      {:link, Task.Supervised.spawn_link(owner, get_info(owner), mfa)}
    end

    Task.Supervised.stream(enumerable, acc, reducer, fun, options, spawner)
  end
end

defp get_info(self) do
{node(),
case Process.info(self, :registered_name) do
Expand Down
207 changes: 202 additions & 5 deletions lib/elixir/lib/task/supervised.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
defmodule Task.Supervised do
@moduledoc false

@ref_timeout 5_000

def start(info, fun) do
Expand Down Expand Up @@ -105,9 +104,9 @@ defmodule Task.Supervised do
end

# Exit quietly (no logging) for "clean" termination reasons: :normal,
# :shutdown, or {:shutdown, term}. Multiple `when` keywords act as an
# `or` between the guards. NOTE: the diff view duplicated these three
# guard lines (old + new indentation); the duplicates are removed here.
defp exit(_info, _mfa, _log_reason, reason)
     when reason == :normal
     when reason == :shutdown
     when tuple_size(reason) == 2 and elem(reason, 0) == :shutdown do
  exit(reason)
end

Expand All @@ -132,7 +131,7 @@ defmodule Task.Supervised do
# Converts an MFA triple into {fun, args} using :erlang.make_fun/3.
# NOTE(review): companion clause(s) for non-MFA inputs are elided from
# this hunk — confirm against the full file.
defp get_running({mod, fun, args}), do: {:erlang.make_fun(mod, fun, length(args)), args}

defp get_reason({:undef, [{mod, fun, args, _info} | _] = stacktrace} = reason)
when is_atom(mod) and is_atom(fun) do
when is_atom(mod) and is_atom(fun) do
cond do
:code.is_loaded(mod) === false ->
{:"module could not be loaded", stacktrace}
Expand All @@ -148,4 +147,202 @@ defmodule Task.Supervised do
# Fallback clause: reasons not matched by the clauses above pass through
# unchanged.
defp get_reason(reason), do: reason

## Stream

# Streaming engine behind Task.async_stream/3,5: reduces `enumerable`,
# spawning one task per element via `spawn` with at most
# :max_concurrency tasks alive at once, and feeding {:ok, value} /
# {:exit, reason} pairs to `reducer` in the original element order.
def stream(enumerable, acc, reducer, mfa, options, spawn) do
  # Suspendable reduction over the source: yields one element at a time.
  next = &Enumerable.reduce(enumerable, &1, fn x, acc -> {:suspend, [x | acc]} end)
  max_concurrency = Keyword.get(options, :max_concurrency, System.schedulers_online)
  timeout = Keyword.get(options, :timeout, 5000)
  parent = self()

  # Start a process responsible for translating down messages.
  # It also kills leftover tasks on shutdown (see stream_monitor/4).
  {monitor_pid, monitor_ref} = spawn_monitor(fn -> stream_monitor(parent) end)
  # Handshake: hand the monitor process the ref it tags messages with.
  send(monitor_pid, {parent, monitor_ref})

  stream_reduce(acc, max_concurrency, 0, 0, %{}, next,
                reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
end

# Downstream requested a halt: tear everything down and report it.
defp stream_reduce({:halt, acc}, _max, _spawned, _delivered, waiting, next,
                   _reducer, _mfa, _spawn, monitor_pid, monitor_ref, timeout) do
  # Cancel the suspended source reduction unless it already finished
  # (`next` becomes the atom :done once the enumerable is exhausted).
  is_function(next) && next.({:halt, []})
  stream_close(waiting, monitor_pid, monitor_ref, timeout)
  {:halted, acc}
end

# Downstream suspended: hand back a continuation closing over the whole
# streaming state so the reduction can be resumed later.
defp stream_reduce({:suspend, acc}, max, spawned, delivered, waiting, next,
                   reducer, mfa, spawn, monitor_pid, monitor_ref, timeout) do
  {:suspended, acc, &stream_reduce(&1, max, spawned, delivered, waiting, next,
                                   reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)}
end

# All spawned, all delivered, next is done.
defp stream_reduce({:cont, acc}, _max, spawned, delivered, waiting, next,
                   _reducer, _mfa, _spawn, monitor_pid, monitor_ref, timeout)
     when spawned == delivered and next == :done do
  stream_close(waiting, monitor_pid, monitor_ref, timeout)
  {:done, acc}
end

# No more tasks to spawn because max == 0 or next is done.
# Block waiting for a task reply, a task termination, or the monitor
# process dying — bounded by `timeout`.
defp stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
                   reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
     when max == 0
     when next == :done do
  receive do
    # A task replied with its value. Record it as {:ok, value} but keep
    # waiting: results are emitted in order, and the concurrency slot is
    # only freed when the task's :DOWN arrives (next branch).
    {{^monitor_ref, position}, value} ->
      %{^position => {pid, :running}} = waiting
      waiting = Map.put(waiting, position, {pid, {:ok, value}})
      stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
                    reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
    # A task terminated (message already translated by stream_monitor/4).
    {:DOWN, {^monitor_ref, position}, reason} ->
      waiting =
        case waiting do
          # We update the entry only if it is running.
          # If it is ok or removed, we are done.
          %{^position => {pid, :running}} -> Map.put(waiting, position, {pid, {:exit, reason}})
          %{} -> waiting
        end
      # max + 1: the slot is free again; try delivering pending results.
      stream_deliver({:cont, acc}, max + 1, spawned, delivered, waiting, next,
                     reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
    # The monitor process itself died; clean up and propagate.
    {:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
      stream_close(waiting, monitor_pid, monitor_ref, timeout)
      exit({reason, {__MODULE__, :stream, [timeout]}})
  after
    # No task reply within `timeout` (across all running tasks).
    timeout ->
      stream_close(waiting, monitor_pid, monitor_ref, timeout)
      exit({:timeout, {__MODULE__, :stream, [timeout]}})
  end
end

# There is capacity (max > 0) and the source is not exhausted:
# pull the next element from the enumerable and spawn a task for it.
defp stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
                   reducer, mfa, spawn, monitor_pid, monitor_ref, timeout) do
  try do
    next.({:cont, []})
  catch
    kind, reason ->
      # The enumerable itself raised/threw: clean up, then re-raise
      # preserving the original stacktrace.
      stacktrace = System.stacktrace
      stream_close(waiting, monitor_pid, monitor_ref, timeout)
      :erlang.raise(kind, reason, stacktrace)
  else
    {:suspended, [value], next} ->
      # Got an element; the source suspended again. One slot consumed.
      waiting = stream_spawn(value, spawned, waiting, mfa, spawn, monitor_pid, monitor_ref)
      stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, next,
                    reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
    {_, [value]} ->
      # Last element: spawn it and mark the source as :done.
      waiting = stream_spawn(value, spawned, waiting, mfa, spawn, monitor_pid, monitor_ref)
      stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, :done,
                    reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
    {_, []} ->
      # Source is empty: nothing to spawn.
      stream_reduce({:cont, acc}, max, spawned, delivered, waiting, :done,
                    reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
  end
end

# Delivers completed results downstream strictly in spawn order;
# `delivered` is the next position to emit.
defp stream_deliver({:suspend, acc}, max, spawned, delivered, waiting, next,
                    reducer, mfa, spawn, monitor_pid, monitor_ref, timeout) do
  {:suspended, acc, &stream_deliver(&1, max, spawned, delivered, waiting, next,
                                    reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)}
end
defp stream_deliver({:halt, acc}, max, spawned, delivered, waiting, next,
                    reducer, mfa, spawn, monitor_pid, monitor_ref, timeout) do
  stream_reduce({:halt, acc}, max, spawned, delivered, waiting, next,
                reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
end
defp stream_deliver({:cont, acc}, max, spawned, delivered, waiting, next,
                    reducer, mfa, spawn, monitor_pid, monitor_ref, timeout) do
  case waiting do
    # The next position already holds a reply ({:ok, _} or {:exit, _},
    # matched by the {_, _} shape): emit it and advance.
    %{^delivered => {_, {_, _} = reply}} ->
      try do
        reducer.(reply, acc)
      catch
        kind, reason ->
          # The downstream reducer raised: halt the source (if still
          # active), clean up, re-raise with the original stacktrace.
          stacktrace = System.stacktrace
          is_function(next) && next.({:halt, []})
          stream_close(waiting, monitor_pid, monitor_ref, timeout)
          :erlang.raise(kind, reason, stacktrace)
      else
        pair ->
          stream_deliver(pair, max, spawned, delivered + 1, Map.delete(waiting, delivered), next,
                         reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
      end
    # The next position is still running: go back to spawning/waiting.
    %{} ->
      stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
                    reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
  end
end

# Shuts the stream down: unlinks outstanding task pids (so the kills
# performed by the monitor process don't take this process down too),
# asks the monitor process to terminate, waits for its confirmation,
# and drains leftover stream messages from the inbox.
defp stream_close(waiting, monitor_pid, monitor_ref, timeout) do
  for {_, {pid, _}} <- waiting do
    Process.unlink(pid)
  end
  send(monitor_pid, {:DOWN, monitor_ref})
  receive do
    # {:shutdown, monitor_ref} is the reason stream_monitor/4 exits with
    # after a clean shutdown; any other reason is an error and propagates.
    {:DOWN, ^monitor_ref, _, _, {:shutdown, ^monitor_ref}} ->
      :ok
    {:DOWN, ^monitor_ref, _, _, reason} ->
      exit({reason, {__MODULE__, :stream, [timeout]}})
  end
  stream_cleanup_inbox(monitor_ref)
end

# Drains, without blocking (after 0), any stream messages still sitting
# in the inbox after shutdown: late task replies and translated :DOWN
# messages tagged with this stream's monitor_ref.
defp stream_cleanup_inbox(monitor_ref) do
  receive do
    {{^monitor_ref, _}, _} ->
      stream_cleanup_inbox(monitor_ref)
    {:DOWN, {^monitor_ref, _}, _} ->
      stream_cleanup_inbox(monitor_ref)
  after
    0 ->
      :ok
  end
end

# Builds the {module, function, args} a task should run for one element.
# An MFA triple gets the element prepended to its argument list; a plain
# fun is wrapped in :erlang.apply/2 with the element as sole argument.
defp stream_mfa({mod, fun, args}, arg) do
  {mod, fun, [arg | args]}
end

defp stream_mfa(fun, arg) do
  {:erlang, :apply, [fun, [arg]]}
end

# Spawns the task for `value` at position `spawned`, registers it with
# the monitor process, and records it as running in `waiting`.
defp stream_spawn(value, spawned, waiting, mfa, spawn, monitor_pid, monitor_ref) do
  owner = self()
  # `spawn.` returns {type, pid}; `type` tells the monitor process how to
  # treat the task if the owner dies — only :link-typed tasks get the
  # owner's exit reason forwarded (see stream_monitor/4).
  {type, pid} = spawn.(owner, stream_mfa(mfa, value))
  send(monitor_pid, {:UP, owner, monitor_ref, spawned, type, pid})
  Map.put(waiting, spawned, {pid, :running})
end

# Entry point of the monitor process (spawned from stream/6). Monitors
# the parent, then waits for the handshake carrying the monitor ref
# before entering the main loop.
defp stream_monitor(parent_pid) do
  parent_ref = Process.monitor(parent_pid)
  receive do
    {^parent_pid, monitor_ref} ->
      stream_monitor(parent_pid, parent_ref, monitor_ref, %{})
    # Parent died before the handshake arrived: mirror its exit reason.
    {:DOWN, ^parent_ref, _, _, reason} ->
      exit(reason)
  end
end

# Main loop of the monitor process. `counters` maps each task's monitor
# reference to {position, type, pid}.
defp stream_monitor(parent_pid, parent_ref, monitor_ref, counters) do
  receive do
    # A new task was spawned: monitor it and send it its reply tag
    # {monitor_ref, counter} so its result can be matched by position.
    {:UP, owner, ^monitor_ref, counter, type, pid} ->
      ref = Process.monitor(pid)
      send(pid, {owner, {monitor_ref, counter}})
      counters = Map.put(counters, ref, {counter, type, pid})
      stream_monitor(parent_pid, parent_ref, monitor_ref, counters)
    # Shutdown request from stream_close/4: kill every remaining task,
    # wait for each kill to be confirmed, then exit with the tagged
    # reason stream_close/4 matches on.
    {:DOWN, ^monitor_ref} ->
      for {ref, {_counter, _, pid}} <- counters do
        Process.exit(pid, :kill)
        receive do
          {:DOWN, ^ref, _, _, _} -> :ok
        end
      end
      exit({:shutdown, monitor_ref})
    # The parent died: forward its exit reason to :link-typed tasks
    # only, then terminate with the same reason.
    {:DOWN, ^parent_ref, _, _, reason} ->
      for {_ref, {_counter, :link, pid}} <- counters do
        Process.exit(pid, reason)
      end
      exit(reason)
    # A task terminated: translate the raw :DOWN into a position-tagged
    # message for the parent and drop the task from `counters`.
    {:DOWN, ref, _, _, reason} ->
      {{counter, _, _}, counters} = Map.pop(counters, ref)
      send(parent_pid, {:DOWN, {monitor_ref, counter}, reason})
      stream_monitor(parent_pid, parent_ref, monitor_ref, counters)
  end
end
end
Loading

0 comments on commit 1a2f278

Please sign in to comment.