Skip to content

Commit

Permalink
Merge pull request tornadoweb#1876 from pitrou/lockless_add_callback
Browse files Browse the repository at this point in the history
Lockless implementation of add_callback
  • Loading branch information
bdarnell authored Nov 4, 2016
2 parents 4d783c6 + c40e3a9 commit 804ae3f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 43 deletions.
57 changes: 19 additions & 38 deletions tornado/ioloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from __future__ import absolute_import, division, print_function, with_statement

import collections
import datetime
import errno
import functools
Expand Down Expand Up @@ -693,8 +694,7 @@ def initialize(self, impl, time_func=None, **kwargs):
self.time_func = time_func or time.time
self._handlers = {}
self._events = {}
self._callbacks = []
self._callback_lock = threading.Lock()
self._callbacks = collections.deque()
self._timeouts = []
self._cancellations = 0
self._running = False
Expand All @@ -712,8 +712,7 @@ def initialize(self, impl, time_func=None, **kwargs):
self.READ)

def close(self, all_fds=False):
with self._callback_lock:
self._closing = True
self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
for fd, handler in self._handlers.values():
Expand Down Expand Up @@ -800,9 +799,7 @@ def start(self):
while True:
# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
with self._callback_lock:
callbacks = self._callbacks
self._callbacks = []
ncallbacks = len(self._callbacks)

# Add any timeouts that have come due to the callback list.
# Do not run anything until we have determined which ones
Expand Down Expand Up @@ -831,14 +828,14 @@ def start(self):
if x.callback is not None]
heapq.heapify(self._timeouts)

for callback in callbacks:
self._run_callback(callback)
for i in range(ncallbacks):
self._run_callback(self._callbacks.popleft())
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
# Closures may be holding on to a lot of memory, so allow
# them to be freed before we go into our poll wait.
callbacks = callback = due_timeouts = timeout = None
due_timeouts = timeout = None

if self._callbacks:
# If any callbacks or timeouts called add_callback,
Expand Down Expand Up @@ -934,36 +931,20 @@ def remove_timeout(self, timeout):
self._cancellations += 1

def add_callback(self, callback, *args, **kwargs):
if self._closing:
return
# Blindly insert into self._callbacks. This is safe even
# from signal handlers because deque.append is atomic.
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs))
if thread.get_ident() != self._thread_ident:
# If we're not on the IOLoop's thread, we need to synchronize
# with other threads, or waking logic will induce a race.
with self._callback_lock:
if self._closing:
return
list_empty = not self._callbacks
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs))
if list_empty:
# If we're not in the IOLoop's thread, and we added the
# first callback to an empty list, we may need to wake it
# up (it may wake up on its own, but an occasional extra
# wake is harmless). Waking up a polling IOLoop is
# relatively expensive, so we try to avoid it when we can.
self._waker.wake()
# This will write one byte but Waker.consume() reads many
# at once, so it's ok to write even when not strictly
# necessary.
self._waker.wake()
else:
if self._closing:
return
# If we're on the IOLoop's thread, we don't need the lock,
# since we don't need to wake anyone, just add the
# callback. Blindly insert into self._callbacks. This is
# safe even from signal handlers because the GIL makes
# list.append atomic. One subtlety is that if the signal
# is interrupting another thread holding the
# _callback_lock block in IOLoop.start, we may modify
# either the old or new version of self._callbacks, but
# either way will work.
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs))
# If we're on the IOLoop's thread, we don't need to wake anyone.
pass

def add_callback_from_signal(self, callback, *args, **kwargs):
with stack_context.NullContext():
Expand Down
20 changes: 18 additions & 2 deletions tornado/platform/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,26 @@

import errno
import socket
import time

from tornado.platform import interface


def try_close(f):
# Avoid issue #875 (race condition when using the file in another
# thread).
for i in range(10):
try:
f.close()
except IOError:
# Yield to another thread
time.sleep(1e-3)
else:
break
# Try a last time and let raise
f.close()


class Waker(interface.Waker):
"""Create an OS independent asynchronous pipe.
Expand Down Expand Up @@ -75,7 +91,7 @@ def write_fileno(self):
def wake(self):
try:
self.writer.send(b"x")
except (IOError, socket.error):
except (IOError, socket.error, ValueError):
pass

def consume(self):
Expand All @@ -89,4 +105,4 @@ def consume(self):

def close(self):
self.reader.close()
self.writer.close()
try_close(self.writer)
6 changes: 3 additions & 3 deletions tornado/platform/posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import fcntl
import os

from tornado.platform import interface
from tornado.platform import common, interface


def set_close_exec(fd):
Expand Down Expand Up @@ -53,7 +53,7 @@ def write_fileno(self):
def wake(self):
try:
self.writer.write(b"x")
except IOError:
except (IOError, ValueError):
pass

def consume(self):
Expand All @@ -67,4 +67,4 @@ def consume(self):

def close(self):
self.reader.close()
self.writer.close()
common.try_close(self.writer)

0 comments on commit 804ae3f

Please sign in to comment.