Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve LW3 protocol handling, implement source selection and player state #6

Merged
merged 10 commits into from
Sep 11, 2024
4 changes: 2 additions & 2 deletions .github/workflows/ruff.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
name: Ruff
name: Python
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
ruff:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
22 changes: 22 additions & 0 deletions .github/workflows/unittest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name: Python
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v3
with:
python-version: "3.12"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install homeassistant
- name: Run tests
run: |
python -m unittest discover -s tests/ -v
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
# ha-vinx

Custom integration for controlling Lightware VINX encoders and decoders

## Tests

```bash
python3 -m unittest discover -s tests/ -v
```
2 changes: 1 addition & 1 deletion custom_components/vinx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from custom_components.vinx.const import DOMAIN
from custom_components.vinx.lw3 import LW3

PLATFORMS: list[Platform] = [Platform.MEDIA_PLAYER]
PLATFORMS: list[Platform] = [Platform.MEDIA_PLAYER, Platform.BUTTON]


@dataclass
Expand Down
52 changes: 52 additions & 0 deletions custom_components/vinx/button.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging

from homeassistant.components.button import ButtonDeviceClass, ButtonEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.helpers.device_registry import DeviceInfo

from custom_components.vinx import LW3, DeviceInformation, VinxRuntimeData

_LOGGER = logging.getLogger(__name__)


async def async_setup_entry(hass, entry: ConfigEntry, async_add_entities):
# Extract stored runtime data
runtime_data: VinxRuntimeData = entry.runtime_data
_LOGGER.info(f"Runtime data: {runtime_data}")

# Add entity to Home Assistant
async_add_entities([VinxRebootButtonEntity(runtime_data.lw3, runtime_data.device_information)])


class VinxRebootButtonEntity(ButtonEntity):
def __init__(self, lw3: LW3, device_information: DeviceInformation) -> None:
self._lw3 = lw3
self._device_information = device_information

_attr_device_class = ButtonDeviceClass.RESTART

@property
def unique_id(self) -> str | None:
return f"vinx_{self._device_information.mac_address}_reboot_button"

@property
def device_info(self) -> DeviceInfo:
return self._device_information.device_info

@property
def name(self):
# Use increasingly less descriptive names depending on what information is available
device_label = self._device_information.device_label
serial_number = self._device_information.device_info.get("serial_number")

if device_label:
return f"{self._device_information.device_label} reboot button"
elif serial_number:
return f"VINX {serial_number} reboot button"
else:
return "VINX reboot button"

async def async_press(self) -> None:
async with self._lw3.connection():
_LOGGER.info("Issuing device reset")
await self._lw3.call("/SYS", "reset(1)")
149 changes: 124 additions & 25 deletions custom_components/vinx/lw3.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,107 @@

from asyncio import StreamReader, StreamWriter
from dataclasses import dataclass
from enum import Enum


@dataclass
class Response:
class SingleLineResponse:
prefix: str
path: str


@dataclass
class PropertyResponse(Response):
class PropertyResponse(SingleLineResponse):
value: str

def __str__(self):
return self.value


@dataclass
class ErrorResponse(Response):
class ErrorResponse(SingleLineResponse):
code: int
message: str

def __str__(self):
return self.message


@dataclass
class NodeResponse(SingleLineResponse):
pass


@dataclass
class MethodResponse(SingleLineResponse):
name: str

def __str__(self):
return self.name


type MultiLineResponse = list[SingleLineResponse]
type Response = SingleLineResponse | MultiLineResponse


class ResponseType(Enum):
Node = 1
Property = 2
Error = 3
Method = 4


def get_response_type(response: str) -> ResponseType:
if response[1] == "E":
return ResponseType.Error
elif response[0] == "p":
return ResponseType.Property
elif response[0] == "n":
return ResponseType.Node
elif response[0] == "m":
return ResponseType.Method

raise ValueError("Unknown response type")


def parse_single_line_response(response: str) -> SingleLineResponse:
match get_response_type(response):
case ResponseType.Error:
matches = re.search(r"^(.E) (.*) %(E[0-9]+):(.*)$", response)
return ErrorResponse(matches.group(1), matches.group(2), matches.group(3), matches.group(4))
case ResponseType.Property:
matches = re.fullmatch(r"^p(.*) (.*)=(.*)$", response)
return PropertyResponse(f"p{matches.group(1)}", matches.group(2), matches.group(3))
case ResponseType.Node:
matches = re.fullmatch(r"^n(.*) (.*)$", response)
return NodeResponse(f"n{matches.group(1)}", matches.group(2))
case ResponseType.Method:
matches = re.fullmatch(r"^m(.*) (.*):(.*)$", response)
return MethodResponse(f"m{matches.group(1)}", matches.group(2), matches.group(3))


