-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update CI, lint, prepare 1.1.0 release #16
Conversation
cast @caiolombello |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much better, I can learn a lot here with you. I'll test it later.
from pysignalr.protocol.json import JSONProtocol | ||
from pysignalr.transport.abstract import Transport | ||
from pysignalr.transport.websocket import ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why you do imports from same module on each line rather than use a grouped import?
import pytest | ||
import requests | ||
from docker.client import DockerClient # type: ignore[import-untyped] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't know about this possibility, good idea
I don't know why I can't push the changes I made to the tests. I fixed local test errors and resolved warnings, but here is the code: import asyncio
import atexit
import logging
import time
from contextlib import suppress
from pathlib import Path
from typing import Any, cast
import _pytest.outcomes
import pytest
import requests
from docker.client import DockerClient # type: ignore[import-untyped]
from pysignalr.client import SignalRClient
from pysignalr.exceptions import AuthorizationError
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
def get_docker_client() -> DockerClient:
"""Get Docker client instance if socket is available; skip test otherwise."""
docker_socks = (
Path('/var/run/docker.sock'),
Path.home() / 'Library' / 'Containers' / 'com.docker.docker' / 'Data' / 'vms' / '0' / 'docker.sock',
Path.home() / 'Library' / 'Library' / 'Containers' / 'com.docker.docker' / 'Data' / 'docker.sock',
)
for path in docker_socks:
if path.exists():
return DockerClient(base_url=f'unix://{path}')
raise _pytest.outcomes.Skipped(
'Docker socket not found',
allow_module_level=True,
)
@pytest.fixture(scope='module')
async def aspnet_server() -> str:
"""Run dummy ASPNet server container (destroyed on exit) and return its IP and port."""
docker = get_docker_client()
logging.info('Building ASPNet server image (this may take a while)')
docker.images.build(
path=Path(__file__).parent.parent.parent.joinpath('AspNetAuthExample').as_posix(),
tag='aspnet_server',
)
logging.info('Starting ASPNet server container')
container = docker.containers.run(
image='aspnet_server',
environment={
'ASPNETCORE_ENVIRONMENT': 'Development',
'ASPNETCORE_URLS': 'http://+:5000', # Ensure the port is 5000
},
ports={'5000/tcp': 5000}, # Expose port 5000 to host
detach=True,
remove=True,
)
atexit.register(container.stop)
container.reload()
host_port = container.attrs['NetworkSettings']['Ports']['5000/tcp'][0]['HostPort']
ip = cast(str, container.attrs['NetworkSettings']['IPAddress'])
logging.info('Container is running at IP: %s and HostPort: %s', ip, host_port)
logging.info('Waiting for server to start')
try:
wait_for_server(f'http://{ip}:{host_port}/api/auth/login')
except TimeoutError:
ip = "127.0.0.1"
wait_for_server(f'http://{ip}:{host_port}/api/auth/login')
return f"{ip}:{host_port}"
def wait_for_server(url: str, timeout: int = 20) -> None:
"""
Waits for the server to be ready.
Args:
url (str): The URL to check the server status.
timeout (int): The maximum time to wait for the server to be ready.
"""
start = time.time()
while True:
try:
response = requests.post(url, json={'username': 'test', 'password': 'password'}, timeout=10)
if response.status_code in [200, 401, 403]:
logging.info('Server is up and running at %s', url)
break
except requests.exceptions.RequestException as e:
logging.info('Waiting for server: %s', e)
if time.time() - start > timeout:
raise TimeoutError('Server did not start in time')
time.sleep(2)
@pytest.mark.asyncio(scope='module')
class TestPysignalr:
async def test_connection(self, aspnet_server: str) -> None:
"""
Tests connection to the SignalR server.
"""
url = f'http://{aspnet_server}/weatherHub'
logging.info('Testing connection to %s', url)
client = SignalRClient(url)
task = asyncio.create_task(client.run())
async def _on_open() -> None:
logging.info('Connection opened, cancelling task')
task.cancel()
client.on_open(_on_open)
with suppress(asyncio.CancelledError):
await task
async def test_connection_with_token(self, aspnet_server: str) -> None:
"""
Tests connection to the SignalR server with a valid token.
"""
login_url = f'http://{aspnet_server}/api/auth/login'
logging.info('Attempting to log in at %s', login_url)
login_data = {'username': 'test', 'password': 'password'}
response = requests.post(login_url, json=login_data, timeout=10)
token = response.json().get('token')
if not token:
pytest.fail('Failed to obtain token from login response')
url = f'http://{aspnet_server}/weatherHub'
logging.info('Testing connection with token to %s', url)
def token_factory() -> str:
return cast(str, token)
client = SignalRClient(
url=url,
access_token_factory=token_factory,
headers={'mycustomheader': 'mycustomheadervalue'},
)
task = asyncio.create_task(client.run())
async def _on_open() -> None:
logging.info('Connection with token opened, cancelling task')
task.cancel()
client.on_open(_on_open)
with suppress(asyncio.CancelledError):
await task
# Verify the token in the connection headers
assert 'Authorization' in client._transport._headers
assert client._transport._headers['Authorization'] == f'Bearer {token}'
async def test_invalid_token(self, aspnet_server: str) -> None:
"""
Tests connection to the SignalR server with an invalid token.
"""
url = f'http://{aspnet_server}/weatherHub'
logging.info('Testing connection with invalid token to %s', url)
def invalid_token_factory() -> str:
return 'invalid_token' # Simulate an invalid token
client = SignalRClient(
url=url,
access_token_factory=invalid_token_factory,
headers={'mycustomheader': 'mycustomheadervalue'},
)
task = asyncio.create_task(client.run())
async def _on_open() -> None:
logging.info('Connection with invalid token opened, cancelling task')
task.cancel()
client.on_open(_on_open)
with suppress(asyncio.CancelledError):
try:
await task
except AuthorizationError:
logging.info('AuthorizationError caught as expected')
pass
# Verify if the AuthorizationError was raised correctly
assert task.cancelled() is True
async def test_send_and_receive_message(self, aspnet_server: str) -> None:
"""
Tests sending and receiving a message with the SignalR server.
"""
login_url = f'http://{aspnet_server}/api/auth/login'
logging.info('Attempting to log in at %s', login_url)
login_data = {'username': 'test', 'password': 'password'}
response = requests.post(login_url, json=login_data, timeout=10)
token = response.json().get('token')
if not token:
logging.error('Failed to obtain token from login response')
raise AssertionError('Failed to obtain token from login response')
logging.info('Obtained token: %s', token)
url = f'http://{aspnet_server}/weatherHub'
logging.info('Testing send and receive message with token to %s', url)
def token_factory() -> str:
return cast(str, token)
client = SignalRClient(
url=url,
access_token_factory=token_factory,
headers={'mycustomheader': 'mycustomheadervalue'},
)
received_messages = []
async def on_message_received(arguments: Any) -> None:
user, message = arguments
logging.info('Message received from %s: %s', user, message)
received_messages.append((user, message))
if len(received_messages) >= 1:
task.cancel()
client.on('ReceiveMessage', on_message_received)
task = asyncio.create_task(client.run())
async def _on_open() -> None:
logging.info('Connection with token opened, sending message')
await client.send('SendMessage', ['testuser', 'Hello, World!']) # type: ignore[arg-type]
client.on_open(_on_open)
with suppress(asyncio.CancelledError):
await task
assert received_messages, 'Expected to receive at least one message'
assert received_messages[0] == ('testuser', 'Hello, World!')
@pytest.fixture(scope="module", autouse=True)
async def ensure_all_tasks_completed():
try:
yield
finally:
pending = asyncio.all_tasks()
for task in pending:
if not task.done():
task.cancel()
with suppress(asyncio.CancelledError):
await task
with suppress(RuntimeError):
await asyncio.get_running_loop().shutdown_asyncgens() I also found out that the pipeline probably committed as root, which is why I was having errors saving the coverage.xml. Additionally, my test file was not updating at all when running the test command. When I recreated the file, the changes were running as expected. |
Hi, will this branch be merged into master? |
d602d27
to
1d4b3b4
Compare
docker-py
, see the fixture.