Skip to content

Commit

Permalink
Lockless implementation of add_callback
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Nov 3, 2016
1 parent 4d783c6 commit 29b1429
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 41 deletions.
63 changes: 22 additions & 41 deletions tornado/ioloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from __future__ import absolute_import, division, print_function, with_statement

import collections
import datetime
import errno
import functools
Expand Down Expand Up @@ -693,8 +694,7 @@ def initialize(self, impl, time_func=None, **kwargs):
self.time_func = time_func or time.time
self._handlers = {}
self._events = {}
self._callbacks = []
self._callback_lock = threading.Lock()
self._callbacks = collections.deque()
self._timeouts = []
self._cancellations = 0
self._running = False
Expand All @@ -712,8 +712,8 @@ def initialize(self, impl, time_func=None, **kwargs):
self.READ)

def close(self, all_fds=False):
with self._callback_lock:
self._closing = True
self._waker.mark_closing()
self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
for fd, handler in self._handlers.values():
Expand Down Expand Up @@ -798,12 +798,6 @@ def start(self):

try:
while True:
# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
with self._callback_lock:
callbacks = self._callbacks
self._callbacks = []

# Add any timeouts that have come due to the callback list.
# Do not run anything until we have determined which ones
# are ready, so timeouts that call add_timeout cannot
Expand Down Expand Up @@ -831,14 +825,17 @@ def start(self):
if x.callback is not None]
heapq.heapify(self._timeouts)

for callback in callbacks:
self._run_callback(callback)
# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
n = len(self._callbacks)
for i in range(n):
self._run_callback(self._callbacks.popleft())
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
# Closures may be holding on to a lot of memory, so allow
# them to be freed before we go into our poll wait.
callbacks = callback = due_timeouts = timeout = None
due_timeouts = timeout = None

if self._callbacks:
# If any callbacks or timeouts called add_callback,
Expand Down Expand Up @@ -934,36 +931,20 @@ def remove_timeout(self, timeout):
self._cancellations += 1

def add_callback(self, callback, *args, **kwargs):
if self._closing:
return
# Blindly insert into self._callbacks. This is safe even
# from signal handlers because deque.append is atomic.
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs))
if thread.get_ident() != self._thread_ident:
# If we're not on the IOLoop's thread, we need to synchronize
# with other threads, or waking logic will induce a race.
with self._callback_lock:
if self._closing:
return
list_empty = not self._callbacks
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs))
if list_empty:
# If we're not in the IOLoop's thread, and we added the
# first callback to an empty list, we may need to wake it
# up (it may wake up on its own, but an occasional extra
# wake is harmless). Waking up a polling IOLoop is
# relatively expensive, so we try to avoid it when we can.
self._waker.wake()
# This will write one byte but Waker.consume() reads many
# at once, so it's ok to write even when not strictly
# necessary.
self._waker.wake()
else:
if self._closing:
return
# If we're on the IOLoop's thread, we don't need the lock,
# since we don't need to wake anyone, just add the
# callback. Blindly insert into self._callbacks. This is
# safe even from signal handlers because the GIL makes
# list.append atomic. One subtlety is that if the signal
# is interrupting another thread holding the
# _callback_lock block in IOLoop.start, we may modify
# either the old or new version of self._callbacks, but
# either way will work.
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs))
# If we're on the IOLoop's thread, we don't need to wake anyone.
pass

def add_callback_from_signal(self, callback, *args, **kwargs):
with stack_context.NullContext():
Expand Down
8 changes: 8 additions & 0 deletions tornado/platform/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self):
# Based on Zope select_trigger.py:
# https://github.com/zopefoundation/Zope/blob/master/src/ZServer/medusa/thread/select_trigger.py

self.closing = False
self.writer = socket.socket()
# Disable buffering -- pulling the trigger sends 1 byte,
# and we want that sent immediately, to wake up ASAP.
Expand Down Expand Up @@ -73,6 +74,10 @@ def write_fileno(self):
return self.writer.fileno()

def wake(self):
if self.closing:
# Avoid issue #875 (race condition when closing the fd in another
# thread).
return
try:
self.writer.send(b"x")
except (IOError, socket.error):
Expand All @@ -90,3 +95,6 @@ def consume(self):
def close(self):
self.reader.close()
self.writer.close()

def mark_closing(self):
self.closing = True
5 changes: 5 additions & 0 deletions tornado/platform/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,10 @@ def close(self):
"""Closes the waker's file descriptor(s)."""
raise NotImplementedError()

def mark_closing(self):
"""Mark the waker as closing."""
raise NotImplementedError()


def monotonic_time():
raise NotImplementedError()
8 changes: 8 additions & 0 deletions tornado/platform/posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def _set_nonblocking(fd):

class Waker(interface.Waker):
def __init__(self):
self.closing = False
r, w = os.pipe()
_set_nonblocking(r)
_set_nonblocking(w)
Expand All @@ -51,6 +52,10 @@ def write_fileno(self):
return self.writer.fileno()

def wake(self):
if self.closing:
# Avoid issue #875 (race condition when closing the fd in another
# thread).
return
try:
self.writer.write(b"x")
except IOError:
Expand All @@ -68,3 +73,6 @@ def consume(self):
def close(self):
self.reader.close()
self.writer.close()

def mark_closing(self):
self.closing = True

0 comments on commit 29b1429

Please sign in to comment.