forked from zigpy/bellows
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_multicast.py
188 lines (143 loc) · 5.87 KB
/
test_multicast.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import pytest
from zigpy.endpoint import Endpoint
import bellows.ezsp
import bellows.multicast
import bellows.types as t
from .async_mock import AsyncMock, MagicMock, sentinel
# Size value returned (as the second element) by the mocked
# getConfigurationValue call in the ezsp_f fixture; the initialization tests
# compare table-entry lookup counts against it.
CUSTOM_SIZE = 12
@pytest.fixture
def ezsp_f():
    """Return a mocked EZSP object.

    Its ``getConfigurationValue`` coroutine resolves to ``[0, CUSTOM_SIZE]``.
    """
    mock_ezsp = MagicMock()
    config_reply = [0, CUSTOM_SIZE]
    mock_ezsp.getConfigurationValue = AsyncMock(return_value=config_reply)
    return mock_ezsp
@pytest.fixture
def multicast(ezsp_f):
    """Return a ``Multicast`` instance constructed with the mocked EZSP."""
    return bellows.multicast.Multicast(ezsp_f)
async def test_initialize(multicast):
    """Check ``_initialize`` when some table entries are already in use.

    The mocked table reports a non-zero endpoint for the first
    ``active_multicasts`` lookups and endpoint 0 afterwards. The test expects
    ``CUSTOM_SIZE`` lookups in total and ``CUSTOM_SIZE - active_multicasts``
    entries recorded as available.
    """
    active_multicasts = 4
    next_group = 0x0200
    remaining_active = active_multicasts

    async def fake_get_entry(*args):
        nonlocal next_group, remaining_active
        # Only the first few lookups yield an entry with a non-zero endpoint.
        endpoint_id = next_group % 3 + 1 if remaining_active > 0 else 0

        table_entry = t.EmberMulticastTableEntry()
        table_entry.endpoint = t.uint8_t(endpoint_id)
        table_entry.multicastId = t.EmberMulticastId(next_group)
        table_entry.networkIndex = t.uint8_t(0)

        next_group += 1
        remaining_active -= 1
        return [t.EmberStatus.SUCCESS, table_entry]

    get_entry = multicast._ezsp.getMulticastTableEntry
    get_entry.side_effect = fake_get_entry

    await multicast._initialize()

    assert multicast._ezsp.getMulticastTableEntry.call_count == CUSTOM_SIZE
    expected_free = CUSTOM_SIZE - active_multicasts
    assert len(multicast._available) == expected_free
async def test_initialize_fail_configured_size(multicast):
    """Check ``_initialize`` when the configuration query returns ERR_FATAL.

    The test expects no table-entry lookups and no available entries.
    """
    failed_reply = (t.EmberStatus.ERR_FATAL, 16)
    multicast._ezsp.getConfigurationValue.return_value = failed_reply

    await multicast._initialize()

    mock_ezsp = multicast._ezsp
    assert mock_ezsp.getMulticastTableEntry.call_count == 0
    assert len(multicast._available) == 0
async def test_initialize_fail(multicast):
    """Check ``_initialize`` when every table-entry lookup returns ERR_FATAL.

    The test expects all ``CUSTOM_SIZE`` entries to be queried and none of
    them to be recorded as available.
    """
    next_group = 0x0200

    async def fake_get_entry(*args):
        nonlocal next_group
        table_entry = t.EmberMulticastTableEntry()
        table_entry.endpoint = t.uint8_t(next_group % 3 + 1)
        table_entry.multicastId = t.EmberMulticastId(next_group)
        table_entry.networkIndex = t.uint8_t(0)
        next_group += 1
        return [t.EmberStatus.ERR_FATAL, table_entry]

    get_entry = multicast._ezsp.getMulticastTableEntry
    get_entry.side_effect = fake_get_entry

    await multicast._initialize()

    mock_ezsp = multicast._ezsp
    assert mock_ezsp.getMulticastTableEntry.call_count == CUSTOM_SIZE
    assert len(multicast._available) == 0
async def test_startup(multicast):
    """Check that ``startup`` initializes once and subscribes per group.

    A mocked coordinator exposes a ZDO sentinel on endpoint 0 and one mocked
    endpoint that is a member of three groups; ``subscribe`` is expected to
    be called once for each membership.
    """
    member_endpoint = MagicMock(spec_set=Endpoint)
    member_endpoint.member_of = [sentinel.grp, sentinel.grp, sentinel.grp]

    coordinator = MagicMock()
    coordinator.endpoints = {0: sentinel.ZDO, 1: member_endpoint}

    # Replace the collaborators so only startup's own orchestration runs.
    multicast._initialize = AsyncMock()
    multicast.subscribe = MagicMock(side_effect=AsyncMock())

    await multicast.startup(coordinator)

    assert multicast._initialize.await_count == 1
    expected_calls = len(member_endpoint.member_of)
    assert multicast.subscribe.call_count == expected_calls
    assert multicast.subscribe.call_args[0][0] == sentinel.grp
