Skip to content

Commit

Permalink
Add support for subscribe entities (home-assistant#250)
Browse files Browse the repository at this point in the history
Co-authored-by: J. Nick Koston <[email protected]>
  • Loading branch information
balloob and bdraco authored Mar 12, 2022
1 parent d241673 commit 95f166b
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 17 deletions.
1 change: 1 addition & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ yarn-error.log
LICENSE.md
CLA.md
CODE_OF_CONDUCT.md
.reify-cache
24 changes: 20 additions & 4 deletions example.html
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,31 @@

(async () => {
let auth;
const storeAuth = true;
const authOptions = storeAuth
? {
async loadTokens() {
try {
return JSON.parse(localStorage.hassTokens);
} catch (err) {
return undefined;
}
},
saveTokens: (tokens) => {
localStorage.hassTokens = JSON.stringify(tokens);
},
}
: {};
try {
auth = await getAuth();
auth = await getAuth(authOptions);
} catch (err) {
if (err === ERR_HASS_HOST_REQUIRED) {
const hassUrl = prompt(
authOptions.hassUrl = prompt(
"What host to connect to?",
"http://localhost:8123"
);
if (!hassUrl) return;
auth = await getAuth({ hassUrl });
if (!authOptions.hassUrl) return;
auth = await getAuth(authOptions);
} else {
alert(`Unknown error: ${err}`);
return;
Expand All @@ -51,6 +66,7 @@
})();

function renderEntities(connection, entities) {
window.entities = entities;
const root = document.querySelector("tbody");
while (root.lastChild) root.removeChild(root.lastChild);

Expand Down
2 changes: 1 addition & 1 deletion lib/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ export class Connection {
*
* @param message the message to start the subscription
* @param callback the callback to be called when a new item arrives
* @param [options.resubscribe] re-established a subscription after a reconnect
* @param [options.resubscribe] re-established a subscription after a reconnect. Defaults to true.
* @returns promise that resolves to an unsubscribe function
*/
async subscribeMessage<Result>(
Expand Down
136 changes: 130 additions & 6 deletions lib/entities.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,132 @@
import { getCollection } from "./collection.js";
import { HassEntities, StateChangedEvent, UnsubscribeFunc } from "./types.js";
import {
Context,
HassEntities,
StateChangedEvent,
UnsubscribeFunc,
} from "./types.js";
import { Connection } from "./connection.js";
import { Store } from "./store.js";
import { getStates } from "./commands.js";
import { atLeastHaVersion } from "./util.js";

function processEvent(store: Store<HassEntities>, event: StateChangedEvent) {
interface EntityState {
/** state */
s: string;
/** attributes */
a: { [key: string]: any };
/** context */
c: Context | string;
/** last_changed; if set, also applies to lu */
lc: number;
/** last_updated */
lu: number;
}

interface EntityDiff {
/** additions */
"+"?: Partial<EntityState>;
/** subtractions */
"-"?: Pick<EntityState, "a">;
}

interface StatesUpdates {
/** add */
a?: Record<string, EntityState>;
/** remove */
r?: string[]; // remove
/** change */
c: Record<string, EntityDiff>;
}

function processEvent(store: Store<HassEntities>, updates: StatesUpdates) {
const state = { ...store.state };

if (updates.a) {
for (const entityId in updates.a) {
const newState = updates.a[entityId];
let last_changed = new Date(newState.lc * 1000).toISOString();
state[entityId] = {
entity_id: entityId,
state: newState.s,
attributes: newState.a,
context:
typeof newState.c === "string"
? { id: newState.c, parent_id: null, user_id: null }
: newState.c,
last_changed: last_changed,
last_updated: newState.lu
? new Date(newState.lu * 1000).toISOString()
: last_changed,
};
}
}

if (updates.r) {
for (const entityId of updates.r) {
delete state[entityId];
}
}

if (updates.c) {
for (const entityId in updates.c) {
let entityState = state[entityId];

if (!entityState) {
console.warn("Received state update for unknown entity", entityId);
continue;
}

entityState = { ...entityState };

const { "+": toAdd, "-": toRemove } = updates.c[entityId];

if (toAdd) {
if (toAdd.s) {
entityState.state = toAdd.s;
}
if (toAdd.c) {
if (typeof toAdd.c === "string") {
entityState.context = { ...entityState.context, id: toAdd.c };
} else {
entityState.context = { ...entityState.context, ...toAdd.c };
}
}
if (toAdd.lc) {
entityState.last_updated = entityState.last_changed = new Date(
toAdd.lc * 1000
).toISOString();
} else if (toAdd.lu) {
entityState.last_updated = new Date(toAdd.lu * 1000).toISOString();
}
if (toAdd.a) {
entityState.attributes = { ...entityState.attributes, ...toAdd.a };
}
}
if (toRemove) {
const attributes = { ...entityState.attributes };
for (const key in toRemove.a) {
delete attributes[key];
}
entityState.attributes = attributes;
}

state[entityId] = entityState;
}
}

store.setState(state, true);
}

const subscribeUpdates = (conn: Connection, store: Store<HassEntities>) =>
conn.subscribeMessage<StatesUpdates>((ev) => processEvent(store, ev), {
type: "subscribe_entities",
});

function legacyProcessEvent(
store: Store<HassEntities>,
event: StateChangedEvent
) {
const state = store.state;
if (state === undefined) return;

Expand All @@ -18,7 +140,7 @@ function processEvent(store: Store<HassEntities>, event: StateChangedEvent) {
}
}

async function fetchEntities(conn: Connection): Promise<HassEntities> {
async function legacyFetchEntities(conn: Connection): Promise<HassEntities> {
const states = await getStates(conn);
const entities: HassEntities = {};
for (let i = 0; i < states.length; i++) {
Expand All @@ -28,14 +150,16 @@ async function fetchEntities(conn: Connection): Promise<HassEntities> {
return entities;
}

const subscribeUpdates = (conn: Connection, store: Store<HassEntities>) =>
const legacySubscribeUpdates = (conn: Connection, store: Store<HassEntities>) =>
conn.subscribeEvents<StateChangedEvent>(
(ev) => processEvent(store, ev as StateChangedEvent),
(ev) => legacyProcessEvent(store, ev as StateChangedEvent),
"state_changed"
);

export const entitiesColl = (conn: Connection) =>
getCollection(conn, "_ent", fetchEntities, subscribeUpdates);
atLeastHaVersion(conn.haVersion, 2022, 4, 0)
? getCollection(conn, "_ent", () => Promise.resolve({}), subscribeUpdates)
: getCollection(conn, "_ent", legacyFetchEntities, legacySubscribeUpdates);

export const subscribeEntities = (
conn: Connection,
Expand Down
13 changes: 8 additions & 5 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ export type MessageBase = {
[key: string]: any;
};

export type Context = {
id: string;
user_id: string | null;
parent_id: string | null;
};

export type HassEventBase = {
origin: string;
time_fired: string;
context: {
id: string;
user_id: string;
};
context: Context;
};

export type HassEvent = HassEventBase & {
Expand Down Expand Up @@ -68,7 +71,7 @@ export type HassEntityBase = {
last_changed: string;
last_updated: string;
attributes: HassEntityAttributeBase;
context: { id: string; user_id: string | null };
context: Context;
};

export type HassEntityAttributeBase = {
Expand Down
21 changes: 21 additions & 0 deletions lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,24 @@ export const debounce = <T extends (...args: any[]) => unknown>(
}
};
};

export const atLeastHaVersion = (
version: string,
major: number,
minor: number,
patch?: number
): boolean => {
const [haMajor, haMinor, haPatch] = version.split(".", 3);

return (
Number(haMajor) > major ||
(Number(haMajor) === major &&
(patch === undefined
? Number(haMinor) >= minor
: Number(haMinor) > minor)) ||
(patch !== undefined &&
Number(haMajor) === major &&
Number(haMinor) === minor &&
Number(haPatch) >= patch)
);
};
3 changes: 2 additions & 1 deletion test/entities.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ const MOCK_SWITCH = {

const MOCK_ENTITIES = [MOCK_LIGHT, MOCK_SWITCH];

describe("subscribeEntities", () => {
describe("subscribeEntities legacy", () => {
let conn: MockConnection;
let awaitableEvent: AwaitableEvent;

beforeEach(() => {
conn = new MockConnection();
conn.haVersion = "2022.3.0";
conn.mockResponse("get_states", MOCK_ENTITIES);
awaitableEvent = new AwaitableEvent();
});
Expand Down

0 comments on commit 95f166b

Please sign in to comment.