-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy paththread_test.py
67 lines (51 loc) · 1.78 KB
/
thread_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import jsonata
import pytest
import time
class TestThread:
def test_reuse(self):
expr = jsonata.Jsonata("a")
assert expr.evaluate({"a": 1}) == 1
assert expr.evaluate({"a": 1}) == 1
def test_now(self):
now = jsonata.Jsonata("$now()")
r1 = now.evaluate(None)
time.sleep(1)
r2 = now.evaluate(None)
assert r1 != r2
@pytest.mark.asyncio
async def test_reuse_with_variable(self):
expr = jsonata.Jsonata("($x := a; $wait(a); $x)")
def lambda1(a):
time.sleep(a)
return None
expr.register_lambda("wait", lambda1)
async def threaded_function():
return expr.evaluate({"a": 10})
# make sure outer thread is initialized and in $wait
time.sleep(5)
# this thread uses the same expr and terminates before thread is done
assert expr.evaluate({"a": 3}) == 3
# the outer thread is unaffected by the previous operations
outer = await threaded_function()
assert outer == 10
@pytest.mark.asyncio
async def test_add_env_and_input(self):
expr = jsonata.Jsonata("$eval('$count($keys($))')")
input1 = {"input": 1}
input2 = {"input": 2, "other": 3}
frame1 = expr.create_frame()
frame2 = expr.create_frame()
frame1.bind("variable", 1)
frame2.bind("variable", 2)
count = 10000
async def threaded_function():
total = 0
for i in range(0, count):
total += int(expr.evaluate(input1))
return total
total = 0
for i in range(0, count):
total += int(expr.evaluate(input2))
outer = await threaded_function()
assert outer == count
assert total == 2 * count