forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmicrobenchmarks.py
122 lines (105 loc) · 4.5 KB
/
microbenchmarks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import unittest
import ray
import sys
import time
import numpy as np
import ray.test.test_functions as test_functions
if sys.version_info >= (3, 0):
from importlib import reload
class MicroBenchmarkTest(unittest.TestCase):
def testTiming(self):
reload(test_functions)
ray.init(num_workers=3)
# Measure the time required to submit a remote task to the scheduler.
elapsed_times = []
for _ in range(1000):
start_time = time.time()
test_functions.empty_function.remote()
end_time = time.time()
elapsed_times.append(end_time - start_time)
elapsed_times = np.sort(elapsed_times)
average_elapsed_time = sum(elapsed_times) / 1000
print("Time required to submit an empty function call:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(elapsed_times[900]))
print(" 99th percentile: {}".format(elapsed_times[990]))
print(" worst: {}".format(elapsed_times[999]))
# average_elapsed_time should be about 0.00038.
# Measure the time required to submit a remote task to the scheduler
# (where the remote task returns one value).
elapsed_times = []
for _ in range(1000):
start_time = time.time()
test_functions.trivial_function.remote()
end_time = time.time()
elapsed_times.append(end_time - start_time)
elapsed_times = np.sort(elapsed_times)
average_elapsed_time = sum(elapsed_times) / 1000
print("Time required to submit a trivial function call:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(elapsed_times[900]))
print(" 99th percentile: {}".format(elapsed_times[990]))
print(" worst: {}".format(elapsed_times[999]))
# average_elapsed_time should be about 0.001.
# Measure the time required to submit a remote task to the scheduler
# and get the result.
elapsed_times = []
for _ in range(1000):
start_time = time.time()
x = test_functions.trivial_function.remote()
ray.get(x)
end_time = time.time()
elapsed_times.append(end_time - start_time)
elapsed_times = np.sort(elapsed_times)
average_elapsed_time = sum(elapsed_times) / 1000
print("Time required to submit a trivial function call and get the "
"result:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(elapsed_times[900]))
print(" 99th percentile: {}".format(elapsed_times[990]))
print(" worst: {}".format(elapsed_times[999]))
# average_elapsed_time should be about 0.0013.
# Measure the time required to do do a put.
elapsed_times = []
for _ in range(1000):
start_time = time.time()
ray.put(1)
end_time = time.time()
elapsed_times.append(end_time - start_time)
elapsed_times = np.sort(elapsed_times)
average_elapsed_time = sum(elapsed_times) / 1000
print("Time required to put an int:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(elapsed_times[900]))
print(" 99th percentile: {}".format(elapsed_times[990]))
print(" worst: {}".format(elapsed_times[999]))
# average_elapsed_time should be about 0.00087.
ray.worker.cleanup()
def testCache(self):
ray.init(num_workers=1)
A = np.random.rand(1, 1000000)
v = np.random.rand(1000000)
A_id = ray.put(A)
v_id = ray.put(v)
a = time.time()
for i in range(100):
A.dot(v)
b = time.time() - a
c = time.time()
for i in range(100):
ray.get(A_id).dot(ray.get(v_id))
d = time.time() - c
if d > 1.5 * b:
if os.getenv("TRAVIS") is None:
raise Exception("The caching test was too slow. "
"d = {}, b = {}".format(d, b))
else:
print("WARNING: The caching test was too slow. "
"d = {}, b = {}".format(d, b))
ray.worker.cleanup()
if __name__ == "__main__":
unittest.main(verbosity=2)