Skip to content

Commit

Permalink
Merge branch 'shbert-messages'
Browse files Browse the repository at this point in the history
* shbert-messages:
  Reworked SHC Light model. Added CommunicationQualityService.
  Added type messageCode
  fixed minor issues
  Corrected parameter count for register
  Introduced type SHCMessage
  • Loading branch information
tschamm committed Jan 6, 2025
2 parents d913862 + da84099 commit d8182ee
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 1 deletion.
1 change: 1 addition & 0 deletions boschshcpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@
from .scenario import SHCScenario
from .session import SHCSession
from .userdefinedstate import SHCUserDefinedState
from .message import SHCMessage
6 changes: 6 additions & 0 deletions boschshcpy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ def get_userdefinedstates(self):
api_url, expected_element_type="userDefinedState"
)

def get_messages(self):
api_url = f"{self._api_root}/messages"
return self._get_api_result_or_fail(
api_url, expected_element_type="message"
)

def get_devices(self):
api_url = f"{self._api_root}/devices"
return self._get_api_result_or_fail(api_url, expected_element_type="device")
Expand Down
64 changes: 64 additions & 0 deletions boschshcpy/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import typing
import logging

logger = logging.getLogger("boschshcpy")

class SHCMessage:
def __init__(self, api, raw_message):
self.api = api
self._raw_message = raw_message

class MessageCode:
def __init__(self, message_code):
self._message_code = message_code

@property
def name(self):
return self._message_code["name"]

@property
def category(self):
return self._message_code["category"]

@property
def id(self):
return self._raw_message["id"]

@property
def message_code(self) -> MessageCode:
return self._raw_message["messageCode"]

@property
def source_type(self):
return self._raw_message["sourceType"]

@property
def timestamp(self):
return self._raw_message["timestamp"]

@property
def flags(self):
return self._raw_message["flags"]

@property
def arguments(self):
return self._raw_message["arguments"]

def summary(self):
print(f"Message : {self.id}")
print(f" Source : {self.source_type}")
print(f" Timestamp : {self.timestamp}")
print(f" MessageCode: {self.message_code}")
_flags_string = "; ".join(self.flags)
print(f" Flags : {_flags_string}")
#_arguments_string = "; ".join(self.arguments)
#print(f" Arguments : {_arguments_string}")
print(f" Arguments : {self.arguments}")

def process_long_polling_result(self, raw_result):
assert raw_result["@type"] == "message"
message_id = raw_result["id"]
logger.debug(
f"Got long polling result, not yet supported {message_id}"
)

18 changes: 18 additions & 0 deletions boschshcpy/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .information import SHCInformation
from .room import SHCRoom
from .scenario import SHCScenario
from .message import SHCMessage
from .userdefinedstate import SHCUserDefinedState
from .services_impl import SUPPORTED_DEVICE_SERVICE_IDS

Expand Down Expand Up @@ -41,6 +42,7 @@ def __init__(self, controller_ip: str, certificate, key, lazy=False, zeroconf=No
self._devices_by_id = {}
self._services_by_device_id = defaultdict(list)
self._domains_by_id = {}
self._messages_by_id = {}
self._userdefinedstates_by_id = {}
self._subscribers = []

Expand All @@ -62,6 +64,7 @@ def _enumerate_all(self):
self._enumerate_devices()
self._enumerate_rooms()
self._enumerate_scenarios()
self._enumerate_messages()
self._enumerate_userdefinedstates()
self._initialize_domains()

Expand Down Expand Up @@ -121,6 +124,13 @@ def _enumerate_scenarios(self):
scenario = SHCScenario(api=self._api, raw_scenario=raw_scenario)
self._scenarios_by_id[scenario_id] = scenario

def _enumerate_messages(self):
raw_messages = self._api.get_messages()
for raw_message in raw_messages:
message_id = raw_message["id"]
message = SHCMessage(api=self._api, raw_message=raw_message)
self._messages_by_id[message_id] = message

def _enumerate_userdefinedstates(self):
raw_states = self._api.get_userdefinedstates()
for raw_state in raw_states:
Expand Down Expand Up @@ -327,6 +337,10 @@ def scenario_names(self) -> typing.Sequence[str]:
def scenario(self, scenario_id) -> SHCScenario:
return self._scenarios_by_id[scenario_id]

@property
def messages(self) -> typing.Sequence[SHCMessage]:
return list(self._messages_by_id.values())

@property
def userdefinedstates(self) -> typing.Sequence[SHCUserDefinedState]:
return list(self._userdefinedstates_by_id.values())
Expand Down Expand Up @@ -368,6 +382,7 @@ def rawscan_commands(self):
"device_service",
"rooms",
"scenarios",
"messages",
"info",
"information",
"public_information",
Expand Down Expand Up @@ -398,6 +413,9 @@ def rawscan(self, **kwargs):

case "scenarios":
return self._api.get_scenarios()

case "messages":
return self._api.get_messages()

case "info" | "information":
return self._api.get_information()
Expand Down
45 changes: 45 additions & 0 deletions examples/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env python

# Use this script to see messages from api

import os, sys
import time
import logging
from datetime import datetime

sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

import boschshcpy

logger = logging.getLogger("boschshcpy")

def api_test():
session = boschshcpy.SHCSession(args.ip_address, args.access_cert, args.access_key, False)
#session.information.summary()
logger.debug("getting messages")
for _message in session.messages:
print(f"Timestamp of message: {datetime.fromtimestamp(_message.timestamp/1000.0)}")
_message.summary()


if __name__ == "__main__":
import argparse, sys

logging.basicConfig(level=logging.DEBUG)

parser = argparse.ArgumentParser()
parser.add_argument("-ac", "--access_cert",
help="Path to access certificate.",
default="keystore/boschshc-cert.pem")
parser.add_argument("-ak", "--access_key",
help="Path to access key.",
default="keystore/boschshc-key.pem")
parser.add_argument("-ip", "--ip_address",
help="IP of the smart home controller.")
args = parser.parse_args()

if len(sys.argv) == 1:
parser.print_help()
sys.exit()

api_test()
3 changes: 2 additions & 1 deletion examples/register_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
def registering():
# Create a BoschSHC client with the specified ACCESS_CERT and ACCESS_KEY.
helper = SHCRegisterClient(args.ip_address, args.password)
result = helper.register(args.id, args.name, args.access_cert)
#result = helper.register(args.id, args.name, args.access_cert)
result = helper.register(args.id, args.name)

if result != None:
print('successful registered new device with token {}'.format(result["token"]))
Expand Down

0 comments on commit d8182ee

Please sign in to comment.