def parse_multiline_response(lines: list[str]) -> MultiLineResponse:
return [parse_single_line_response(response) for response in lines]


def parse_response(response: str) -> Response:
lines = response.split("\r\n")

# Determine if we're dealing with a single line response or multiple
if len(lines) == 3:
return parse_single_line_response(lines[1])
else:
return parse_multiline_response(lines[1:-1])


def is_encoder_discovery_node(node: Response) -> bool:
return isinstance(node, NodeResponse) and "TX" in node.path


def is_decoder_discovery_node(node: Response) -> bool:
return isinstance(node, NodeResponse) and "RX" in node.path


class LW3:
def __init__(self, hostname: str, port: int, timeout: int = 5):
self._hostname = hostname
Expand Down Expand Up @@ -57,51 +133,74 @@ async def _disconnect(self):
self._writer.close()
await self._writer.wait_closed()

@staticmethod
def _is_error_response(response: str) -> bool:
return response[1] == "E"

@staticmethod
def parse_response(response: str) -> PropertyResponse | ErrorResponse:
if LW3._is_error_response(response):
matches = re.search(r"^(.E) (.*) %(E[0-9]+):(.*)$", response)
return ErrorResponse(matches.group(1), matches.group(2), matches.group(3), matches.group(4))

matches = re.fullmatch(r"^(.*) (.*)=(.*)$", response)
return PropertyResponse(matches.group(1), matches.group(2), matches.group(3))

async def _read_and_parse_response(self) -> PropertyResponse:
response = await self._read_until("\r\n")
async def _read_and_parse_response(self) -> Response:
# All commands are wrapped with a signature, so read until the end delimiter
response = await self._read_until("}")

if response is None:
raise EOFError("Reached EOF while reading, connection probably lost")

result = self.parse_response(response.strip())
result = parse_response(response.strip())

if isinstance(result, ErrorResponse):
raise ValueError(result)

return result

async def _run_get_property(self, path: str) -> PropertyResponse:
async def _run_get(self, path: str) -> Response:
async with self._semaphore:
self._writer.write(f"GET {path}\r\n".encode())
self._writer.write(f"0000#GET {path}\r\n".encode())
await self._writer.drain()

return await self._read_and_parse_response()

async def _run_set_property(self, path: str, value: str) -> PropertyResponse:
async def _run_set(self, path: str, value: str) -> Response:
async with self._semaphore:
self._writer.write(f"SET {path}={value}\r\n".encode())
self._writer.write(f"0000#SET {path}={value}\r\n".encode())
await self._writer.drain()

return await self._read_and_parse_response()

async def _run_get_all(self, path: str) -> Response:
async with self._semaphore:
self._writer.write(f"0000#GETALL {path}\r\n".encode())
await self._writer.drain()

return await self._read_and_parse_response()

async def _run_call(self, path: str, method: str) -> Response:
async with self._semaphore:
self._writer.write(f"0000#CALL {path}:{method}\r\n".encode())
await self._writer.drain()

return await self._read_and_parse_response()

async def get_property(self, path: str) -> PropertyResponse:
return await asyncio.wait_for(self._run_get_property(path), self._timeout)
response = await asyncio.wait_for(self._run_get(path), self._timeout)

if not isinstance(response, PropertyResponse):
raise ValueError(f"Requested path {path} does not return a property")

return response

async def set_property(self, path: str, value: str) -> PropertyResponse:
return await asyncio.wait_for(self._run_set_property(path, value), self._timeout)
response = await asyncio.wait_for(self._run_set(path, value), self._timeout)

if not isinstance(response, PropertyResponse):
raise ValueError(f"Requested path {path} does not return a property")

return response

async def get_all(self, path: str) -> Response:
return await asyncio.wait_for(self._run_get_all(path), self._timeout)

async def call(self, path: str, method: str) -> MethodResponse:
response = await asyncio.wait_for(self._run_call(path, method), self._timeout)

if not isinstance(response, MethodResponse):
raise ValueError(f"Called method {path}:{method} does not return a method response")

return response


class LW3ConnectionContext:
Expand Down
2 changes: 1 addition & 1 deletion custom_components/vinx/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"documentation": "https://www.home-assistant.io/integrations/vinx",
"homekit": {},
"iot_class": "local_polling",
"requirements": [],
"requirements": ["bidict"],
"ssdp": [],
"zeroconf": ["_lwr3._tcp.local."]
}
Loading
Loading