Skip to content

Commit

Permalink
fix: while the main thread is blocked signals might pile up and cause…
Browse files Browse the repository at this point in the history
… a "maximum recursion depth exceeded" error when they are eventually executed all at once

closes snaptec#2003, closes snaptec#1994
  • Loading branch information
yankee42 committed Feb 8, 2022
1 parent 9335751 commit 3c0aa20
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 59 deletions.
22 changes: 22 additions & 0 deletions packages/helpermodules/skip_while_unchanged.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import threading
from typing import Callable, TypeVar

T = TypeVar("T")


def skip_while_unchanged(source: Callable, initial=None):
"""Before each call check if value given by `source` has changed. If it has not, ignore the call"""
def wrap(function: T) -> T:
previous = [initial]
lock = threading.Lock()

def wrapper(*args, **kwargs):
with lock:
next = source()
if previous[0] != next:
function(*args, **kwargs)
previous[0] = next

return wrapper

return wrap
55 changes: 55 additions & 0 deletions packages/helpermodules/skip_while_unchanged_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import itertools
from unittest.mock import Mock, call

import pytest

from helpermodules.skip_while_unchanged import skip_while_unchanged


def test_skip_while_unchanged_calls_original_with_arguments():
# setup
mock = Mock()

# execution
decorated = skip_while_unchanged(itertools.count().__next__)(mock)
decorated(1, 2, some_key="some value")
decorated(3)

# evaluation
expected_calls = [call(1, 2, some_key="some value"), call(3)]
mock.assert_has_calls(expected_calls)
assert mock.call_count == len(expected_calls)


def test_skip_while_unchanged_skips_calls_on_no_change():
# setup
mock = Mock()

# execution
decorated = skip_while_unchanged([1, 1, 1, 2, 2].__iter__().__next__)(mock)
decorated(1)
decorated(2)
decorated(3)
decorated(4)
decorated(5)

# evaluation
expected_calls = [call(1), call(4)]
mock.assert_has_calls(expected_calls)
assert mock.call_count == len(expected_calls)


def test_skip_while_unchanged_recalls_if_function_throws_exception():
# setup
mock = Mock(side_effect=Exception("dummy"))

# execution
decorated = skip_while_unchanged(itertools.count().__next__)(mock)
for i in range(1, 3):
with pytest.raises(Exception):
decorated(i)

# evaluation
expected_calls = [call(1), call(2)]
mock.assert_has_calls(expected_calls)
assert mock.call_count == len(expected_calls)
64 changes: 5 additions & 59 deletions packages/legacy_run_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@
environment first.
"""
import contextlib
import fcntl
import importlib
import io
import json
import logging
import os
import re
import signal
import socket
import sys
import threading
Expand All @@ -25,8 +22,10 @@
from typing import Callable

from helpermodules.log import setup_logging_stdout
from helpermodules.skip_while_unchanged import skip_while_unchanged

log = logging.getLogger("legacy run server")
openwb_conf_path = Path(__file__).parents[1] / "openwb.conf"


def read_all_bytes(connection: socket.socket):
Expand Down Expand Up @@ -104,19 +103,15 @@ def handle_message(message: bytes):
message_str = message.decode("utf-8").strip()
time_start = time.time()
log.debug("Received command %.100s", message_str)
update_log_level_from_config()
parsed = json.loads(message_str)
importlib.import_module(parsed[0]).main(parsed[1:])
log.debug("Completed running command in %.2fs: %.100s", time.time() - time_start, message_str)


@skip_while_unchanged(lambda: openwb_conf_path.stat().st_mtime)
def try_update_log_level_from_config():
try:
config_file_contents = (Path(__file__).parents[1] / "openwb.conf").read_text("utf-8")
except Exception as e:
# In case we cannot read the config file (maybe due to some lock, race conditions or someone moved the file
# temporarily), we just ignore the change
log.debug("Could not read openwb.conf. Ignoring.", exc_info=e)
return
config_file_contents = openwb_conf_path.read_text("utf-8")
match = re.search("^debug=([012])", config_file_contents, re.MULTILINE)
if match is None:
logging.getLogger().setLevel(logging.DEBUG)
Expand All @@ -140,58 +135,9 @@ def update_log_level_from_config():
log.error("Could not update log level from openwb.conf", exc_info=e)


class AtomicInteger:
def __init__(self):
self._value = 0
self._lock = threading.Lock()

def increment_and_get(self):
with self._lock:
self._value += 1
return self._value

def get(self):
with self._lock:
return self._value


def watch_config():
"""This function watches the openwb.conf file for modifications. If it is modified, the log level is refreshed
The function uses the F_NOTIFY Linux kernel feature to receive a notification if a file in the directory where
openwb.conf is stored changes.
However the linux kernel is very fast in sending notifications. If a process changes a file it is likely that the
file is changed in multiple write operations that all happen within a few milliseconds. In this case we also receive
multiple notifications. Thus we cannot tell if the process modifying our file has finished updating the file (best
would be to use file locks, but openWB does not use file locks. It would require changes at a vast number of
places).
To workaround the issue we simply wait 200ms after a change was detected. If within these 200ms another change is
detected, the timer is reset. If there has not been any notification for 200ms we assume that there are no pending
updates.
"""
latest_signal_id = AtomicInteger()

def signal_handler_delayed(current_signal_id: int):
time.sleep(.2)
if current_signal_id == latest_signal_id.get():
# The signal id has not changed. This means there have not been any further updates during the last 200ms.
update_log_level_from_config()

def signal_handler(_signum, _frame):
threading.Thread(target=signal_handler_delayed, args=(latest_signal_id.increment_and_get(),)).start()

signal.signal(signal.SIGIO, signal_handler)
file_descriptor = os.open(str(Path(__file__).parents[1]), os.O_RDONLY)
fcntl.fcntl(file_descriptor, fcntl.F_SETSIG, 0)
fcntl.fcntl(file_descriptor, fcntl.F_NOTIFY, fcntl.DN_MODIFY | fcntl.DN_MULTISHOT)


if __name__ == '__main__':
setup_logging_stdout()
sys.excepthook = exception_handler
update_log_level_from_config()
watch_config()
log.info("Starting legacy run server")
SocketListener(Path(__file__).parent / "legacy_run_server.sock", handle_message).handle_connections()

0 comments on commit 3c0aa20

Please sign in to comment.