forked from snaptec/openWB
-
Notifications
You must be signed in to change notification settings - Fork 0
/
legacy_run_server_test.py
executable file
·38 lines (30 loc) · 1.06 KB
/
legacy_run_server_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import socket
import threading
from pathlib import Path
from unittest.mock import Mock, call
from legacy_run_server import SocketListener
def send_message(path: str, msg: bytes):
    """Send ``msg`` over a fresh Unix domain stream connection to ``path``.

    A new client socket is created for every call and is closed again as soon
    as the payload has been handed to ``sendall``.
    """
    with socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) as client:
        client.connect(path)
        client.sendall(msg)
def test_socket_listener_reads_bytes(tmp_path: Path):
    """Check that the listener callback receives the bytes sent by each client.

    A ``SocketListener`` is created on a socket path inside ``tmp_path`` and its
    ``handle_connections`` method runs on a background thread. Two separate
    client connections each send one payload; the test expects the callback to
    be invoked once per payload, in any order.

    The wait for the callbacks is bounded, so a listener that never delivers
    both messages fails the test instead of blocking the test run forever.
    """
    # setup
    socket_path = tmp_path / "socket"
    socket_path_str = str(socket_path)
    condition = threading.Condition()
    mock = Mock()

    def listener(data: bytes):
        # Record the payload before notifying, so the waiting thread's
        # predicate already sees the updated call count when it wakes up.
        mock(data)
        with condition:
            condition.notify()

    # execution
    socket_listener = SocketListener(socket_path, listener)
    try:
        # daemon=True: a serving loop that never returns must not keep the
        # interpreter alive after the test session ends.
        thread = threading.Thread(target=socket_listener.handle_connections, daemon=True)
        thread.start()
        send_message(socket_path_str, b"first")
        send_message(socket_path_str, b"second")

        # evaluation
        with condition:
            # wait_for returns the last value of the predicate, i.e. False
            # when the timeout expires before both callbacks have happened.
            both_received = condition.wait_for(lambda: mock.call_count == 2, timeout=5)
    finally:
        # Always release the listener, even if sending or waiting failed.
        socket_listener.close()
    assert both_received, "listener was not called twice within 5 seconds"
    mock.assert_has_calls([call(b"first"), call(b"second")], any_order=True)