forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_word_count.py
65 lines (55 loc) · 1.93 KB
/
test_word_count.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import os
import sys
import ray
from ray.streaming import StreamingContext
from ray.test_utils import wait_for_condition
def test_word_count():
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder() \
.build()
ctx.read_text_file(__file__) \
.set_parallelism(1) \
.flat_map(lambda x: x.split()) \
.map(lambda x: (x, 1)) \
.key_by(lambda x: x[0]) \
.reduce(lambda old_value, new_value:
(old_value[0], old_value[1] + new_value[1])) \
.filter(lambda x: "ray" not in x) \
.sink(lambda x: print("result", x))
ctx.submit("word_count")
import time
time.sleep(3)
ray.shutdown()
def test_simple_word_count():
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder() \
.build()
sink_file = "/tmp/ray_streaming_test_simple_word_count.txt"
if os.path.exists(sink_file):
os.remove(sink_file)
def sink_func(x):
with open(sink_file, "a") as f:
line = "{}:{},".format(x[0], x[1])
print("sink_func", line)
f.write(line)
ctx.from_values("a", "b", "c") \
.set_parallelism(1) \
.flat_map(lambda x: [x, x]) \
.map(lambda x: (x, 1)) \
.key_by(lambda x: x[0]) \
.reduce(lambda old_value, new_value:
(old_value[0], old_value[1] + new_value[1])) \
.sink(sink_func)
ctx.submit("word_count")
def check_succeed():
if os.path.exists(sink_file):
with open(sink_file, "r") as f:
result = f.read()
return "a:2" in result and "b:2" in result and "c:2" in result
return False
wait_for_condition(check_succeed, timeout=60, retry_interval_ms=1000)
print("Execution succeed")
ray.shutdown()
if __name__ == "__main__":
test_word_count()
test_simple_word_count()