Scheduling

To fully understand where time in an ERTS system is spent you need to understand how the system decides which Erlang code to run and when to run it. These decisions are made by the Scheduler.

The scheduler is responsible for the real-time guarantees of the system. In the strict computer-science sense of the word, a real-time system has to be able to guarantee a response within a specified time. That is, there are real deadlines and each task has to complete before its deadline. Erlang gives no such guarantees: a timeout in Erlang is only guaranteed not to trigger before the given deadline.

In a general system like Erlang where we want to be able to handle all sorts of programs and loads, the scheduler will have to make some compromises. There will always be corner cases where a generic scheduler will behave badly. After reading this chapter you will have a deeper understanding of how the Erlang scheduler works and especially when it might not work optimally. You should be able to design your system to avoid the corner cases and you should also be able to analyze a misbehaving system.

Concurrency, Parallelism, and Preemptive Multitasking

Erlang is a concurrent language. When we say that processes run concurrently we mean that for an outside observer it looks like two processes are executing at the same time. In a single core system this is achieved by preemptive multitasking. This means that one process will run for a while, and then the scheduler of the virtual machine will suspend it and let another process run.

In a multicore or a distributed system we can achieve true parallelism, that is, two or more processes actually executing at the exact same time. In an SMP enabled emulator the system uses several OS threads to indirectly execute Erlang processes by running one scheduler and emulator per thread. In a system using the default settings for ERTS there will be one thread per enabled core (physical or hyper threaded).

We can check that we have a system capable of parallel execution, by checking if SMP support is enabled:

iex(1)> :erlang.system_info :smp_support
true

We can also check how many schedulers we have running in the system:

iex(2)> :erlang.system_info :schedulers_online
4

We can see this information in the Observer as shown in the figure below.

If we spawn more processes than we have schedulers and let them do some busy work, we can see that some processes are running in parallel while others are runnable but not currently running. We can see this with the function erlang:process_info/2.

1> Loop = fun (0, _) -> ok; (N, F) -> F(N-1, F) end,
   BusyFun = fun() -> spawn(fun () -> Loop(1000000, Loop) end) end,
   SpawnThem = fun(N) -> [ BusyFun() || _ <- lists:seq(1, N)] end,
   GetStatus = fun() -> lists:sort([{erlang:process_info(P, [status]), P}
                        || P <- erlang:processes()]) end,
   RunThem = fun (N) -> SpawnThem(N), GetStatus() end,
   RunThem(8).

[{[{status,garbage_collecting}],<0.62.0>},
 {[{status,garbage_collecting}],<0.66.0>},
 {[{status,runnable}],<0.60.0>},
 {[{status,runnable}],<0.61.0>},
 {[{status,runnable}],<0.63.0>},
 {[{status,runnable}],<0.65.0>},
 {[{status,runnable}],<0.67.0>},
 {[{status,running}],<0.58.0>},
 {[{status,running}],<0.64.0>},
 {[{status,waiting}],<0.0.0>},
 {[{status,waiting}],<0.1.0>},

...

We will look closer at the different statuses that a process can have later in this chapter, but for now all we need to know is that a process that is running or garbage_collecting is actually running on a scheduler. Since the machine in the example has four cores and four schedulers, there are four processes running in parallel (the shell process and three of the busy processes). There are also five busy processes waiting to run in the state runnable.

By using the Load Charts tab in the Observer we can see that all four schedulers are fully loaded while the busy processes execute.

2> observer:start().
ok
3> RunThem(8).
Observer

Preemptive Multitasking in ERTS, Cooperating in C

The preemptive multitasking on the Erlang level is achieved by cooperative multitasking on the C level. The Erlang language, the compiler, and the virtual machine work together to ensure that the execution of an Erlang process will yield within a limited time and let the next process run. The technique used to measure and limit the allowed execution time is called reduction counting; we will look at all the details of reduction counting soon.

Reductions

One can describe the scheduling in BEAM as preemptive scheduling on top of cooperative scheduling. A process can only be suspended at certain points of the execution, such as at a receive or a function call. In that way the scheduling is cooperative: a process has to execute code which allows for suspension. The nature of Erlang code makes it almost impossible for a process to run for a long time without doing a function call. There are still a few Built In Functions (BIFs) that can take too long without yielding. Also, if you call C code in a badly implemented Native Implemented Function (NIF) you might block one scheduler for a long time. We will look at how to write well behaved NIFs in [CH-C].

Since there are no other loop constructs than recursion and list comprehensions, there is no way to loop forever without doing a function call. Each function call is counted as a reduction; when the reduction limit for the process is reached it is suspended.

Note
Reductions: The term reduction comes from the Prolog ancestry of Erlang. In Prolog each execution step is a goal-reduction, where each step reduces a logic problem into its constituent parts, and then tries to solve each part.

How Many Reductions Will You Get?

When a process is scheduled it will get a number of reductions defined by CONTEXT_REDS (defined in erl_vm.h, currently as 2000). After using up its reductions, or when doing a receive without a matching message in the inbox, the process will be suspended and a new process will be scheduled.

If the VM has executed as many reductions as defined by INPUT_REDUCTIONS (currently 2*CONTEXT_REDS, also defined in erl_vm.h), or if there is no process ready to run, the scheduler will do system-level activities. That is, basically, check for IO; we will cover the details soon.

What is a Reduction Really?

It is not completely defined what a reduction is, but at least each function call should be counted as a reduction. Things get a bit more complicated when talking about BIFs and NIFs. A process should not be able to run for "a long time" without using a reduction and yielding. A function written in C cannot yield in the middle; it has to make sure it is in a clean state and return. In order to be re-entrant it has to save its internal state somehow before it returns and then set up the state again on re-entry. This can be very costly, especially for a function that sometimes does only a little work and sometimes a lot. The reason for writing a function in C instead of Erlang is usually to achieve performance and to avoid unnecessary bookkeeping work. Since there is no clear definition of what one reduction is, other than a function call on the Erlang level, there is a risk that a function implemented in C takes many more clock cycles per reduction than a normal Erlang function. This can lead to an imbalance in the scheduler, and even starvation.

For example, in Erlang versions prior to R16, the BIFs binary_to_term/1 and term_to_binary/1 were non-yielding and only counted as one reduction. This meant that a process calling these functions on large terms could starve other processes. This can even happen in an SMP system because of the way processes are balanced between schedulers, which we will get to soon.

While a process is running the emulator keeps the number of reductions left to execute in the (register mapped) variable FCALLS (see beam_emu.c).

We can examine this value with hipe_bifs:show_pcb/1:

iex(13)> :hipe_bifs.show_pcb self
 P: 0x00007efd7c2c0400
 -----------------------------------------------------------------
 Offset| Name          |              Value |             *Value |
     0 | id            | 0x00000270000004e3 |                    |

 ...

   328 | rcount        | 0x0000000000000000 |                    |
   336 | reds          | 0x000000000000a528 |                    |

 ...

   320 | fcalls        | 0x00000000000004a3 |                    |

The field reds keeps track of the total number of reductions a process has done up until it was last suspended. By monitoring this number you can see which processes do the most work.

You can see the total number of reductions for a process (the reds field) by calling erlang:process_info/2 with the atom reductions as the second argument. You can also see this number in the process tab in the observer or with the i/0 command in the Erlang shell.

As noted earlier, each time a process starts the field fcalls is set to the value of CONTEXT_REDS, and for each function call the process executes, fcalls is reduced by 1. When the process is suspended the field reds is increased by the number of executed reductions. In C-like code: p->reds += (CONTEXT_REDS - p->fcalls).

Normally a process would do all its allotted reductions and fcalls would be 0 at this point, but if the process suspends in a receive waiting for a message it will have some reductions left.
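
The bookkeeping described above can be sketched in a few lines of C. This is a toy model, not the code in beam_emu.c: the struct toy_process and the functions toy_schedule_in, toy_run, and toy_schedule_out are hypothetical names introduced only for illustration, and only the CONTEXT_REDS value and the update of reds are taken from the text.

```c
#include <assert.h>

#define CONTEXT_REDS 2000 /* value quoted in the text (erl_vm.h) */

/* Hypothetical, stripped-down PCB with only the two fields discussed. */
typedef struct {
    long reds;   /* total reductions up to the last suspension */
    int  fcalls; /* reductions left in the current time slice */
} toy_process;

/* The process is scheduled in: it gets a fresh reduction budget. */
void toy_schedule_in(toy_process *p) {
    p->fcalls = CONTEXT_REDS;
}

/* Simulate up to `calls` function calls, one reduction each.
 * Returns the number of calls executed before the budget ran out. */
int toy_run(toy_process *p, int calls) {
    int done = 0;
    assert(calls >= 0);
    while (done < calls && p->fcalls > 0) {
        p->fcalls--;
        done++;
    }
    return done;
}

/* The process is suspended: account for the reductions it used. */
void toy_schedule_out(toy_process *p) {
    p->reds += (CONTEXT_REDS - p->fcalls);
}
```

A process that blocks in a receive after 500 calls is scheduled out with fcalls at 1500 and reds increased by 500, while a process that tries to make 5000 calls is cut off after 2000.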

When a process uses up all its reductions it will yield to let another process run, going from the process state running to the state runnable. If it yields in a receive it will instead go into the state waiting (for a message). In the next section we will take a look at all the different states a process can be in.

The Process State (or status)

The field status in the PCB contains the process state. It can be one of free, runnable, waiting, running, exiting, garbing, and suspended. When a process exits it is marked as free. You should never be able to see a process in this state; it is a short-lived state where the process no longer exists as far as the rest of the system is concerned but there is still some cleanup to be done (freeing memory and other resources).

Each process status represents a state in the Process State Machine. Events such as a timeout or a delivered message trigger transitions along the edges in the state machine. The Process State Machine looks like this:

Process State Machine
                                +--------+
                                |  free  |
              +-----------+     |        |
          +---> suspended |     +---^----+
          | +-+           |         |
          | | ++-------^^-+     +---+----+
          | |  |       ||       | exiting|
          | |  |       ||       |        |
          | |  |       ||       +---^----+
          | |  |       ||suspend    |
          | |  |       |+--------+  |
          | |  | resume|         |  | exit
          | |  |       |         |  |
          | | +v-------+--+    +-+--+-----+   GC   +----------+
          | | | runnable  |+-->| running  +--------> garbing  |
          | | |           |    |          <--------+          |
          | | +^------^---+    +----+-----+        +----------+
          | |  |      |             |
          | |  | msg  | timeout     | receive
          | |  |      |             |
          | |  |      |             |
          | |  |      |        +----v-----+
          | |  |      +--------+ waiting  |
          | |  +---------------+          |
          | |                  +^---+-----+
          | |resume             |   |
          | +-------------------+   |suspend
          +-------------------------+


The normal states for a process are runnable, waiting, and running. A running process is currently executing code in one of the schedulers. When a process enters a receive and there is no matching message in the message queue, the process will become waiting until a message arrives or a timeout occurs. If a process uses up all its reductions, it will become runnable and wait for a scheduler to pick it up again. A waiting process receiving a message or a timeout will become runnable.

Whenever a process needs to do garbage collection, it will go into the garbing state until the GC is done. While it is doing GC it saves the old state in the field gcstatus and when it is done it sets the state back to the old state using gcstatus.

The suspended state is only supposed to be used for debugging purposes. You can call erlang:suspend_process/2 on another process to force it into the suspended state. Each time a process calls suspend_process on another process, the suspend count is increased. This is recorded in the field rcount. A call to erlang:resume_process/1 by the suspending process will decrease the suspend count. A process in the suspended state will not leave it until the suspend count reaches zero.

The field rstatus (resume status) is used to keep track of the state the process was in before a suspend. If it was running or runnable it will start up as runnable, and if it was waiting it will go back to the wait queue. If a suspended waiting process receives a timeout, rstatus is set to runnable so it will resume as runnable.
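
The interplay between the suspend count and the resume status can be illustrated with a small C model. It is a sketch under simplifying assumptions: the type toy_proc and the functions toy_suspend, toy_resume, and toy_timeout are hypothetical, only four of the states are modelled, and no attempt is made to mirror the actual ERTS data structures.

```c
#include <assert.h>

typedef enum { ST_RUNNABLE, ST_WAITING, ST_RUNNING, ST_SUSPENDED } toy_status;

/* Hypothetical PCB fragment holding the fields named in the text. */
typedef struct {
    toy_status status;  /* current state */
    toy_status rstatus; /* state to resume into */
    unsigned   rcount;  /* suspend count */
} toy_proc;

/* Each suspend increases the count; the first one records rstatus. */
void toy_suspend(toy_proc *p) {
    if (p->rcount == 0) {
        /* running or runnable resumes as runnable, waiting as waiting */
        p->rstatus = (p->status == ST_WAITING) ? ST_WAITING : ST_RUNNABLE;
        p->status = ST_SUSPENDED;
    }
    p->rcount++;
}

/* Each resume decreases the count; at zero the process leaves suspended. */
void toy_resume(toy_proc *p) {
    assert(p->rcount > 0);
    p->rcount--;
    if (p->rcount == 0) {
        p->status = p->rstatus;
    }
}

/* A receive timeout fires for the process. */
void toy_timeout(toy_proc *p) {
    if (p->status == ST_SUSPENDED) {
        if (p->rstatus == ST_WAITING) {
            p->rstatus = ST_RUNNABLE; /* resume as runnable later */
        }
    } else if (p->status == ST_WAITING) {
        p->status = ST_RUNNABLE;
    }
}
```

In this model a waiting process that is suspended twice stays suspended after one resume, and if its timeout fires in the meantime it comes back as runnable rather than waiting.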

To keep track of which process to run next the scheduler keeps the processes in a queue.

Process Queues

The main job of the scheduler is to keep track of work queues, that is, queues of processes and ports.

There are two process states that the scheduler has to handle: runnable and waiting. Processes waiting to receive a message are in the waiting state. When a waiting process receives a message, the send operation triggers a move of the receiving process into the runnable state. If the receive statement has a timeout, the scheduler has to trigger the state transition to runnable when the timeout triggers. We will cover this mechanism later in this chapter.

The Ready Queue

Processes in the runnable state are placed in a FIFO (first in first out) queue handled by the scheduler, called the ready queue. The queue is implemented by a first and a last pointer and by the next pointer in the PCB of each participating process. When a new process is added to the queue the last pointer is followed and the process is added to the end of the queue in an O(1) operation. When a new process is scheduled it is just popped from the head (the first pointer) of the queue.

 The Ready Queue

 First: -->  P5       +---> P3       +-+-> P17
             next: ---+     next: ---+ |  next: NULL
                                       |
 Last: --------------------------------+
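
A queue of this shape, with a first and a last pointer and a next pointer inside each PCB, might be sketched as follows. The names toy_pcb, toy_ready_queue, toy_enqueue, and toy_dequeue are hypothetical and the PCB is reduced to an id; the real structures in ERTS carry much more state.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical PCB reduced to an id and the intrusive next pointer. */
typedef struct toy_pcb {
    int id;
    struct toy_pcb *next;
} toy_pcb;

typedef struct {
    toy_pcb *first; /* head: next process to schedule */
    toy_pcb *last;  /* tail: most recently added process */
} toy_ready_queue;

/* Append at the tail in O(1) by following the last pointer. */
void toy_enqueue(toy_ready_queue *q, toy_pcb *p) {
    assert(p != NULL);
    p->next = NULL;
    if (q->last != NULL) {
        q->last->next = p;
    } else {
        q->first = p; /* queue was empty */
    }
    q->last = p;
}

/* Pop from the head in O(1); returns NULL if the queue is empty. */
toy_pcb *toy_dequeue(toy_ready_queue *q) {
    toy_pcb *p = q->first;
    if (p == NULL) {
        return NULL;
    }
    q->first = p->next;
    if (q->first == NULL) {
        q->last = NULL; /* queue became empty */
    }
    p->next = NULL;
    return p;
}
```

Enqueueing P5, P3, and P17 in that order and then dequeueing three times yields them back in the same order, as in the figure.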

In an SMP system, where you have several scheduler threads, there is one queue per scheduler.

 Scheduler 1       Scheduler 2      Scheduler 3      Scheduler 4

 Ready: P5         Ready: P1        Ready: P7        Ready: P9
        P3                P4               P12
        P17                                P10

The reality is slightly more complicated since Erlang processes have priorities. Each scheduler actually has three queues: one queue for max priority tasks, one for high priority tasks, and one queue containing both normal and low priority tasks.

 Scheduler 1       Scheduler 2      Scheduler 3      Scheduler 4

 Max:    P5        Max:             Max:             Max:
 High:             High:   P1       High:            High:
 Normal: P3        Normal: P4       Normal: P7       Normal: P9
         P17                                P12
                                            P10

If there are any processes in the max queue the scheduler will pick these processes for execution. If there are no processes in the max queue but there are processes in the high priority queue the scheduler will pick those processes. Only if there are no processes in the max and the high priority queues will the scheduler pick the first process from the normal and low queue.

When a normal process is inserted into the queue it gets a schedule count of 1 and a low priority process gets a schedule count of 8. When a process is picked from the front of the queue its schedule count is reduced by one. If the count reaches zero the process is scheduled, otherwise it is inserted at the end of the queue. This means that low priority processes will go through the queue seven times before they are scheduled.
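
The picking order and the schedule count can be modelled with three small queues. The following is a sketch, not the ERTS implementation: all names (toy_task, toy_runq, toy_insert, toy_pick) are hypothetical, and fixed-size ring buffers are used instead of linked PCBs to keep the example short.

```c
#include <assert.h>
#include <stddef.h>

#define QCAP 16 /* arbitrary capacity for this sketch */

typedef enum { PRIO_MAX, PRIO_HIGH, PRIO_NORMAL, PRIO_LOW } toy_prio;

typedef struct {
    int      id;
    toy_prio prio;
    int      schedule_count;
} toy_task;

/* Small ring buffer used as a FIFO queue. */
typedef struct {
    toy_task *items[QCAP];
    int head;
    int len;
} toy_queue;

/* One run queue per scheduler: max, high, and shared normal/low. */
typedef struct {
    toy_queue max;
    toy_queue high;
    toy_queue normal_low;
} toy_runq;

static void q_push(toy_queue *q, toy_task *t) {
    assert(q->len < QCAP);
    q->items[(q->head + q->len) % QCAP] = t;
    q->len++;
}

static toy_task *q_pop(toy_queue *q) {
    toy_task *t;
    if (q->len == 0) {
        return NULL;
    }
    t = q->items[q->head];
    q->head = (q->head + 1) % QCAP;
    q->len--;
    return t;
}

/* Insert a task; normal gets schedule count 1, low gets 8. */
void toy_insert(toy_runq *rq, toy_task *t) {
    switch (t->prio) {
    case PRIO_MAX:    q_push(&rq->max, t); break;
    case PRIO_HIGH:   q_push(&rq->high, t); break;
    case PRIO_NORMAL: t->schedule_count = 1; q_push(&rq->normal_low, t); break;
    case PRIO_LOW:    t->schedule_count = 8; q_push(&rq->normal_low, t); break;
    }
}

/* Pick the next task: max first, then high, then normal/low. */
toy_task *toy_pick(toy_runq *rq) {
    toy_task *t;
    if ((t = q_pop(&rq->max)) != NULL) return t;
    if ((t = q_pop(&rq->high)) != NULL) return t;
    while ((t = q_pop(&rq->normal_low)) != NULL) {
        t->schedule_count--;
        if (t->schedule_count == 0) return t;
        q_push(&rq->normal_low, t); /* not yet its turn: back of the queue */
    }
    return NULL;
}
```

With one low priority task at the front of the queue and a normal task re-inserted before each pick, the normal task is picked seven times before the low priority task finally runs on the eighth pick.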

Waiting, Timeouts and the Timing Wheel

A process trying to do a receive on an empty mailbox or on a mailbox with no matching messages will yield and go into the waiting state.

When a message is delivered to an inbox the sending process will check whether the receiver is sleeping in the waiting state, and in that case it will wake the process, change its state to runnable, and put it at the end of the appropriate ready queue.

If the receive statement has a timeout clause a timer will be created for the process which will trigger after the specified timeout time. The only guarantee the runtime system gives on a timeout is that it will not trigger before the set time. It might be some time after the intended time before the process is scheduled and gets to execute.

Timers are handled in the VM by a timing wheel. That is, an array of time slots which wraps around. Prior to Erlang 18 the timing wheel was a global resource and there could be some contention for the write lock if you had many processes inserting timers into the wheel. Make sure you are using a later version of Erlang if you use many timers.

The default size (TIW_SIZE) of the timing wheel is 65536 slots (or 8192 slots if you have built the system for a small memory footprint). The current time is indicated by an index into the array (tiw_pos). When a timer is inserted into the wheel with a timeout of T the timer is inserted into the slot at (tiw_pos+T)%TIW_SIZE.

   0 1                                      65535
  +-+-+- ... +-+-+-+-+-+-+-+-+-+-+-+ ... +-+-----+
  | | |      | | | | | | |t| | | | |     | |     |
  +-+-+- ... +-+-+-+-+-+-+-+-+-+-+-+ ... +-+-----+
              ^           ^                       ^
              |           |                       |
           tiw_pos     tiw_pos+T               TIW_SIZE

The timer stored in the timing wheel is a pointer to an ErlTimer struct. See erl_time.h. If several timers are inserted into the same slot they are linked together in a linked list by the prev and next fields. The count field is set to T/TIW_SIZE.

/*
** Timer entry:
*/
typedef struct erl_timer {
    struct erl_timer* next;	/* next entry tiw slot or chain */
    struct erl_timer* prev;	/* prev entry tiw slot or chain */
    Uint slot;			/* slot in timer wheel */
    Uint count;			/* number of loops remaining */
    int    active;		/* 1=activated, 0=deactivated */
    /* called when timeout */
    void (*timeout)(void*);
    /* called when cancel (may be NULL) */
    void (*cancel)(void*);
    void* arg;        /* argument to timeout/cancel procs */
} ErlTimer;
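
The slot and count arithmetic can be tried out in isolation. The sketch below uses the formulas from the text, (tiw_pos+T)%TIW_SIZE for the slot and T/TIW_SIZE for the count; the type toy_timer_pos and the functions toy_place_timer and toy_slot_visit are hypothetical names, and the way toy_slot_visit uses count follows the "number of loops remaining" comment in the struct rather than the actual code in ERTS.

```c
#include <assert.h>

#define TIW_SIZE 65536u /* default wheel size quoted in the text */

/* Where a timer ends up in the wheel (hypothetical helper type). */
typedef struct {
    unsigned slot;  /* index into the wheel */
    unsigned count; /* full laps of the wheel remaining */
} toy_timer_pos;

/* Place a timer with timeout t, given the current wheel position. */
toy_timer_pos toy_place_timer(unsigned tiw_pos, unsigned t) {
    toy_timer_pos pos;
    assert(tiw_pos < TIW_SIZE);
    pos.slot = (tiw_pos + t) % TIW_SIZE;
    pos.count = t / TIW_SIZE;
    return pos;
}

/* The wheel position reaches the timer's slot.
 * Returns 1 if the timer should fire now, otherwise consumes one
 * lap and returns 0 (assumed behaviour for this sketch). */
int toy_slot_visit(toy_timer_pos *pos) {
    if (pos->count == 0) {
        return 1;
    }
    pos->count--;
    return 0;
}
```

For example, with tiw_pos at 65530 a timeout of 10 wraps around to slot 4 with count 0, while a timeout of 2*65536+3 from position 0 lands in slot 3 with count 2 and fires on the third visit to that slot.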

Ports

A port is an Erlang abstraction for a communication point with the world outside of the Erlang VM. Communications with sockets, pipes, and file IO are all done through ports on the Erlang side.

A port, like a process, is created on the same scheduler as the creating process. Also like processes, ports use reductions to decide when to yield, and they also get to run for 2000 reductions. But since ports don’t run Erlang code there are no Erlang function calls to count as reductions; instead each port task is counted as a number of reductions. Currently a task costs a little more than 200 reductions, plus a number of reductions relative to one thousandth of the size of the transmitted data.

A port task is one operation on a port, like opening, closing, sending a number of bytes or receiving data. In order to execute a port task the executing thread takes a lock on the port.

Port tasks are scheduled and executed in each iteration in the scheduler loop (see below) before a new process is selected for execution.

The Scheduler Loop

Conceptually you can look at the scheduler as the driver of program execution in the Erlang VM. In reality, that is, the way the C code is structured, it is the emulator (process_main in beam_emu.c) that drives the execution and it calls the scheduler as a subroutine to find the next process to execute.

Still, we will pretend that it is the other way around, since it makes a nice conceptual model for the scheduler loop. That is, we see it as the scheduler picking a process to execute and then handing over the execution to the emulator.

Looking at it that way, the scheduler loop looks like this:

  1. Update reduction counters.

  2. Check timers.

  3. If needed, check balance.

  4. If needed, migrate processes and ports.

  5. Do auxiliary scheduler work.

  6. If needed, check IO and update time.

  7. While needed, pick a port task to execute.

  8. Pick a process to execute.

Load Balancing

The current strategy of the load balancer is to use as few schedulers as possible without overloading any CPU. The idea is that you will get better performance through better memory locality when processes share the same CPU.

One thing to note though is that the load balancing done in the scheduler is between scheduler threads and not necessarily between CPUs or cores. When you start the runtime system you can specify how schedulers should be allocated to cores. The default behaviour is that it is up to the OS to allocate scheduler threads to cores, but you can also choose to bind schedulers to cores.

The load balancer assumes that there is one scheduler running on each core so that moving a process from an overloaded scheduler to an underutilized scheduler will give you more parallel processing power. If you have changed how schedulers are allocated to cores, or if your OS is overloaded or bad at assigning threads to cores, the load balancing might actually work against you.

The load balancer uses two techniques to balance the load: task stealing and migration. Task stealing is used every time a scheduler runs out of work; this technique results in the work becoming more spread out between schedulers. Migration is more complicated and tries to compact the load to the right number of schedulers.

Task Stealing

If the run queue of a scheduler is empty when it should pick a new process to schedule, the scheduler will try to steal work from another scheduler.

First the scheduler takes a lock on itself to prevent other schedulers from trying to steal work from the current scheduler. Then it checks if there are any inactive schedulers that it can steal a task from. If there are no inactive schedulers with stealable tasks then it will look at active schedulers, starting with schedulers having a higher id than itself, trying to find a stealable task.

The task stealing will look at one scheduler at a time and try to steal the highest priority task of that scheduler. Since this is done per scheduler there might actually be higher priority tasks that are stealable on another scheduler which will not be taken.

The task stealing tries to move tasks towards schedulers with lower numbers by trying to steal from schedulers with higher numbers, but since the stealing also will wrap around and steal from schedulers with lower numbers the result is that processes are spread out on all active schedulers.

Task stealing is quite fast and can be done on every iteration of the scheduler loop when a scheduler has run out of tasks.
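
The search order over active schedulers, higher ids first and then wrapping around to the lower ones, can be sketched as below. This is a simplification with hypothetical names (toy_victim_order, toy_find_victim): it uses 0-based scheduler ids, treats any non-empty run queue as stealable, and leaves out the locking, the priority handling, and the initial pass over inactive schedulers.

```c
#include <assert.h>

/* Fill `order` with the ids a scheduler would visit when looking for
 * work: first the ids above `self`, then wrapping to the ids below.
 * Returns the number of entries written (n - 1). */
int toy_victim_order(int self, int n, int *order) {
    int k;
    int count = 0;
    assert(n > 0 && self >= 0 && self < n);
    for (k = 1; k < n; k++) {
        order[count++] = (self + k) % n;
    }
    return count;
}

/* Return the first scheduler in that order with a non-empty run
 * queue, or -1 if there is nothing to steal. */
int toy_find_victim(int self, const int *runq_len, int n) {
    int k;
    assert(n > 0 && self >= 0 && self < n);
    for (k = 1; k < n; k++) {
        int victim = (self + k) % n;
        if (runq_len[victim] > 0) {
            return victim;
        }
    }
    return -1;
}
```

With four schedulers, scheduler 2 looks at 3, 0, and 1 in that order, so work tends to be pulled from higher-numbered schedulers first but can still come from any of them.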

Migration

To really utilize the schedulers optimally a more elaborate migration strategy is used. The current strategy is to compact the load to as few schedulers as possible, while at the same time spreading it out so that no scheduler is overloaded.

This is done by the function check_balance in erl_process.c.

The migration is done by first setting up a migration plan and then letting schedulers execute on that plan until a new plan is set up. Every 2000*2000 reductions a scheduler calculates a migration path per priority per scheduler by looking at the workload of all schedulers. The migration path can have three different types of values:

  1. cleared

  2. emigrate to scheduler #

  3. immigrate from scheduler #

When a process becomes ready (for example by receiving a message or triggering a timeout) it will normally be scheduled on the last scheduler it ran on (S1). That is, if the migration path of that scheduler (S1), at that priority, is cleared. If the migration path of the scheduler is set to emigrate (to S2) the process will be handed over to that scheduler if both S1 and S2 have unbalanced run-queues. We will get back to what that means.

When a scheduler (S1) is to pick a new process to execute it checks to see if it has an immigration path from (S2) set. If the two involved schedulers have unbalanced run-queues S1 will steal a process from S2.

The migration path is calculated by comparing the maximum run-queues for each scheduler for a certain priority. Each scheduler will update a counter in each iteration of its scheduler loop keeping track of the maximal queue length. This information is then used to calculate an average (max) queue length (AMQL).

 Max
 Run Q
 Length
    5         o
              o
           o  o
Avg: 2.5 --------------
           o  o     o
    1      o  o     o

scheduler S1 S2 S3 S4

Then the schedulers are sorted on their max queue lengths.

 Max
 Run Q
 Length
    5               o
                    o
                 o  o
Avg: 2.5 --------------
              o  o  o
    1         o  o  o

scheduler S3 S4 S1 S2

           ^        ^
           |        |
          tix      fix

Any scheduler with a longer run queue than average (S1, S2) will be marked for emigration and any scheduler with a shorter max run queue than average (S3, S4) will be targeted for immigration.

This is done by looping over the ordered set of schedulers with two indices: fix (immigrate from) and tix (emigrate to). In each iteration of the loop the immigration path of S[tix] is set to S[fix] and the emigration path of S[fix] is set to S[tix]. Then tix is increased and fix decreased until they both pass the balance point. If one index reaches the balance point first it wraps.

In the example:

  * Iteration 1: S2.emigrate_to = S3 and S3.immigrate_from = S2

  * Iteration 2: S1.emigrate_to = S4 and S4.immigrate_from = S1

Then we are done.
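
The planning step for the example can be reproduced with a short C sketch. It is deliberately simpler than check_balance in erl_process.c: the names toy_sched, NO_PATH, and toy_plan_migration are hypothetical, a single priority level is handled, and the wrapping of an index that reaches the balance point early is left out, so the loop simply stops when either side runs out of schedulers on the right side of the average.

```c
#include <assert.h>
#include <stdlib.h>

#define NO_PATH (-1) /* stands for a cleared migration path */

typedef struct {
    int id;             /* scheduler number, 1-based as in the text */
    int max_len;        /* maximum run queue length observed */
    int emigrate_to;    /* scheduler id or NO_PATH */
    int immigrate_from; /* scheduler id or NO_PATH */
} toy_sched;

/* qsort comparator: ascending by max_len. */
static int by_max_len(const void *a, const void *b) {
    const toy_sched *x = (const toy_sched *)a;
    const toy_sched *y = (const toy_sched *)b;
    return (x->max_len > y->max_len) - (x->max_len < y->max_len);
}

/* Sort s[0..n-1] by max queue length and pair short queues (tix)
 * with long queues (fix) around the average. */
void toy_plan_migration(toy_sched *s, int n) {
    int i, tix, fix;
    int sum = 0;
    assert(n > 0);
    for (i = 0; i < n; i++) {
        sum += s[i].max_len;
        s[i].emigrate_to = NO_PATH;
        s[i].immigrate_from = NO_PATH;
    }
    qsort(s, (size_t)n, sizeof s[0], by_max_len);
    tix = 0;
    fix = n - 1;
    /* len < average  <=>  len * n < sum, which avoids floating point */
    while (tix < fix &&
           s[tix].max_len * n < sum &&
           s[fix].max_len * n > sum) {
        s[tix].immigrate_from = s[fix].id;
        s[fix].emigrate_to = s[tix].id;
        tix++;
        fix--;
    }
}
```

Feeding in the max queue lengths 3, 5, 0, and 2 for S1 to S4 gives the sorted order S3, S4, S1, S2 and the same two pairings as the iterations listed above.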

In reality things are a bit more complicated since schedulers can be taken off line. The migration planning is only done for online schedulers. Also, as mentioned before, this is done per priority level.

When a process is to be inserted into a ready queue and there is a migration path set from S1 to S2 the scheduler first checks that the run queue of S1 is larger than AMQL and that the run queue of S2 is smaller than the average. This way the migration is only allowed if both queues are still unbalanced.

There are two exceptions, though, where a migration is forced even when the queues are balanced or even imbalanced in the wrong way. In both these cases a special evacuation flag is set which overrides the balance test.

The evacuation flag is set when a scheduler is taken off line to ensure that no new processes are scheduled on an off line scheduler. The flag is also set when the scheduler detects that no progress is made on some priority. For example, if there is a max priority process which is always ready to run, no normal priority processes would ever be scheduled; the evacuation flag is then set for the normal priority queue of that scheduler.
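
The balance test and its override can be summarised as a single predicate. The function toy_may_migrate and its parameters are hypothetical; the sketch only restates the rule from the text, namely that a migration from S1 to S2 goes ahead if the evacuation flag is set, or if the run queue of S1 is longer than the average and the run queue of S2 is shorter.

```c
#include <assert.h>

/* Decide whether a process may follow a migration path from S1 to S2.
 * amql:       average (max) queue length from the last balance check
 * from_len:   current run queue length of S1
 * to_len:     current run queue length of S2
 * evacuation: non-zero if the evacuation flag is set for this queue
 * Returns 1 if the migration is allowed, 0 otherwise. */
int toy_may_migrate(double amql, int from_len, int to_len, int evacuation) {
    assert(amql >= 0.0 && from_len >= 0 && to_len >= 0);
    if (evacuation) {
        return 1; /* forced: overrides the balance test */
    }
    return (from_len > amql) && (to_len < amql);
}
```

With an average of 2.5, a move from a queue of length 5 to an empty queue is allowed, a move from a queue of length 2 is not, and with the evacuation flag set the move happens regardless of the queue lengths.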