Task.Supervisor — Elixir v1.18.3

A task supervisor.

This module defines a supervisor which can be used to dynamically supervise tasks.

A task supervisor is started with no children, often under a supervisor and with a name:

children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

Supervisor.start_link(children, strategy: :one_for_one)

The options given in the child specification are documented in start_link/1.

Once started, you can start tasks directly under the supervisor, for example:

task = Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
  :do_some_work
end)

See the Task module for more examples.
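As a minimal sketch of the full round trip, the snippet below starts a task supervisor, runs a task under it, and collects the result with Task.await/1. The name Demo.TaskSupervisor and the arithmetic inside the task are placeholders chosen for illustration.

```elixir
# Started directly for brevity; in an application it would live in a supervision tree.
{:ok, _sup} = Task.Supervisor.start_link(name: Demo.TaskSupervisor)

task = Task.Supervisor.async(Demo.TaskSupervisor, fn -> 1 + 2 end)

# Task.await/1 blocks until the task replies or the default timeout elapses.
result = Task.await(task)
IO.inspect(result)
```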

Scalability and partitioning

The Task.Supervisor is a single process responsible for starting other processes. In some applications, the Task.Supervisor may become a bottleneck. To address this, you can start multiple instances of the Task.Supervisor and then pick a random instance to start the task on.

Instead of:

children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

and:

Task.Supervisor.async(MyApp.TaskSupervisor, fn -> :do_some_work end)

You can do this:

children = [
  {PartitionSupervisor,
   child_spec: Task.Supervisor,
   name: MyApp.TaskSupervisors}
]

and then:

Task.Supervisor.async(
  {:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}},
  fn -> :do_some_work end
)

In the code above, we start a partition supervisor that will by default start a task supervisor for each core in your machine. Then, instead of calling the Task.Supervisor by name, you call it through the partition supervisor using the {:via, PartitionSupervisor, {name, key}} format, where name is the name of the partition supervisor and key is the routing key. We picked self() as the routing key, which means each process will be assigned one of the existing task supervisors. Read the PartitionSupervisor docs for more information.
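The partitioned setup can be tried end to end with a sketch like the following. Demo.TaskSupervisors is a placeholder name, and the comparison against System.schedulers_online/0 assumes the default number of partitions has not been overridden.

```elixir
children = [
  {PartitionSupervisor, child_spec: Task.Supervisor, name: Demo.TaskSupervisors}
]

{:ok, _pid} = Supervisor.start_link(children, strategy: :one_for_one)

# Number of task supervisors started by the partition supervisor.
partitions = PartitionSupervisor.partitions(Demo.TaskSupervisors)

# Route through the partition supervisor, using the caller's PID as the key.
via = {:via, PartitionSupervisor, {Demo.TaskSupervisors, self()}}
result = via |> Task.Supervisor.async(fn -> :do_some_work end) |> Task.await()
```

Because the routing key is the caller's PID, repeated calls from the same process land on the same task supervisor.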

Name registration

A Task.Supervisor is bound to the same name registration rules as a GenServer. Read more about them in the GenServer docs.

Types

async_stream_option()

Options given to async_stream and async_stream_nolink functions.

option()

Option values used by start_link.

Functions

async/3

Starts a task that can be awaited on.

The supervisor must be a reference as defined in Supervisor. The task will still be linked to the caller; see Task.async/1 for more information and async_nolink/3 for a non-linked variant.

Raises an error if supervisor has reached the maximum number of children.
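The limit can be observed with a small sketch. It assumes that :max_children is the start_link/1 option that caps the number of concurrent tasks, and it rescues any exception rather than a specific one, since the exact error raised is not stated here.

```elixir
# Assumption: :max_children caps how many tasks may run at once under this supervisor.
{:ok, sup} = Task.Supervisor.start_link(max_children: 1)

# The first task blocks until told to finish, so it keeps the only slot occupied.
first =
  Task.Supervisor.async(sup, fn ->
    receive do
      :finish -> :first_done
    end
  end)

# The second task cannot be started, so the call raises in the caller.
outcome =
  try do
    Task.Supervisor.async(sup, fn -> :never_runs end)
    :started
  rescue
    _error -> :raised
  end

# Release the first task and collect its result.
send(first.pid, :finish)
first_result = Task.await(first)
```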

Options

async/5

Starts a task that can be awaited on.

The supervisor must be a reference as defined in Supervisor. The task will still be linked to the caller; see Task.async/1 for more information and async_nolink/3 for a non-linked variant.

Raises an error if supervisor has reached the maximum number of children.

Options

async_nolink/3

Starts a task that can be awaited on.

The supervisor must be a reference as defined in Supervisor. The task won't be linked to the caller; see Task.async/1 for more information.

Raises an error if supervisor has reached the maximum number of children.

Note this function requires the task supervisor to have :temporary as the :restart option (the default), as async_nolink/3 keeps a direct reference to the task which is lost if the task is restarted.

Options

Compatibility with OTP behaviours

If you create a task using async_nolink inside an OTP behaviour like GenServer, you should match on the message coming from the task inside your GenServer.handle_info/2 callback.

The reply sent by the task will be in the format {ref, result}, where ref is the monitor reference held by the task struct and result is the return value of the task function.

Keep in mind that, regardless of how the task created with async_nolink terminates, the caller's process will always receive a :DOWN message with the same ref value that is held by the task struct. If the task terminates normally, the reason in the :DOWN message will be :normal.
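Both messages can be inspected outside of any behaviour by receiving them by hand. The following is only a sketch: the one-second timeouts and the :no_reply and :no_down atoms are illustrative fallbacks, not part of the API.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

task = Task.Supervisor.async_nolink(sup, fn -> :done end)
ref = task.ref

# First message: the reply, tagged with the task's monitor reference.
reply =
  receive do
    {^ref, result} -> result
  after
    1_000 -> :no_reply
  end

# Second message: the :DOWN notification carrying the same reference.
down_reason =
  receive do
    {:DOWN, ^ref, :process, _pid, reason} -> reason
  after
    1_000 -> :no_down
  end
```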

Examples

Typically, you use async_nolink/3 when there is a reasonable expectation that the task may fail, and you don't want it to take down the caller. Let's see an example where a GenServer is meant to run a single task and track its status:

defmodule MyApp.Server do
  use GenServer

  # ...

  def start_task do
    GenServer.call(__MODULE__, :start_task)
  end

  # In this case the task is already running, so we just return :ok.
  def handle_call(:start_task, _from, %{ref: ref} = state) when is_reference(ref) do
    {:reply, :ok, state}
  end

  # The task is not running yet, so let's start it.
  def handle_call(:start_task, _from, %{ref: nil} = state) do
    task =
      Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
        ...
      end)

    # We return :ok and the server will continue running
    {:reply, :ok, %{state | ref: task.ref}}
  end

  # The task completed successfully
  def handle_info({ref, answer}, %{ref: ref} = state) do
    # We don't care about the DOWN message now, so let's demonitor and flush it
    Process.demonitor(ref, [:flush])
    # Do something with the result and then return
    {:noreply, %{state | ref: nil}}
  end

  # The task failed
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
    # Log and possibly restart the task...
    {:noreply, %{state | ref: nil}}
  end
end
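The failure path handled by the last clause can also be observed without a GenServer. In this sketch the task exits with the made-up reason :boom, and the caller, which is not linked to the task, receives only the :DOWN message and keeps running. Expect an error report in the log when running it.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# The task exits abnormally instead of returning a value.
task = Task.Supervisor.async_nolink(sup, fn -> exit(:boom) end)
ref = task.ref

outcome =
  receive do
    {^ref, result} -> {:ok, result}
    {:DOWN, ^ref, :process, _pid, reason} -> {:failed, reason}
  after
    1_000 -> :timeout
  end

# The caller was not taken down by the failing task.
caller_alive? = Process.alive?(self())
```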

async_nolink/5

Starts a task that can be awaited on.

The supervisor must be a reference as defined in Supervisor. The task won't be linked to the caller; see Task.async/1 for more information.

Raises an error if supervisor has reached the maximum number of children.

Note this function requires the task supervisor to have :temporary as the :restart option (the default), as async_nolink/5 keeps a direct reference to the task which is lost if the task is restarted.

async_stream/4

Returns a stream that runs the given function fun concurrently on each element in enumerable.

Each element in enumerable is passed as argument to the given function fun and processed by its own task. The tasks will be spawned under the given supervisor and linked to the caller process, similarly to async/3.

See async_stream/6 for discussion, options, and examples.
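As a short illustration with an anonymous function, the sketch below squares each element of a range. The supervisor is started unnamed and referenced by PID purely to keep the example self-contained.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Each element is handled by its own task; results come back in input order by default.
results =
  sup
  |> Task.Supervisor.async_stream(1..3, fn n -> n * n end)
  |> Enum.to_list()
```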

async_stream/6

Returns a stream where the given function (module and function) is mapped concurrently on each element in enumerable.

Each element will be prepended to the given args and processed by its own task. The tasks will be spawned under the given supervisor and linked to the caller process, similarly to async/5.

When streamed, each task will emit {:ok, value} upon successful completion or {:exit, reason} if the caller is trapping exits. The order of results depends on the value of the :ordered option.

The level of concurrency and the time tasks are allowed to run can be controlled via options (see the "Options" section below).

If you find yourself trapping exits to handle exits inside the async stream, consider using async_stream_nolink/6 to start tasks that are not linked to the calling process.

Options

Examples

Let's build a stream and then enumerate it:

stream = Task.Supervisor.async_stream(MySupervisor, collection, Mod, :expensive_fun, [])
Enum.to_list(stream)
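To make the "prepended to the given args" rule concrete, here is a runnable variant that uses String.duplicate/2 in place of the placeholder Mod.expensive_fun. The choice of function and inputs is arbitrary.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Each element is prepended to [2], so the calls are
# String.duplicate("a", 2) and String.duplicate("b", 2).
results =
  sup
  |> Task.Supervisor.async_stream(["a", "b"], String, :duplicate, [2], ordered: true)
  |> Enum.to_list()
```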

async_stream_nolink/4

Returns a stream that runs the given function concurrently on each element in enumerable.

Each element in enumerable is passed as argument to the given function fun and processed by its own task. The tasks will be spawned under the given supervisor and will not be linked to the caller process, similarly to async_nolink/3.

See async_stream/6 for discussion and examples.

Error handling and cleanup

Even if tasks are not linked to the caller, there is no risk of leaving dangling tasks running after the stream halts.

Consider the following example:

Task.Supervisor.async_stream_nolink(MySupervisor, collection, fun, on_timeout: :kill_task, ordered: false)
|> Enum.each(fn
  {:ok, _} -> :ok
  {:exit, reason} -> raise "Task exited: #{Exception.format_exit(reason)}"
end)

If one task raises or times out:

  1. the second clause gets called
  2. an exception is raised
  3. the stream halts
  4. all ongoing tasks will be shut down

Here is another example:

Task.Supervisor.async_stream_nolink(MySupervisor, collection, fun, on_timeout: :kill_task, ordered: false)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.take(3)

This will return the first three tasks to succeed, ignoring timeouts and errors, and shut down every ongoing task.

Just running the stream with Stream.run/1, on the other hand, would ignore errors and process the whole stream.
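A self-contained sketch of what the caller sees when one element fails: because the tasks are not linked, the failure surfaces as an {:exit, reason} entry in the stream instead of crashing the caller. The :boom reason and the input list are made up for the example, and the failing task is expected to produce an error report in the log.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

results =
  sup
  |> Task.Supervisor.async_stream_nolink([1, 2, 3], fn
    # The task for element 2 exits abnormally.
    2 -> exit(:boom)
    n -> n * 10
  end)
  |> Enum.to_list()
```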

async_stream_nolink/6

Returns a stream where the given function (module and function) is mapped concurrently on each element in enumerable.

Each element in enumerable will be prepended to the given args and processed by its own task. The tasks will be spawned under the given supervisor and will not be linked to the caller process, similarly to async_nolink/5.

See async_stream/6 for discussion, options, and examples.

children/1

Returns all children PIDs except those that are restarting.

Note that calling this function when supervising a large number of children under low memory conditions can cause an out of memory exception.
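For a small number of children the call is cheap. The sketch below starts one task that blocks on a message (the :stop atom is arbitrary) so that it is still alive when the children are listed.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# A task that waits for a message, so it is still running when we list children.
{:ok, pid} =
  Task.Supervisor.start_child(sup, fn ->
    receive do
      :stop -> :ok
    end
  end)

running = Task.Supervisor.children(sup)

# Let the task finish.
send(pid, :stop)
```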

start_child/3

Starts a task as a child of the given supervisor.

Task.Supervisor.start_child(MyTaskSupervisor, fn ->
  IO.puts("I am running in a task")
end)

Note that the spawned process is not linked to the caller, but only to the supervisor. This function is useful when the task needs to perform side effects (like I/O) and you have no interest in its result or in whether it completes successfully.
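Since no result is delivered back, the only way to observe such a task from the outside is through its side effects. In this sketch the side effect is a message sent to the caller, which is an illustrative choice rather than a requirement of the API.

```elixir
{:ok, sup} = Task.Supervisor.start_link()
parent = self()

# The return value is {:ok, pid}; the function's own result is discarded.
{:ok, pid} = Task.Supervisor.start_child(sup, fn -> send(parent, {:side_effect, self()}) end)

received =
  receive do
    {:side_effect, ^pid} -> true
  after
    1_000 -> false
  end
```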

Options

start_child/5

Starts a task as a child of the given supervisor.

Similar to start_child/3 except the task is specified by the given module, fun and args.
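A minimal sketch of the module, function, and arguments form, using Kernel.send/2 as an arbitrary target so that the effect is visible to the caller:

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Runs Kernel.send(caller_pid, :hello) inside a supervised task.
# self() is evaluated here, in the caller, before the task starts.
{:ok, _pid} = Task.Supervisor.start_child(sup, Kernel, :send, [self(), :hello])

got =
  receive do
    :hello -> :hello
  after
    1_000 -> :nothing
  end
```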

start_link/1

Starts a new supervisor.

Examples

A task supervisor is typically started under a supervision tree using the tuple format:

{Task.Supervisor, name: MyApp.TaskSupervisor}

You can also start it by calling start_link/1 directly:

Task.Supervisor.start_link(name: MyApp.TaskSupervisor)

But this is recommended only for scripting and should be avoided in production code. Generally speaking, processes should always be started inside supervision trees.

Options

This function could also receive :restart and :shutdown as options, but those two options have been deprecated; it is now preferred to give them directly to start_child.

terminate_child/2

Terminates the child with the given pid.
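As a sketch, the following starts a task that would otherwise sleep forever and then terminates it through the supervisor. The sleeping task is only a stand-in for long-running work.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

{:ok, pid} = Task.Supervisor.start_child(sup, fn -> Process.sleep(:infinity) end)

# Ask the supervisor to shut the task down.
result = Task.Supervisor.terminate_child(sup, pid)

alive_after? = Process.alive?(pid)
remaining = Task.Supervisor.children(sup)
```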