forked from deepspeedai/DeepSpeed-MII
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
188 lines (161 loc) · 7.81 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import base64
import os
import subprocess
import sys
import tempfile
import time
from collections import defaultdict
from typing import List
from deepspeed.accelerator import get_accelerator
from deepspeed.runtime.config_utils import DeepSpeedConfigModel
from mii.config import ModelConfig, MIIConfig, ReplicaConfig
from mii.logging import logger
def config_to_b64_str(config: DeepSpeedConfigModel) -> str:
    """Serialize *config* to a URL-safe base64 string.

    The config is rendered to its JSON representation, UTF-8 encoded, and
    base64-encoded (URL-safe alphabet) so it can be passed safely as a
    single command-line argument.
    """
    return base64.urlsafe_b64encode(config.json().encode()).decode()
class MIIServer:
    """Launch and supervise the processes backing an MII deployment.

    For every replica described in the model config this starts one
    ``mii.launch.multi_gpu_server`` process through the ``deepspeed``
    launcher, plus a load-balancer process and (optionally) a RESTful API
    gateway, then blocks until each replica accepts connections on all of
    its tensor-parallel ports.
    """
    def __init__(self, mii_config: MIIConfig) -> None:
        self.task = mii_config.model_config.task
        self.port_number = mii_config.port_number

        # Fall back to a single-host hostfile exposing all local devices
        # when the user did not supply one. The file is closed by the
        # context manager before its path is handed to the launcher, so the
        # contents are flushed to disk.
        if not os.path.isfile(mii_config.hostfile):
            logger.info(f"Hostfile {mii_config.hostfile} not found, creating hostfile.")
            num_gpu = get_accelerator().device_count()
            with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
                temp_file.write(f"localhost slots={num_gpu}")
            mii_config.hostfile = temp_file.name

        mii_config.generate_replica_configs()

        processes = self._initialize_service(mii_config)
        self._wait_until_server_is_live(processes,
                                        mii_config.model_config.replica_configs)

    def _wait_until_server_is_live(self,
                                   processes: List[subprocess.Popen],
                                   deployment: List[ReplicaConfig]) -> None:
        """Block until every replica's tensor-parallel ports accept TCP
        connections, raising RuntimeError if a server process dies first."""
        for process, repl_config in zip(processes, deployment):
            sockets_open = False
            while not sockets_open:
                sockets_open = all(
                    self._is_socket_open(repl_config.hostname,
                                         port)
                    for port in repl_config.tensor_parallel_ports)
                if not self._is_server_process_alive(process):
                    raise RuntimeError(
                        "server crashed for some reason, unable to proceed")
                # Only wait when the ports are not yet open; the original
                # code slept (and logged "waiting") one extra time after the
                # server was already reachable.
                if not sockets_open:
                    time.sleep(4)
                    logger.info("waiting for server to start...")
            # TODO: Fix displaying outputs from logger
            # When we launch processes on multiple nodes using " --force_multi",
            # all the outputs from logger to stdout is displayed when the process is stopped.
            # This is confusing because you see the message "server has started ..." when you stop the process.
            logger.info(
                f"server has started on ports {repl_config.tensor_parallel_ports}")

    def _is_socket_open(self, host: str, port: int) -> bool:
        """Return True if a TCP connection to ``(host, port)`` succeeds."""
        import socket
        # Socket objects are context managers; this guarantees close() even
        # if connect_ex raises (e.g. on name-resolution failure).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            return sock.connect_ex((host, port)) == 0

    def _is_server_process_alive(self, process: subprocess.Popen) -> bool:
        """Return True if *process* is still running.

        ``None`` is treated as alive (the process is managed externally).
        """
        if process is None:
            return True
        try:
            # wait() returning within the timeout means the process exited.
            process.wait(1)
        except subprocess.TimeoutExpired:
            # Timeout means the process is still running and (probably) okay.
            return True
        return False

    def _launch_server_process(self,
                               model_config: ModelConfig,
                               msg_server_type: str,
                               ds_launch_str: str = "",
                               server_args: List[str] = None) -> subprocess.Popen:
        """Spawn one server process and return its Popen handle.

        Args:
            model_config: serialized to base64 and passed via --model-config.
            msg_server_type: human-readable label used in the launch log line.
            ds_launch_str: optional ``deepspeed`` launcher prefix.
            server_args: extra CLI arguments; the caller's list is not mutated.
        """
        launch_str = f"{sys.executable} -m mii.launch.multi_gpu_server"
        b64_config_str = config_to_b64_str(model_config)
        # Copy before appending so a caller-supplied list is never mutated.
        server_args = list(server_args) if server_args else []
        server_args.append(f"--model-config {b64_config_str}")
        server_args_str = " ".join(server_args)
        cmd = f"{ds_launch_str} {launch_str} {server_args_str}".strip().split(" ")
        # Use the provided label (the original hardcoded "msg_server" and
        # never used this parameter).
        logger.info(f"{msg_server_type} launch: {cmd}")
        return subprocess.Popen(cmd)

    def _generate_ds_launch_str(self,
                                replica_config: ReplicaConfig,
                                hostfile: str,
                                use_multiple_hosts: bool) -> str:
        """Build the ``deepspeed`` launcher prefix for one replica.

        *hostfile* is currently unused (replicas are pinned via ``-i``
        instead of ``-H``); the parameter is kept for interface
        compatibility with existing callers.
        """
        # Pin the deepspeed launcher to this replica's host and gpu id(s).
        included_gpus = f"{replica_config.hostname}:{','.join(map(str, replica_config.gpu_indices))}"
        worker_str = f"-i {included_gpus} "
        # Replicas may share a host, so each gets its own torch dist port,
        # otherwise multi-replica deployments would conflict.
        worker_str += f"--master_port {replica_config.torch_dist_port}"
        ds_launch_str = f"deepspeed {worker_str} --master_addr localhost --no_ssh_check --no_local_rank --no_python"
        if use_multiple_hosts:
            ds_launch_str += " --force_multi"
        return ds_launch_str

    def _initialize_service(self, mii_config: MIIConfig) -> List[subprocess.Popen]:
        """Start all replica servers, the load balancer, and (optionally)
        the RESTful API gateway; return their Popen handles in that order."""
        processes = []
        server_args = [
            f"--deployment-name {mii_config.deployment_name}",
            f"--load-balancer-port {mii_config.port_number}",
            f"--restful-gateway-port {mii_config.restful_api_port}",
            f"--restful-gateway-host {mii_config.restful_api_host}",
            f"--restful-gateway-procs {mii_config.restful_processes}"
        ]

        # Map each host to every gpu index used on it (across replicas) so
        # the per-replica hostfile can advertise enough slots.
        host_gpus = defaultdict(list)
        for repl_config in mii_config.model_config.replica_configs:
            host_gpus[repl_config.hostname].extend(repl_config.gpu_indices)

        use_multiple_hosts = len(
            set(repl_config.hostname
                for repl_config in mii_config.model_config.replica_configs)) > 1

        # Start replica instances
        for repl_config in mii_config.model_config.replica_configs:
            # Write a one-host hostfile for this replica. Closing it (via the
            # context manager) before the launcher reads the path guarantees
            # the buffered write is flushed to disk — the original left the
            # file open, so the launcher could observe an empty file.
            with tempfile.NamedTemporaryFile(delete=False) as hostfile:
                hostfile.write(
                    f"{repl_config.hostname} slots={max(host_gpus[repl_config.hostname])+1}\n"
                    .encode())
            ds_launch_str = self._generate_ds_launch_str(repl_config,
                                                         hostfile.name,
                                                         use_multiple_hosts)
            processes.append(
                self._launch_server_process(
                    mii_config.model_config,
                    "MII server",
                    ds_launch_str=ds_launch_str,
                    server_args=server_args + [
                        f"--server-port {repl_config.tensor_parallel_ports[0]} --zmq-port {repl_config.zmq_port}"
                    ],
                ))
        # Start the load balancer here. We don't use the deepspeed launcher
        # for it because it does not need a GPU: the deepspeed launcher
        # determines the number of processes to launch based on the GPUs
        # available on the host (or CUDA_VISIBLE_DEVICES) and expects to
        # assign one GPU per process.
        processes.append(
            self._launch_server_process(
                mii_config.model_config,
                "load balancer",
                server_args=server_args + ["--load-balancer"],
            ))

        if mii_config.enable_restful_api:
            processes.append(
                self._launch_server_process(
                    mii_config.model_config,
                    "restful api gateway",
                    server_args=server_args + ["--restful-gateway"],
                ))

        return processes