forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_stream.py
51 lines (44 loc) · 1.99 KB
/
test_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import sys
import ray
from ray.streaming import StreamingContext
def test_data_stream():
    """Check id/parallelism agreement across Python <-> Java stream views.

    Builds a stream from the values 1, 2, 3, converts it with
    ``as_java_stream()`` and converts that result back with
    ``as_python_stream()``. All three objects must report the same id, and
    after ``set_parallelism(10)`` is called on the round-tripped Python view
    all three must report the same parallelism.
    """
    ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
    try:
        ctx = StreamingContext.Builder().build()
        stream = ctx.from_values(1, 2, 3)
        java_stream = stream.as_java_stream()
        python_stream = java_stream.as_python_stream()

        assert stream.get_id() == java_stream.get_id()
        assert stream.get_id() == python_stream.get_id()

        # Mutate only the round-tripped view; the assertions below verify
        # that the other two objects report the same value afterwards.
        python_stream.set_parallelism(10)
        assert stream.get_parallelism() == java_stream.get_parallelism()
        assert stream.get_parallelism() == python_stream.get_parallelism()
    finally:
        # Shut Ray down even when an assertion fails, so a failing test does
        # not leave an initialized Ray runtime behind in this process.
        ray.shutdown()
def test_key_data_stream():
    """Check id/parallelism agreement for a keyed stream across conversions.

    Builds a stream from "a", "b", "c", maps each value ``x`` to ``(x, 1)``
    and keys it by the first tuple element. The keyed stream is converted
    with ``as_java_stream()`` and that result converted back with
    ``as_python_stream()``. All three objects must report the same id, and
    after ``set_parallelism(10)`` is called on the round-tripped Python view
    all three must report the same parallelism.
    """
    ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
    try:
        ctx = StreamingContext.Builder().build()
        key_stream = (
            ctx.from_values("a", "b", "c")
            .map(lambda x: (x, 1))
            .key_by(lambda x: x[0])
        )
        java_stream = key_stream.as_java_stream()
        python_stream = java_stream.as_python_stream()

        assert key_stream.get_id() == java_stream.get_id()
        assert key_stream.get_id() == python_stream.get_id()

        # Mutate only the round-tripped view; the assertions below verify
        # that the other two objects report the same value afterwards.
        python_stream.set_parallelism(10)
        assert key_stream.get_parallelism() == java_stream.get_parallelism()
        assert key_stream.get_parallelism() == python_stream.get_parallelism()
    finally:
        # Shut Ray down even when an assertion fails, so a failing test does
        # not leave an initialized Ray runtime behind in this process.
        ray.shutdown()
def test_stream_config():
    """Check that ``with_config`` accumulates entries, including via Java view.

    Exercises both call forms used here: positional key/value
    (``with_config("k1", "v1")``) and a dict passed as ``conf=``. After each
    call ``get_config()`` must return the union of everything set so far.
    Finally, an entry added through the ``as_java_stream()`` view must show
    up alongside the entries that were set on the original stream.
    """
    ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
    try:
        ctx = StreamingContext.Builder().build()
        stream = ctx.from_values(1, 2, 3)

        # Single key/value form.
        stream.with_config("k1", "v1")
        print("config", stream.get_config())
        assert stream.get_config() == {"k1": "v1"}

        # Dict form; the earlier entry must still be present.
        stream.with_config(conf={"k2": "v2", "k3": "v3"})
        print("config", stream.get_config())
        assert stream.get_config() == {"k1": "v1", "k2": "v2", "k3": "v3"}

        # Config added on the Java view is read back from that same view
        # together with the three entries set before the conversion.
        java_stream = stream.as_java_stream()
        java_stream.with_config(conf={"k4": "v4"})
        config = java_stream.get_config()
        print("config", config)
        assert config == {"k1": "v1", "k2": "v2", "k3": "v3", "k4": "v4"}
    finally:
        # Shut Ray down even when an assertion fails, so a failing test does
        # not leave an initialized Ray runtime behind in this process.
        ray.shutdown()