To fully understand where time in an ERTS system is spent you need to understand how the system decides which Erlang code to run and when to run it. These decisions are made by the Scheduler.
The scheduler is responsible for the real-time guarantees of the system. In the strict Computer Science sense of the word real-time, a real-time system has to be able to guarantee a response within a specified time. That is, there are real deadlines and each task has to complete before its deadline. In Erlang there are no such guarantees; a timeout in Erlang is only guaranteed to not trigger before the given deadline.
In a general system like Erlang where we want to be able to handle all sorts of programs and loads, the scheduler will have to make some compromises. There will always be corner cases where a generic scheduler will behave badly. After reading this chapter you will have a deeper understanding of how the Erlang scheduler works and especially when it might not work optimally. You should be able to design your system to avoid the corner cases and you should also be able to analyze a misbehaving system.
Erlang is a concurrent language. When we say that processes run concurrently we mean that for an outside observer it looks like two processes are executing at the same time. In a single core system this is achieved by preemptive multitasking. This means that one process will run for a while, and then the scheduler of the virtual machine will suspend it and let another process run.
In a multicore or a distributed system we can achieve true parallelism, that is, two or more processes actually executing at the exact same time. In an SMP enabled emulator the system uses several OS threads to indirectly execute Erlang processes by running one scheduler and emulator per thread. In a system using the default settings for ERTS there will be one thread per enabled core (physical or hyper threaded).
We can check that we have a system capable of parallel execution by checking if SMP support is enabled:
iex(1)> :erlang.system_info :smp_support
true
We can also check how many schedulers we have running in the system:
iex(2)> :erlang.system_info :schedulers_online
4
We can also see this information in the System tab of the Observer.
If we spawn more processes than we have schedulers and let them do some busy work, we can see that a number of processes are running in parallel and some processes are runnable but not currently running. We can see this with the function erlang:process_info/2.
1> Loop = fun (0, _) -> ok;
              (N, F) -> F(N-1, F)
          end,
   BusyFun = fun() -> spawn(fun () -> Loop(1000000, Loop) end) end,
   SpawnThem = fun(N) -> [ BusyFun() || _ <- lists:seq(1, N)] end,
   GetStatus = fun() -> lists:sort([{erlang:process_info(P, [status]), P}
                                    || P <- erlang:processes()]) end,
   RunThem = fun (N) -> SpawnThem(N), GetStatus() end,
   RunThem(8).
[{[{status,garbage_collecting}],<0.62.0>},
 {[{status,garbage_collecting}],<0.66.0>},
 {[{status,runnable}],<0.60.0>},
 {[{status,runnable}],<0.61.0>},
 {[{status,runnable}],<0.63.0>},
 {[{status,runnable}],<0.65.0>},
 {[{status,runnable}],<0.67.0>},
 {[{status,running}],<0.58.0>},
 {[{status,running}],<0.64.0>},
 {[{status,waiting}],<0.0.0>},
 {[{status,waiting}],<0.1.0>},
 ...
We will look closer at the different statuses that a process can have later in this chapter, but for now all we need to know is that a process that is running or garbage_collecting is actually running on a scheduler. Since the machine in the example has four cores and four schedulers there are four processes running in parallel (the shell process and three of the busy processes). There are also five busy processes waiting to run in the state runnable.
By using the Load Charts tab in the Observer we can see that all four schedulers are fully loaded while the busy processes execute.
2> observer:start().
ok
3> RunThem(8).
The preemptive multitasking on the Erlang level is achieved by cooperative multitasking on the C level. The Erlang language, the compiler and the virtual machine work together to ensure that the execution of an Erlang process will yield within a limited time and let the next process run. The technique used to measure and limit the allowed execution time is called reduction counting; we will look at all the details of reduction counting soon.
One can describe the scheduling in BEAM as preemptive scheduling on top of cooperative scheduling. A process can only be suspended at certain points of the execution, such as at a receive or a function call. In that way the scheduling is cooperative---a process has to execute code which allows for suspension. The nature of Erlang code makes it almost impossible for a process to run for a long time without doing a function call. There are a few Built In Functions (BIFs) that still can take too long without yielding. Also, if you call C code in a badly implemented Native Implemented Function (NIF) you might block one scheduler for a long time. We will look at how to write well behaved NIFs in [CH-C].
Since there are no loop constructs other than recursion and list comprehensions, there is no way to loop forever without doing a function call. Each function call is counted as a reduction; when the reduction limit for the process is reached it is suspended.
Note: Reductions

The term reduction comes from the Prolog ancestry of Erlang. In Prolog each execution step is a goal-reduction, where each step reduces a logic problem into its constituent parts, and then tries to solve each part.
When a process is scheduled it will get a number of reductions defined by CONTEXT_REDS (defined in erl_vm.h, currently as 2000). After using up its reductions, or when doing a receive without a matching message in the inbox, the process will be suspended and a new process will be scheduled.
If the VM has executed as many reductions as defined by INPUT_REDUCTIONS (currently 2*CONTEXT_REDS, also defined in erl_vm.h), or if there is no process ready to run, the scheduler will do system-level activities. That is, basically, check for IO; we will cover the details soon.
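You can get a feel for reduction counting from the shell: erlang:statistics(reductions) returns the total number of reductions executed by the VM and the number executed since the last call (the numbers below are just from one sample run):

1> erlang:statistics(reductions).
{3244347,3244347}
2> erlang:statistics(reductions).
{3264274,19927}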
It is not completely defined what a reduction is, but at least each function call should be counted as a reduction. Things get a bit more complicated when talking about BIFs and NIFs. A process should not be able to run for "a long time" without using a reduction and yielding. A function written in C cannot yield in the middle; it has to make sure it is in a clean state and return. In order to be re-entrant it has to save its internal state somehow before it returns and then set up the state again on re-entry. This can be very costly, especially for a function that sometimes does only a little work and sometimes a lot. The reason for writing a function in C instead of Erlang is usually to achieve performance and to avoid unnecessary bookkeeping work. Since there is no clear definition of what one reduction is, other than a function call on the Erlang level, there is a risk that a function implemented in C takes many more clock cycles per reduction than a normal Erlang function. This can lead to an imbalance in the scheduler, and even starvation.
For example, in Erlang versions prior to R16, the BIFs binary_to_term/1 and term_to_binary/1 were non-yielding and counted as only one reduction. This meant that a process calling these functions on large terms could starve other processes. This can even happen in an SMP system because of the way processes are balanced between schedulers, which we will get to soon.
While a process is running the emulator keeps the number of reductions left to execute in the (register mapped) variable FCALLS (see beam_emu.c). We can examine this value with hipe_bifs:show_pcb/1:
iex(13)> :hipe_bifs.show_pcb self
P: 0x00007efd7c2c0400
-----------------------------------------------------------------
Offset| Name        | Value              | *Value             |
     0| id          | 0x00000270000004e3 |                    |
   ...
   328| rcount      | 0x0000000000000000 |                    |
   336| reds        | 0x000000000000a528 |                    |
   ...
   320| fcalls      | 0x00000000000004a3 |                    |
The field reds keeps track of the total number of reductions a process has done up until it was last suspended. By monitoring this number you can see which processes do the most work.
You can see the total number of reductions for a process (the reds field) by calling erlang:process_info/2 with the atom reductions as the second argument. You can also see this number in the Processes tab of the Observer or with the i/0 command in the Erlang shell.
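For example (the number will of course vary):

1> erlang:process_info(self(), reductions).
{reductions,7961}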
As noted earlier, each time a process is scheduled the field fcalls is set to the value of CONTEXT_REDS, and for each function call the process executes fcalls is reduced by 1. When the process is suspended the field reds is increased by the number of executed reductions. In C-like pseudo code: p->reds += (CONTEXT_REDS - p->fcalls).
Normally a process would do all its allotted reductions and fcalls
would be 0 at this point, but if the process suspends in a receive
waiting for a message it will have some reductions left.
When a process uses up all its reductions it will yield to let another process run, going from the process state running to the state runnable. If it yields in a receive it will instead go into the state waiting (for a message). In the next section we will take a look at all the different states a process can be in.
The field status in the PCB contains the process state. It can be one of free, runnable, waiting, running, exiting, garbing, and suspended. When a process exits it is marked as free---you should never be able to see a process in this state. It is a short-lived state where the process no longer exists as far as the rest of the system is concerned, but there is still some cleanup to be done (freeing memory and other resources).
Each process status represents a state in the Process State Machine. Events such as a timeout or a delivered message trigger transitions along the edges in the state machine. The Process State Machine looks like this:
[Figure: The Process State Machine. States: free, exiting, suspended, runnable, running, garbing, waiting. Transitions: runnable -> running when scheduled; running -> runnable on yield; running -> waiting on a receive with no matching message; waiting -> runnable on message delivery or timeout; running -> garbing and back on GC; running -> exiting -> free on exit; suspend and resume edges lead to and from suspended.]
The normal states for a process are runnable, waiting, and running. A running process is currently executing code in one of the schedulers. When a process enters a receive and there is no matching message in the message queue, the process will become waiting until a message arrives or a timeout occurs. If a process uses up all its reductions, it will become runnable and wait for a scheduler to pick it up again. A waiting process receiving a message or a timeout will become runnable.
Whenever a process needs to do garbage collection, it will go into the garbing state until the GC is done. While it is doing GC it saves the old state in the field gcstatus and when it is done it sets the state back to the old state using gcstatus.
The suspended state is only supposed to be used for debugging purposes. You can call erlang:suspend_process/2 on another process to force it into the suspended state. Each time a process calls suspend_process on another process, the suspend count is increased. This is recorded in the field rcount.
A call to erlang:resume_process/1 by the suspending process will decrease the suspend count. A process in the suspended state will not leave that state until the suspend count reaches zero.
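For example, using the variant without options, erlang:suspend_process/1 (pids and exact values are from one sample run):

1> P = spawn(fun() -> receive stop -> ok end end).
<0.88.0>
2> erlang:suspend_process(P).
true
3> erlang:process_info(P, status).
{status,suspended}
4> erlang:resume_process(P).
true
5> erlang:process_info(P, status).
{status,waiting}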
The field rstatus (resume status) is used to keep track of the state the process was in before a suspend. If it was running or runnable it will start up as runnable, and if it was waiting it will go back to the wait queue. If a suspended waiting process receives a timeout, rstatus is set to runnable so it will resume as runnable.
To keep track of which process to run next the scheduler keeps the processes in a queue.
The main job of the scheduler is to keep track of work queues, that is, queues of processes and ports.
There are two process states that the scheduler has to handle: runnable and waiting. Processes waiting to receive a message are in the waiting state. When a waiting process receives a message the send operation triggers a move of the receiving process into the runnable state. If the receive statement has a timeout the scheduler has to trigger the state transition to runnable when the timeout triggers. We will cover this mechanism later in this chapter.
Processes in the runnable state are placed in a FIFO (first in first out) queue handled by the scheduler, called the ready queue. The queue is implemented by a first and a last pointer and by the next pointer in the PCB of each participating process. When a new process is added to the queue the last pointer is followed and the process is added to the end of the queue in an O(1) operation. When a new process is scheduled it is just popped from the head (the first pointer) of the queue.
The Ready Queue

First: --> P5              +--> P3              +--> P17
           next: ----------+    next: ----------+    next: NULL
                                                      ^
Last: ------------------------------------------------+
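The FIFO behaviour, though not the intrusive pointer implementation in the PCB, can be illustrated with Erlang's queue module:

%% Illustration only: model the ready queue above as a functional queue.
RQ0 = queue:from_list([p5, p3, p17]),
RQ1 = queue:in(p7, RQ0),                 %% a new process is added at the end
{{value, Next}, _RQ2} = queue:out(RQ1),  %% the scheduler pops the head
Next =:= p5.                             %% p5 is scheduled first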
In an SMP system, where you have several scheduler threads, there is one queue per scheduler.
Scheduler 1     Scheduler 2     Scheduler 3     Scheduler 4

Ready: P5       Ready: P1       Ready: P7       Ready: P9
       P3              P4              P12
       P17                             P10
The reality is slightly more complicated since Erlang processes have priorities. Each scheduler actually has three queues. One queue for max priority tasks, one for high priority tasks and one queue containing both normal and low priority tasks.
Scheduler 1     Scheduler 2     Scheduler 3     Scheduler 4

Max:    P5      Max:            Max:            Max:
High:           High:   P1      High:           High:
Normal: P3      Normal: P4      Normal: P7      Normal: P9
        P17                             P12
                                        P10
If there are any processes in the max queue the scheduler will pick these processes for execution. If there are no processes in the max queue but there are processes in the high priority queue the scheduler will pick those processes. Only if there are no processes in the max and the high priority queues will the scheduler pick the first process from the normal and low queue.
When a normal process is inserted into the queue it gets a schedule count of 1 and a low priority process gets a schedule count of 8. When a process is picked from the front of the queue its schedule count is reduced by one, if the count reaches zero the process is scheduled, otherwise it is inserted at the end of the queue. This means that low priority processes will go through the queue seven times before they are scheduled.
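From Erlang code the priority of a process is set with process_flag(priority, Level), where Level is one of low, normal, high, or max. The schedule count mechanism itself can be sketched like this (an illustration only, not the actual C implementation):

%% Entries in the normal/low queue are modelled as {Pid, ScheduleCount}:
%% normal processes enter with count 1 and low priority processes with 8.
enqueue(Queue, Pid, normal) -> Queue ++ [{Pid, 1}];
enqueue(Queue, Pid, low)    -> Queue ++ [{Pid, 8}].

%% Pop the front of the queue; when the count reaches zero the process is
%% scheduled, otherwise it is put back at the end with the count decremented.
pick([{Pid, 1} | Rest])                    -> {schedule, Pid, Rest};
pick([{Pid, Count} | Rest]) when Count > 1 -> pick(Rest ++ [{Pid, Count - 1}]).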
A process trying to do a receive on an empty mailbox or on a mailbox with no matching messages will yield and go into the waiting state.
When a message is delivered to an inbox the sending process will check whether the receiver is sleeping in the waiting state, and in that case it will wake the process, change its state to runnable, and put it at the end of the appropriate ready queue.
If the receive statement has a timeout clause a timer will be created for the process which will trigger after the specified timeout time. The only guarantee the runtime system gives on a timeout is that it will not trigger before the set time, it might be some time after the intended time before the process is scheduled and gets to execute.
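For example, a receive with a 100 ms timeout will never return in less than 100 ms, but it may well return later (the exact value below will vary):

1> T0 = erlang:monotonic_time(millisecond),
   receive after 100 -> ok end,
   erlang:monotonic_time(millisecond) - T0.
101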
Timers are handled in the VM by a timing wheel. That is, an array of time slots which wraps around. Prior to Erlang 18 the timing wheel was a global resource and there could be some contention for the write lock if you had many processes inserting timers into the wheel. Make sure you are using a later version of Erlang if you use many timers.
The default size (TIW_SIZE) of the timing wheel is 65536 slots (or 8192 slots if you have built the system for a small memory footprint). The current time is indicated by an index into the array (tiw_pos). When a timer is inserted into the wheel with a timeout of T the timer is inserted into the slot at (tiw_pos+T)%TIW_SIZE.
 0 1                                    65535
+-+-+- ... +-+-+-+-+-+-+-+-+-+-+-+ ... +-+-----+
| | |      | | | | | | | | | |t| |     | |     |
+-+-+- ... +-+-+-+-+-+-+-+-+-+-+-+ ... +-+-----+
            ^                 ^                ^
            |                 |                |
         tiw_pos          tiw_pos+T       TIW_SIZE
The timer stored in the timing wheel is a pointer to an ErlTimer struct, see erl_time.h. If several timers are inserted into the same slot they are linked together in a linked list by the prev and next fields. The count field is set to T/TIW_SIZE, that is, the number of remaining laps around the wheel before the timer is due.
/*
** Timer entry:
*/
typedef struct erl_timer {
struct erl_timer* next; /* next entry tiw slot or chain */
struct erl_timer* prev; /* prev entry tiw slot or chain */
Uint slot; /* slot in timer wheel */
Uint count; /* number of loops remaining */
int active; /* 1=activated, 0=deactivated */
/* called when timeout */
void (*timeout)(void*);
/* called when cancel (may be NULL) */
void (*cancel)(void*);
void* arg; /* argument to timeout/cancel procs */
} ErlTimer;
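Putting the slot and count calculations together, here is a small illustrative sketch (a hypothetical Erlang function, not the C code in the emulator):

%% Where a timer with timeout T ends up, given the current wheel position
%% TiwPos and a wheel with TiwSize slots.
timer_slot(TiwPos, T, TiwSize) ->
    Slot  = (TiwPos + T) rem TiwSize,  %% slot: index into the wheel array
    Count = T div TiwSize,             %% count: full laps left before it fires
    {Slot, Count}.

For example, timer_slot(123, 70000, 65536) gives {4587, 1}: the timer is placed one full lap ahead of the current position, in slot 4587.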
A port is an Erlang abstraction for a communication point with the world outside of the Erlang VM. Communications with sockets, pipes, and file IO are all done through ports on the Erlang side.
A port, like a process, is created on the same scheduler as the creating process. Also like processes, ports use reductions to decide when to yield, and they also get to run for 2000 reductions. But since ports don't run Erlang code there are no Erlang function calls to count as reductions; instead each port task is counted as a number of reductions. Currently a task uses a little more than 200 reductions, plus a number of reductions relative to one thousandth of the size of the transmitted data.
A port task is one operation on a port, like opening, closing, sending a number of bytes or receiving data. In order to execute a port task the executing thread takes a lock on the port.
Port tasks are scheduled and executed in each iteration in the scheduler loop (see below) before a new process is selected for execution.
Conceptually you can look at the scheduler as the driver of program execution in the Erlang VM. In reality, that is, the way the C code is structured, it is the emulator (process_main in beam_emu.c) that drives the execution and it calls the scheduler as a subroutine to find the next process to execute.
Still, we will pretend that it is the other way around, since it makes a nice conceptual model for the scheduler loop. That is, we see it as the scheduler picking a process to execute and then handing over the execution to the emulator.
Looking at it that way, the scheduler loop looks like this:
- Update reduction counters.
- Check timers.
- If needed check balance.
- If needed migrate processes and ports.
- Do auxiliary scheduler work.
- If needed check IO and update time.
- While needed pick a port task to execute.
- Pick a process to execute.
The current strategy of the load balancer is to use as few schedulers as possible without overloading any CPU. The idea is that you will get better performance through better memory locality when processes share the same CPU.
One thing to note though is that the load balancing done in the scheduler is between scheduler threads and not necessarily between CPUs or cores. When you start the runtime system you can specify how schedulers should be allocated to cores. The default behaviour is that it is up to the OS to allocate scheduler threads to cores, but you can also choose to bind schedulers to cores.
The load balancer assumes that there is one scheduler running on each core, so that moving a process from an overloaded scheduler to an underutilized scheduler will give you more parallel processing power. If you have changed how schedulers are allocated to cores, or if your OS is overloaded or bad at assigning threads to cores, the load balancing might actually work against you.
The load balancer uses two techniques to balance the load: task stealing and migration. Task stealing is used every time a scheduler runs out of work; this technique will result in the work becoming more spread out between schedulers. Migration is more complicated and tries to compact the load to the right number of schedulers.
If a scheduler run queue is empty when it should pick a new process to schedule, the scheduler will try to steal work from another scheduler.
First the scheduler takes a lock on itself to prevent other schedulers from trying to steal work from the current scheduler. Then it checks if there are any inactive schedulers that it can steal a task from. If there are no inactive schedulers with stealable tasks, it will look at active schedulers, starting with schedulers having a higher id than itself, trying to find a stealable task.
The task stealing will look at one scheduler at a time and try to steal the highest priority task of that scheduler. Since this is done per scheduler there might actually be higher priority tasks that are stealable on another scheduler which will not be taken.
The task stealing tries to move tasks towards schedulers with lower numbers by trying to steal from schedulers with higher numbers, but since the stealing also will wrap around and steal from schedulers with lower numbers the result is that processes are spread out on all active schedulers.
Task stealing is quite fast and can be done on every iteration of the scheduler loop when a scheduler has run out of tasks.
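Assuming schedulers are numbered 1 to N, the order in which a scheduler looks for a victim can be sketched like this (illustration only; the real code also distinguishes inactive from active schedulers and skips schedulers without stealable tasks):

%% Victim order for scheduler Self: first the schedulers with higher ids,
%% then wrap around to the ones with lower ids.
steal_order(Self, N) ->
    lists:seq(Self + 1, N) ++ lists:seq(1, Self - 1).

For example, steal_order(3, 4) gives [4, 1, 2].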
To really utilize the schedulers optimally a more elaborate migration strategy is used. The current strategy is to compact the load to as few schedulers as possible, while at the same time spreading it out so that no scheduler is overloaded.
This is done by the function check_balance in erl_process.c.
The migration is done by first setting up a migration plan and then letting schedulers execute on that plan until a new plan is set up. Every 2000*2000 reductions a scheduler calculates a migration path per priority per scheduler by looking at the workload of all schedulers. A migration path can have three different types of values: 1) cleared, 2) emigrate to scheduler #, or 3) immigrate from scheduler #.
When a process becomes ready (for example by receiving a message or triggering a timeout) it will normally be scheduled on the last scheduler it ran on (S1). That is, if the migration path of that scheduler (S1), at that priority, is cleared. If the migration path of the scheduler is set to emigrate (to S2) the process will be handed over to that scheduler if both S1 and S2 have unbalanced run-queues. We will get back to what that means.
When a scheduler (S1) is to pick a new process to execute it checks to see if it has an immigration path from (S2) set. If the two involved schedulers have unbalanced run-queues S1 will steal a process from S2.
The migration path is calculated by comparing the maximum run-queues for each scheduler for a certain priority. Each scheduler will update a counter in each iteration of its scheduler loop keeping track of the maximal queue length. This information is then used to calculate an average (max) queue length (AMQL).
Max Run Q Length

   5             o
                 o
            o    o
Avg: 2.5 ----------------------
            o    o         o
   1        o    o         o

scheduler   S1   S2   S3   S4
Then the schedulers are sorted on their max queue lengths.
Max Run Q Length

   5                       o
                           o
                      o    o
Avg: 2.5 ----------------------
                 o    o    o
   1             o    o    o

scheduler   S3   S4   S1   S2
            ^              ^
            |              |
           tix            fix
Any scheduler with a longer run queue than average (S1, S2) will be marked for emigration and any scheduler with a shorter max run queue than average (S3, S4) will be targeted for immigration.
This is done by looping over the ordered set of schedulers with two indices: fix (immigrate from) and tix (emigrate to). In each iteration of the loop the immigration path of S[tix] is set to S[fix] and the emigration path of S[fix] is set to S[tix]. Then tix is increased and fix decreased until they both pass the balance point. If one index reaches the balance point first it wraps.
In the example:

* Iteration 1: S2.emigrate_to = S3 and S3.immigrate_from = S2
* Iteration 2: S1.emigrate_to = S4 and S4.immigrate_from = S1
Then we are done.
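A hedged sketch of the pairing, with hypothetical function names and ignoring offline schedulers, the per-priority dimension, and the wrap-around of the indices:

%% Input: [{SchedulerId, MaxRunQueueLength}] for one priority level.
%% The longest queue is paired with the shortest, the second longest with
%% the second shortest, and so on, as long as the two queues are on
%% opposite sides of the average.
migration_paths(MaxQLens) ->
    Avg    = lists:sum([L || {_, L} <- MaxQLens]) / length(MaxQLens),
    Sorted = lists:keysort(2, MaxQLens),        %% shortest queue first
    pair(Sorted, lists:reverse(Sorted), Avg, []).

pair([{To, ToLen} | Tos], [{From, FromLen} | Froms], Avg, Acc)
  when ToLen < Avg, FromLen > Avg ->
    pair(Tos, Froms, Avg,
         [{From, {emigrate_to, To}}, {To, {immigrate_from, From}} | Acc]);
pair(_, _, _, Acc) ->
    lists:reverse(Acc).

With maximum queue lengths of 3, 5, 0 and 2 for S1 to S4, as in the figures above, this pairs S2 with S3 and S1 with S4, just like the two iterations in the example.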
In reality things are a bit more complicated since schedulers can be taken offline. The migration planning is only done for online schedulers. Also, as mentioned before, this is done per priority level.
When a process is to be inserted into a ready queue and there is a migration path set from S1 to S2 the scheduler first checks that the run queue of S1 is larger than AMQL and that the run queue of S2 is smaller than the average. This way the migration is only allowed if both queues are still unbalanced.
There are two exceptions though where a migration is forced even when the queues are balanced or even imbalanced in the wrong way. In both these cases a special evacuation flag is set which overrides the balance test.
The evacuation flag is set when a scheduler is taken offline to ensure that no new processes are scheduled on an offline scheduler. The flag is also set when the scheduler detects that no progress is made on some priority. For example, if there is a max priority process which is always ready to run, so that no normal priority processes ever get scheduled, then the evacuation flag will be set for the normal priority queue of that scheduler.