def _subscribe(multicast, group_id, success=True):
    """Call ``multicast.subscribe(group_id)`` with a mocked table setter.

    ``setMulticastTableEntry`` on the EZSP object is replaced by a fresh mock
    whose coroutine returns ``[SUCCESS]`` when *success* is true and
    ``[ERR_FATAL]`` otherwise. Returns whatever ``subscribe`` returns
    (an awaitable, as the callers await it).
    """

    async def fake_set_entry(*args):
        status = t.EmberStatus.SUCCESS if success else t.EmberStatus.ERR_FATAL
        return [status]

    multicast._ezsp.setMulticastTableEntry = MagicMock(side_effect=fake_set_entry)
    return multicast.subscribe(group_id)
async def test_subscribe(multicast):
    """Check a successful subscription and a repeated subscription.

    The first subscribe must call ``setMulticastTableEntry`` once with an
    entry carrying the group id and record the group. Subscribing to the same
    group again must still report SUCCESS without touching the table.
    """
    grp_id = 0x0200
    multicast._available.add(1)

    # NOTE: the EZSP mock is already injected by the ``multicast`` fixture.
    # Assigning the module-level ``ezsp_f`` name here would bind the fixture
    # *function* (it is not a parameter of this test), and ``_subscribe``
    # would then mutate that shared object and leak state into other tests.
    ret = await _subscribe(multicast, grp_id, success=True)
    assert ret == t.EmberStatus.SUCCESS
    set_entry = multicast._ezsp.setMulticastTableEntry
    assert set_entry.call_count == 1
    assert set_entry.call_args[0][1].multicastId == grp_id
    assert grp_id in multicast._multicast

    # Second subscription to the same group: no table write expected.
    set_entry.reset_mock()
    ret = await _subscribe(multicast, grp_id, success=True)
    assert ret == t.EmberStatus.SUCCESS
    set_entry = multicast._ezsp.setMulticastTableEntry
    assert set_entry.call_count == 0
    assert grp_id in multicast._multicast
async def test_subscribe_fail(multicast):
    """Check that a failed table write leaves the group unsubscribed.

    The test expects a non-SUCCESS result, exactly one table write attempt,
    the group absent from ``_multicast`` and one entry still in ``_available``.
    """
    group = 0x0200
    multicast._available.add(1)

    status = await _subscribe(multicast, group, success=False)
    assert status != t.EmberStatus.SUCCESS

    table_setter = multicast._ezsp.setMulticastTableEntry
    assert table_setter.call_count == 1
    written_entry = table_setter.call_args[0][1]
    assert written_entry.multicastId == group

    assert group not in multicast._multicast
    assert len(multicast._available) == 1
async def test_subscribe_no_avail(multicast):
    """Check that subscribing without adding any available entry fails."""
    group = 0x0200
    status = await _subscribe(multicast, group, success=True)
    assert status != t.EmberStatus.SUCCESS
def _unsubscribe(multicast, group_id, success=True):
    """Call ``multicast.unsubscribe(group_id)`` with a mocked table setter.

    ``setMulticastTableEntry`` on the EZSP object is replaced by a fresh mock
    whose coroutine returns ``[SUCCESS]`` when *success* is true and
    ``[ERR_FATAL]`` otherwise. Returns whatever ``unsubscribe`` returns
    (an awaitable, as the callers await it).
    """

    async def fake_set_entry(*args):
        status = t.EmberStatus.SUCCESS if success else t.EmberStatus.ERR_FATAL
        return [status]

    multicast._ezsp.setMulticastTableEntry = MagicMock(side_effect=fake_set_entry)
    return multicast.unsubscribe(group_id)
async def test_unsubscribe(multicast):
    """Check a successful unsubscribe and a repeated unsubscribe.

    After subscribing, the first unsubscribe must write the table once,
    drop the group and leave one entry available. A second unsubscribe of the
    same group must report a non-SUCCESS status without writing the table.
    """
    group = 0x0200
    multicast._available.add(1)
    await _subscribe(multicast, group, success=True)

    # First unsubscribe: the group is known, so the table is written.
    multicast._ezsp.setMulticastTableEntry.reset_mock()
    status = await _unsubscribe(multicast, group, success=True)
    assert status == t.EmberStatus.SUCCESS
    table_setter = multicast._ezsp.setMulticastTableEntry
    assert table_setter.call_count == 1
    assert group not in multicast._multicast
    assert len(multicast._available) == 1

    # Second unsubscribe: the group is gone, so nothing is written.
    multicast._ezsp.setMulticastTableEntry.reset_mock()
    status = await _unsubscribe(multicast, group, success=True)
    assert status != t.EmberStatus.SUCCESS
    table_setter = multicast._ezsp.setMulticastTableEntry
    assert table_setter.call_count == 0
    assert group not in multicast._multicast
    assert len(multicast._available) == 1
async def test_unsubscribe_fail(multicast):
    """Check that a failed table write keeps the subscription in place.

    The test expects a non-SUCCESS result, exactly one table write attempt,
    the group still present in ``_multicast`` and no entries in ``_available``.
    """
    group = 0x0200
    multicast._available.add(1)
    await _subscribe(multicast, group, success=True)

    multicast._ezsp.setMulticastTableEntry.reset_mock()
    status = await _unsubscribe(multicast, group, success=False)
    assert status != t.EmberStatus.SUCCESS

    table_setter = multicast._ezsp.setMulticastTableEntry
    assert table_setter.call_count == 1
    assert group in multicast._multicast
    assert len(multicast._available) == 0