Skip to content

Commit

Permalink
add security event handling (lbbrhzn#634)
Browse files Browse the repository at this point in the history
  • Loading branch information
drc38 authored Oct 12, 2022
1 parent 62e4d6a commit ea5cba0
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 0 deletions.
14 changes: 14 additions & 0 deletions custom_components/ocpp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,20 @@ def on_diagnostics_status(self, status, **kwargs):
)
return call_result.DiagnosticsStatusNotificationPayload()

@on(Action.SecurityEventNotification)
def on_security_event(self, type, timestamp, **kwargs):
"""Handle security event notification."""
_LOGGER.info(
"Security event notification received: %s at %s [techinfo: %s]",
type,
timestamp,
kwargs.get(om.tech_info.name, "none"),
)
self.hass.async_create_task(
self.notify_ha(f"Security event notification received: {type}")
)
return call_result.SecurityEventNotificationPayload()

def get_authorization_status(self, id_tag):
"""Get the authorization status for an id_tag."""
# get the domain wide configuration
Expand Down
1 change: 1 addition & 0 deletions custom_components/ocpp/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class OcppMisc(str, Enum):
feature_profile_reservation = "Reservation"
feature_profile_remote = "RemoteTrigger"
feature_profile_auth = "LocalAuthListManagement"
tech_info = "techInfo"

# for use with Smart Charging
current = "Current"
Expand Down
10 changes: 10 additions & 0 deletions tests/test_charge_point.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ async def test_services(hass, socket_enabled):
cp.send_authorize(),
cp.send_heartbeat(),
cp.send_status_notification(),
cp.send_security_event(),
cp.send_firmware_status(),
cp.send_data_transfer(),
cp.send_start_transaction(),
Expand Down Expand Up @@ -877,3 +878,12 @@ async def send_stop_transaction(self, delay: int = 0):
)
resp = await self.call(request)
assert resp.id_tag_info["status"] == AuthorizationStatus.accepted.value

async def send_security_event(self):
"""Send a security event notification."""
request = call.SecurityEventNotificationPayload(
type="SettingSystemTime",
timestamp="2022-09-29T20:58:29Z",
tech_info="BootNotification",
)
await self.call(request)

0 comments on commit ea5cba0

Please sign in to comment.