-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuffer_test.py
87 lines (80 loc) · 2.43 KB
/
buffer_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import unittest
from autotx.utils.buffer import Buffer
class TestBuffer(unittest.TestCase):
    """Unit tests for ``Buffer``: ``Cap``, ``Len``, ``Put``, ``Get`` and ``Close``.

    The tests rely on the return conventions they assert: ``Put`` yields an
    ``(ok, err)`` pair and ``Get`` yields a ``(data, err)`` pair.
    """

    def test_create(self):
        """A new buffer reports the capacity it was constructed with."""
        size = 10
        buffer = Buffer(size)
        self.assertEqual(size, buffer.Cap())

    def test_bufPut(self):
        """Exercise ``Put`` on a fresh, a full and a closed buffer."""
        size = 10
        buffer = Buffer(size)
        data = list(range(size))

        # Case 1: fresh buffer -- every Put succeeds and Len grows by one.
        for count, item in enumerate(data, start=1):
            ok, err = buffer.Put(item)
            self.assertIsNone(err)
            self.assertTrue(ok)
            self.assertEqual(buffer.Len(), count)

        # Case 2: buffer is full -- Put is refused (ok is false) with no error.
        ok, err = buffer.Put(data)
        self.assertIsNone(err)
        self.assertFalse(ok)

        # Case 3: buffer is closed -- Put must report an error.
        buffer.Close()
        _, err = buffer.Put(data)
        self.assertIsNotNone(err)

    def test_bufGet(self):
        """Exercise ``Get`` on a filled, an empty and a closed buffer."""
        size = 10
        buffer = Buffer(size)
        for i in range(size):
            buffer.Put(i)

        # Case 1: filled buffer -- items come back in the order they were put.
        for expected in range(size):
            data, err = buffer.Get()
            self.assertEqual(data, expected)
            self.assertIsNone(err)

        # Case 2: buffer is empty -- Get yields neither data nor an error.
        data, err = buffer.Get()
        self.assertIsNone(data)
        self.assertIsNone(err)

        # Case 3: buffer is closed -- Get yields no data and reports an error.
        buffer.Close()
        data, err = buffer.Get()
        self.assertIsNone(data)
        self.assertIsNotNone(err)

    def test_bufPutAndGet(self):
        """Fill a buffer, drain part of it, then close it and retry both calls."""
        size = 10
        # Case 1: fresh buffer and the input values to push through it.
        buffer = Buffer(size)
        data = list(range(size))

        # Case 2: put every input item into the buffer.
        for count, item in enumerate(data, start=1):
            ok, err = buffer.Put(item)
            self.assertIsNone(err)
            self.assertTrue(ok)
            self.assertEqual(buffer.Len(), count)

        # Case 3: take items back out in insertion order. Three items are
        # deliberately left in the buffer before it is closed below.
        left_in_buffer = 3
        for expected in data[:size - left_in_buffer]:
            val, err = buffer.Get()
            self.assertEqual(val, expected)
            self.assertIsNone(err)

        # Case 4: buffer is closed -- both Put and Get must report an error,
        # and Get must not hand back any data.
        buffer.Close()
        _, err = buffer.Put(data)
        self.assertIsNotNone(err)
        val, err = buffer.Get()
        self.assertIsNone(val)
        self.assertIsNotNone(err)
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()