forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_submit_cpp_job.py
84 lines (68 loc) · 2.57 KB
/
test_submit_cpp_job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import shutil
import sys
import tempfile
import pytest
from ray._private.test_utils import (
format_web_url,
wait_for_condition,
wait_until_server_available,
)
from ray.job_submission import JobStatus, JobSubmissionClient
from ray.tests.conftest import _ray_start
@pytest.fixture(scope="module")
def headers():
return {"Connection": "keep-alive", "Authorization": "TOK:<MY_TOKEN>"}
@pytest.fixture(scope="module")
def job_sdk_client(headers):
with _ray_start(
include_dashboard=True, num_cpus=1, _node_ip_address="0.0.0.0"
) as ctx:
address = ctx.address_info["webui_url"]
assert wait_until_server_available(address)
yield JobSubmissionClient(format_web_url(address), headers=headers)
def _check_job_succeeded(client: JobSubmissionClient, job_id: str) -> bool:
status = client.get_job_status(job_id)
if status == JobStatus.FAILED:
logs = client.get_job_logs(job_id)
raise RuntimeError(f"Job failed\nlogs:\n{logs}")
return status == JobStatus.SUCCEEDED
def test_submit_simple_cpp_job(job_sdk_client):
client = job_sdk_client
simple_job_so_path = os.environ["SIMPLE_DRIVER_SO_PATH"]
simple_job_so_filename = os.path.basename(simple_job_so_path)
simple_job_main_path = os.environ["SIMPLE_DRIVER_MAIN_PATH"]
simple_job_main_filename = os.path.basename(simple_job_main_path)
with tempfile.TemporaryDirectory() as tmp_dir:
working_dir = os.path.join(tmp_dir, "cpp_worker")
os.makedirs(working_dir)
shutil.copy2(
simple_job_so_path, os.path.join(working_dir, simple_job_so_filename)
)
shutil.copy2(
simple_job_main_path,
os.path.join(working_dir, simple_job_main_filename),
)
shutil.copymode(
simple_job_main_path,
os.path.join(working_dir, simple_job_main_filename),
)
entrypoint = (
f"chmod +x {simple_job_main_filename} && ./{simple_job_main_filename}"
)
runtime_env = dict(
working_dir=working_dir,
env_vars={"TEST_KEY": "TEST_VALUE"},
)
job_id = client.submit_job(
entrypoint=entrypoint,
runtime_env=runtime_env,
)
wait_for_condition(
_check_job_succeeded, client=client, job_id=job_id, timeout=120
)
logs = client.get_job_logs(job_id)
print(f"================== logs ================== \n {logs}")
assert "try to get TEST_KEY: TEST_VALUE" in logs
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))