add cache-dir, detect crash, clean-up (microsoft#8)
jeffra authored Mar 29, 2022
1 parent 61696ff commit e54e7e9
Showing 8 changed files with 74 additions and 173 deletions.
13 changes: 13 additions & 0 deletions mii/constants.py
@@ -0,0 +1,13 @@
+import enum
+
+
+#TODO naming..
+class DeploymentType(enum.Enum):
+    LOCAL = 1
+    #expose GPUs
+    LOCAL_AML = 2
+    AML_ON_AKS = 3
+
+
+MII_CACHE_PATH = "MII_CACHE_PATH"
+MII_CACHE_PATH_DEFAULT = "/tmp/mii_cache"
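
The pair of cache-path constants establishes an environment-variable override with a fixed fallback. A minimal sketch of the intended lookup, mirroring the mii_cache_path() helper added in mii/utils.py further down:

    import os
    from mii.constants import MII_CACHE_PATH, MII_CACHE_PATH_DEFAULT

    # resolves to $MII_CACHE_PATH if set, otherwise /tmp/mii_cache
    cache_dir = os.environ.get(MII_CACHE_PATH, MII_CACHE_PATH_DEFAULT)
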
48 changes: 17 additions & 31 deletions mii/deployment.py
@@ -1,13 +1,13 @@
 '''
 Copyright 2022 The Microsoft DeepSpeed Team
 '''
-import urllib.request
 import os
-import enum
-import mii
-from mii.utils import logger
 import json
 
+import mii
+from mii.constants import DeploymentType
+from mii.utils import logger, mii_cache_path
+
 try:
     from azureml.core import Environment
     from azureml.core.model import InferenceConfig
@@ -16,21 +16,12 @@
 except ImportError:
     azureml = None
 
-
-#TODO naming..
-class DeploymentType(enum.Enum):
-    LOCAL = 1
-    #expose GPUs
-    LOCAL_AML = 2
-    AML_ON_AKS = 3
-
-
 ENV_NAME = "MII-Image-CUDA-11.3-grpc"
 
 
 #TODO do this properly
 def check_if_supported(task_name, model_name):
-    assert task_name in ['text-generation', 'sequence-classification', 'question-answer'], "Not a supported task type"
+    assert task_name in ['text-generation', 'sequence-classification', 'question-answering'], "Not a supported task type"
     supported = False
     if task_name in ['text-generation']:
         supported_models = ['gpt2']
@@ -45,32 +36,27 @@ def check_if_supported(task_name, model_name):
         assert False, "Does not support model {model_name} for task {task_name}"
 
 
-#TODO do this properly
 def create_score_file(task_name, model_name, parallelism_config):
     config_dict = {}
     config_dict['task_name'] = task_name
     config_dict['model_name'] = model_name
     config_dict['parallelism_config'] = parallelism_config
-    config_to_append = json.dumps(config_dict)
-    import subprocess
-    subprocess.run(['pwd'])
 
-    #open text file in read mode
-    #TODO how to locate the absolute path of this file
-    source_file_template = open("../mii/models/generic_model/score.py", "r")
 
-    #read whole file to a string
-    source_in_str = source_file_template.read()
+    if len(mii.__path__) > 1:
+        logger.warning(
+            f"Detected mii path as multiple sources: {mii.__path__}, might cause unknown behavior"
+        )
 
-    #close file
-    source_file_template.close()
+    with open(os.path.join(mii.__path__[0], "models/generic_model/score.py"), "r") as fd:
+        score_src = fd.read()
 
-    source_with_config = source_in_str + "\n" + "configs=" + config_to_append + "\n"
+    # update score file w. global config dict
+    source_with_config = f"{score_src}\n"
+    source_with_config += f"configs = {json.dumps(config_dict)}"
 
-    #TODO should we write this file
-    f = open("score.py", "w+")
-    f.write(source_with_config)
-    f.close()
+    with open(os.path.join(mii_cache_path(), "score.py"), "w") as fd:
+        fd.write(source_with_config)
+        fd.write("\n")
 
 
 def deploy(task_name,
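
create_score_file() now resolves the score template through the installed package path (mii.__path__[0]) instead of a fragile relative path, and writes the generated score.py into the MII cache directory. A hypothetical invocation, assuming a gpt2 text-generation deployment with an empty parallelism config:

    from mii.deployment import create_score_file

    create_score_file('text-generation', 'gpt2', {})
    # writes /tmp/mii_cache/score.py, or $MII_CACHE_PATH/score.py if the variable is set
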
4 changes: 4 additions & 0 deletions mii/models/generic_model/score.py
@@ -2,6 +2,7 @@
 Copyright 2022 The Microsoft DeepSpeed Team
 '''
 import os
+import json
 import mii
 
 model = None
@@ -39,3 +40,6 @@ def run(request):
     global model
     request_dict = json.loads(request)
     return model.query(request_dict)
+
+
+### Auto-generated config will be appended below at run-time
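
The new trailing comment marks where create_score_file() appends the serialized config dict. A hypothetical tail of a generated score.py, continuing the gpt2 example above (the values are illustrative, the format is what json.dumps() produces):

    ### Auto-generated config will be appended below at run-time
    configs = {"task_name": "text-generation", "model_name": "gpt2", "parallelism_config": {}}
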
33 changes: 26 additions & 7 deletions mii/server_client.py
@@ -4,7 +4,6 @@
 import asyncio
 import torch
 import sys
-import os
 import subprocess
 import time
 import grpc
@@ -50,20 +49,22 @@ def __init__(self,
         self.model = None
 
         if self.initialize_service:
-            self._initialize_service(model_name, model_path)
+            self.process = self._initialize_service(model_name, model_path)
+            self._wait_until_server_is_live()
 
         if self.initialize_grpc_client and self.use_grpc_server:
             self.stubs = []
             self.asyncio_loop = asyncio.get_event_loop()
             self._initialize_grpc_client()
 
-        self._wait_until_server_is_live()
-
     def _wait_until_server_is_live(self):
         sockets_open = False
         while not sockets_open:
             sockets_open = self._is_socket_open(self.port_number)
-            time.sleep(5)
+            process_alive = self._is_server_process_alive()
+            if not process_alive:
+                raise RuntimeError("server crashed for some reason, unable to proceed")
+            time.sleep(4)
             logger.info("waiting for server to start...")
         logger.info(f"server has started on {self.port_number}")
 
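The startup loop now checks two conditions per cycle: whether the gRPC port is accepting connections and whether the launcher process is still alive, so a crashed server fails fast instead of being polled forever. A hypothetical standalone version of the socket probe (only the tail of _is_socket_open() is visible in the next hunk; the connect_ex() call is an assumption consistent with the result == 0 check):

    import socket

    def is_socket_open(port):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        result = sock.connect_ex(('localhost', port))  # 0 means a listener accepted
        sock.close()
        return result == 0
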
@@ -74,11 +75,28 @@ def _is_socket_open(self, port):
         sock.close()
         return result == 0
 
-    def _initialize_service(self, model_name, model_path):
+    def _is_server_process_alive(self):
+        if self.process is None:
+            return True
+        try:
+            self.process.wait(1)
+        except subprocess.TimeoutExpired as err:
+            # timeout means we're still running and all (probably) okay
+            is_alive = True
+        else:
+            # no exception case
+            is_alive = False
+        return is_alive
+
+    def _initialize_service(self, model_name, model_path):
+        process = None
         if not self.use_grpc_server:
             self.model = mii.load_model(model_name, model_path)
         else:
+            if self._is_socket_open(self.port_number):
+                raise RuntimeError(
+                    f"Server is already running on port {self.port_number}, please shutdown to use different port."
+                )
+            #TODO we need to dump the log from these executions for debugging. Currently these are all lost
             ds_launch_str = f"deepspeed --num_gpus {self.num_gpus} --no_local_rank --no_python"
             launch_str = f"{sys.executable} -m mii.launch.multi_gpu_server"
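
The liveness check leans on subprocess.Popen.wait(timeout): while the child is still running, wait() raises subprocess.TimeoutExpired; once it has exited, wait() returns immediately. A self-contained sketch of the same pattern, assuming a POSIX sleep binary is available:

    import subprocess

    proc = subprocess.Popen(['sleep', '30'])
    try:
        proc.wait(1)  # returns only if the process has already exited
        alive = False
    except subprocess.TimeoutExpired:
        alive = True  # timeout means the child is still running
    print(f'process alive: {alive}')
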
@@ -87,6 +105,7 @@ def _initialize_service(self, model_name, model_path):
             print(cmd)
             process = subprocess.Popen(cmd)
-            #TODO: do we need to hold onto this process handle for clean-up purposes?
+        return process
 
     def _initialize_grpc_client(self):
         channels = []
@@ -134,7 +153,7 @@ def _request_response(self, request_dict):
             response = self.model(question=request_dict['query'],
                                   context=request_dict['context'])
         else:
-            assert False, "unknown task"
+            raise NotSupportedError(f"task is not supported: {self.task}")
         return response
 
     def query(self, request_dict):
Empty file removed mii/tasks/__init__.py
Empty file removed mii/tasks/generation/__init__.py
122 changes: 0 additions & 122 deletions mii/tasks/generation/generation.py

This file was deleted.

27 changes: 14 additions & 13 deletions mii/utils.py
@@ -6,6 +6,8 @@
 import logging
 import importlib
 
+from mii.constants import MII_CACHE_PATH, MII_CACHE_PATH_DEFAULT
+
 
 def get_model_path():
     aml_model_dir = os.getenv('AZUREML_MODEL_DIR')
@@ -30,21 +32,20 @@ def set_model_path(model_path):
     os.environ['MII_MODEL_DIR'] = model_path
 
 
-#def import_score_file(task_name, model_name):
-def import_score_file():
-
-    #TODO: dynamically create score file for model in ~/.cache/mii path
+def mii_cache_path():
+    cache_path = os.environ.get(MII_CACHE_PATH, MII_CACHE_PATH_DEFAULT)
+    if not os.path.isdir(cache_path):
+        os.makedirs(cache_path)
+    return cache_path
 
-    #import mii.models.generic_model.score as score
-    import score
 
-    #TODO: this should be done as part of dynamic score file creation in ~/.cache/mii_path
-    #score.model_name = model_name
-    #score.task = task_name
-
-    # spec=importlib.util.spec_from_file_location('score', f'models/{model_name}/score.py')
-    # score = importlib.util.module_from_spec(spec)
-    # spec.loader.exec_module(score)
+def import_score_file():
+    spec = importlib.util.spec_from_file_location(
+        'score',
+        os.path.join(mii_cache_path(),
+                     "score.py"))
+    score = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(score)
     return score
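
import_score_file() now loads the generated score.py by absolute file path, so the cache directory never has to be on sys.path. The equivalent standalone importlib pattern, assuming the default cache location:

    import importlib.util

    spec = importlib.util.spec_from_file_location('score', '/tmp/mii_cache/score.py')
    score = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(score)  # executes score.py and populates the module
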


