Auto load models when api server start (OpenCSGs#133)
* Auto load models when api server start

* update

* update version

* fix version conflict

* update version

* Update model scaling config

* update version
SeanHH86 authored May 8, 2024
1 parent d7efb05 commit 035c862
Showing 37 changed files with 164 additions and 155 deletions.
2 changes: 1 addition & 1 deletion build_llmserve_image.sh
@@ -5,7 +5,7 @@ set -euxo pipefail
GIT_COMMIT=`git rev-parse HEAD | cut -b 1-12`

DOCKER_REPO="${LLMSERVE_DOCKER_REPO:-registry.cn-beijing.aliyuncs.com/opencsg_public/llmray}"
VERSION="0.0.2"
VERSION="0.1.0"
DOCKER_TAG="$DOCKER_REPO:$VERSION-$GIT_COMMIT"
DOCKER_FILE="${LLMSERVE_DOCKER_FILE:-deploy/ray/Dockerfile}"

2 changes: 1 addition & 1 deletion build_llmserve_image_base.sh
@@ -2,7 +2,7 @@
set -euxo pipefail

DOCKER_REPO="${LLMSERVE_DOCKER_REPO:-registry.cn-beijing.aliyuncs.com/opencsg_public/llmray}"
VERSION="0.0.2"
VERSION="0.0.3"
DOCKER_TAG="$DOCKER_REPO:base-$VERSION"
DOCKER_FILE="${LLMSERVE_DOCKER_FILE:-deploy/ray/Dockerfile-base}"

2 changes: 1 addition & 1 deletion deploy/kuberay/starray.yaml
@@ -6,7 +6,7 @@ metadata:
name: opencsg
namespace: ray
spec:
rayVersion: '2.8.0' # should match the Ray version in the image of the containers
rayVersion: '2.20.0' # should match the Ray version in the image of the containers
# Ray head pod template
headGroupSpec:
# The `rayStartParams` are used to configure the `ray start` command.
2 changes: 1 addition & 1 deletion deploy/ray/Dockerfile
@@ -1,5 +1,5 @@
# build from Dockerfile-base
FROM registry.cn-beijing.aliyuncs.com/opencsg_public/llmray:base-0.0.2
FROM registry.cn-beijing.aliyuncs.com/opencsg_public/llmray:base-0.0.3

RUN sudo apt-get update && sudo apt-get install -y vim && sudo apt-get clean

2 changes: 2 additions & 0 deletions docs/quick_start.md
@@ -68,6 +68,8 @@ If you start the model using `llm-serve start serving-rest`, you can also run th

```
curl -H "Content-Type: application/json" -X POST -d '{"prompt": "What can I do"}' "http://127.0.0.1:8000/api/v1/default/facebook--opt-125m/run/predict"
curl -H "Content-Type: application/json" -X POST -d '{"prompt": ["What can I do", "How are you"]}' "http://127.0.0.1:8000/default-d2b9814399fd/facebook--opt-125m/run/predict"
```

Run the following `curl` command to call the model predict API; it will return data in OpenAI style.
138 changes: 71 additions & 67 deletions llmserve/backend/server/app.py
@@ -86,7 +86,7 @@ class ModelIdentifier(BaseModel):
"initial_replicas": 2,
"max_replicas": 8,
},
max_concurrent_queries=2, # Maximum backlog for a single replica
max_ongoing_requests=2, # Maximum backlog for a single replica
health_check_period_s=10,
health_check_timeout_s=30,
)
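For context on this rename: the cluster image is bumped to Ray 2.20.0 in `deploy/kuberay/starray.yaml` above, and newer Ray Serve releases accept `max_ongoing_requests` on `@serve.deployment` in place of the older `max_concurrent_queries`. Below is a minimal sketch of a deployment using the new option, assuming Ray Serve >= 2.10; the deployment class and handler are illustrative, not part of this repository.

```
from ray import serve
from starlette.requests import Request


@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "initial_replicas": 2,
        "max_replicas": 8,
    },
    max_ongoing_requests=2,  # replaces max_concurrent_queries in newer Ray Serve
    health_check_period_s=10,
    health_check_timeout_s=30,
)
class EchoDeployment:
    """Illustrative deployment, not part of this repository."""

    async def __call__(self, request: Request) -> dict:
        body = await request.json()
        return {"echo": body}


app = EchoDeployment.bind()
# serve.run(app)  # deploy onto a running Ray cluster
```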
@@ -445,7 +445,7 @@ def __repr__(self) -> str:
"initial_replicas": 1,
"max_replicas": 2,
},
max_concurrent_queries=50, # Maximum backlog for a single replica
max_ongoing_requests=50, # Maximum backlog for a single replica
)
@serve.ingress(app)
class RouterDeployment:
Expand All @@ -459,14 +459,14 @@ def __init__(self, models: Dict[str, DeploymentHandle], model_configurations: Di
async def predict(
self,
model: str,
pramas: InvokeParams
params: InvokeParams
) -> Union[Dict[str, Any], List[Dict[str, Any]], List[Any]]:
prompt = pramas.prompt
prompt = params.prompt
if not isinstance(prompt, list):
prompt = [prompt]
prompt = [p if isinstance(p, Prompt) else Prompt(prompt=p, use_prompt_format=False) for p in prompt]

generate_kwargs = pramas.dict(exclude={"prompt"}, exclude_none=True)
generate_kwargs = params.dict(exclude={"prompt"}, exclude_none=True)
logger.info(f"get generation params: {generate_kwargs}")
logger.info(f"url: {model}, keys: {self._models.keys()}")
modelKeys = list(self._models.keys())
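The `predict` handler above normalizes the request body, so a single prompt string and a list of prompts are both accepted; the two `curl` calls in `docs/quick_start.md` above exercise exactly these shapes. A small client-side sketch, assuming a server running locally on the default port and the route from the quick-start example; adjust the route to your own serving name:

```
import requests

URL = "http://127.0.0.1:8000/api/v1/default/facebook--opt-125m/run/predict"

# Single prompt
resp = requests.post(URL, json={"prompt": "What can I do"})
print(resp.json())

# Batch of prompts in one request
resp = requests.post(URL, json={"prompt": ["What can I do", "How are you"]})
print(resp.json())
```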
@@ -617,7 +617,7 @@ def convertToJson(self, object: str, keyName: str, model: str, usage: Union[dict
"initial_replicas": 1,
"max_replicas": 2,
},
max_concurrent_queries=50, # Maximum backlog for a single replica
max_ongoing_requests=50, # Maximum backlog for a single replica
)
class ExperimentalDeployment(GradioIngress):
def __init__(
@@ -679,15 +679,14 @@ def _chose_ui(self) -> Callable:
else:
return lambda: gr.Interface(self.query, **gr_params, title=self._model_configuration.model_config.model_id)


@serve.deployment(
# TODO make this configurable in llmserve run
autoscaling_config={
"min_replicas": 1,
"initial_replicas": 1,
"max_replicas": 2,
},
max_concurrent_queries=50, # Maximum backlog for a single replica
max_ongoing_requests=50, # Maximum backlog for a single replica
)
@serve.ingress(app)
class ApiServer:
@@ -761,12 +760,12 @@ def load_model(self, models: Union[List[str], List[ModelConfig], List[LLMApp]],
user_config = model.dict()
deployment_config = model.deployment_config.dict()
deployment_config = deployment_config.copy()
max_concurrent_queries = deployment_config.pop(
"max_concurrent_queries", None
max_ongoing_requests = deployment_config.pop(
"max_ongoing_requests", None
) or (user_config["model_config"]["generation"].get("max_batch_size", 1) if user_config["model_config"]["generation"] else 1)
deployment = LLMDeployment.options( # pylint:disable=no-member
name=name,
max_concurrent_queries=max_concurrent_queries,
max_ongoing_requests=max_ongoing_requests,
user_config=user_config,
**deployment_config,
).bind()
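The same precedence rule recurs in `load_model`, `load_model_args`, `update_model`, and `load_model_for_comparation` below: an explicit `max_ongoing_requests` in the deployment config wins, otherwise the generation `max_batch_size` is used, otherwise 1. A hypothetical helper that makes the rule explicit (the names are illustrative; the repo inlines this expression instead):

```
from typing import Any, Dict


def resolve_max_ongoing_requests(deployment_config: Dict[str, Any],
                                 user_config: Dict[str, Any]) -> int:
    """Hypothetical helper mirroring the inline expression in app.py."""
    # 1. An explicit value in the deployment config takes precedence.
    explicit = deployment_config.pop("max_ongoing_requests", None)
    if explicit:
        return explicit
    # 2. Fall back to generation.max_batch_size, then to 1.
    generation = user_config["model_config"].get("generation") or {}
    return generation.get("max_batch_size", 1)
```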
@@ -800,13 +799,13 @@ def load_model_args(self, args: ModelConfig) -> Dict[str, Any]:
else:
deployment_config = model.deployment_config
deployment_config = deployment_config.copy()
max_concurrent_queries = deployment_config.pop(
"max_concurrent_queries", None
max_ongoing_requests = deployment_config.pop(
"max_ongoing_requests", None
) or (user_config["model_config"]["generation"].get("max_batch_size", 1) if user_config["model_config"]["generation"] else 1)

deployment = LLMDeployment.options( # pylint:disable=no-member
name=_reverse_prefix(args.model_id),
max_concurrent_queries=max_concurrent_queries,
max_ongoing_requests=max_ongoing_requests,
user_config=user_config,
**deployment_config,
).bind()
@@ -843,23 +842,6 @@ async def start_experimental(self, models: Union[ModelConfig, str] = Body(..., e
route_prefix="/" + serve_name, _blocking=False)
return {"start_experimental": serve_name, "models": self.model_configs}

# @app.get("/serving_status")
# async def serving_status(self, models: List[str] = Body(..., description="models name", embed=True)) -> Dict[str, Any]:
# serve_details = ServeInstanceDetails(
# **ServeSubmissionClient(CONFIG.RAY_AGENT_ADDRESS).get_serve_details())
# serving_status = {}
# for model in models:
# model_id = _reverse_prefix(model)
# app_status = ""
# deployment_status = {}
# if model_id in serve_details.applications.keys():
# apps = serve_details.applications[model_id].dict()
# app_status = apps.get("status").value
# for k, v in apps.get("deployments").items():
# deployment_status[k] = v.get("status").value
# serving_status[model_id] = {"application_status": app_status, "deployments_status": deployment_status}
# return serving_status

@app.post("/start_serving")
async def start_serving(self, user_name: Annotated[str, Header()],
models: Union[List[ModelConfig],
@@ -875,25 +857,23 @@ async def start_serving(self, user_name: Annotated[str, Header()],
model_config = {}
deployment[key] = self.compare_deployments[key]
model_config[key] = value
app = RouterDeployment.bind( # pylint:disable=no-member
deployment, model_config)
app = RouterDeployment.bind(deployment, model_config)
ray._private.usage.usage_lib.record_library_usage("llmserve")
# deployment_config = model_config[key].deployment_config.dict()
user_config = value.dict()
model_id = _replace_prefix(
user_config["model_config"].get("model_id"))
model_id = _replace_prefix(user_config["model_config"].get("model_id"))
# TBD: To add revision to model_config, that's needed to be implemented for CLI (in YAML) and API together.
# ... For now, that's "main" before implement this.
model_revision = user_config["model_config"].get(
"model_revision", "main")
model_revision = user_config["model_config"].get("model_revision", "main")
model_identifier = model_id.strip() + "-" + model_revision.strip()
logger.info(f"Starting serving for {model_identifier} ...")
model_hash = hashlib.md5(
model_identifier.encode()).hexdigest()[:12]
model_hash = hashlib.md5(model_identifier.encode()).hexdigest()[:12]
serving_name = user_name.strip() + "-" + model_hash
serve.run(app, host=CONFIG.SERVE_RUN_HOST, name=serving_name,
route_prefix="/" + serving_name, _blocking=False)
# serve.run(app, host=CONFIG.SERVE_RUN_HOST, name=serving_name, route_prefix="/" + serving_name, _blocking=False)
logger.info(f"Starting serving for {model_identifier} ...")
serve.run(target=app, name=serving_name, route_prefix="/" + serving_name, blocking=False)
logger.info(f"Done serving model {model_id} on /{serving_name}")
started_serving[serving_name] = value
logger.info(f"start all serving done")
return started_serving
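Note the `serve.run` change in this hunk: the call drops the `host=` argument and the private `_blocking=False` flag in favor of the public `blocking=False` keyword, in line with the newer Ray Serve API (the cluster is on 2.20.0 above). A minimal non-blocking sketch with a follow-up status check, assuming a Ray cluster is already running; the deployment and names are illustrative:

```
from ray import serve
from starlette.requests import Request


@serve.deployment
class Pinger:
    """Illustrative deployment, not part of this repository."""

    async def __call__(self, request: Request) -> str:
        return "pong"


app = Pinger.bind()

# Non-blocking: returns once the application has been submitted.
serve.run(target=app, name="pinger", route_prefix="/pinger", blocking=False)

# serve.status() can then be polled until the application reports RUNNING.
print(serve.status().applications["pinger"].status)
```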

@app.get("/list_serving")
@@ -1007,20 +987,6 @@ async def list_oob_models(self) -> Dict[str, Any]:
"image-to-text": image2text,
}

# @app.get("/metadata")
# async def metadata(self, models: List[str] = Body(..., description="models name", embed=True)) -> Dict[str, Any]:
# metadata = {}
# for model in models:
# #model = _replace_prefix(model)
# metadata = self.model_configs[model].dict(
# exclude={
# "model_config": {"initialization": {"s3_mirror_config", "runtime_env"}}
# }
# )
# logger.info(metadata)
# metadata[model] = metadata
# return metadata

@app.get("/models")
async def get_model(self, models: List[str] = Body(..., description="models name", embed=True)) -> Dict[str, Any]:
model_config = {}
@@ -1045,13 +1011,13 @@ async def update_model(self, model: ModelConfig = Body(..., embed=True)) -> Dict
user_config = md.dict()
deployment_config = md.deployment_config.dict() # pylint:disable=no-member
deployment_config = deployment_config.copy()
max_concurrent_queries = deployment_config.pop(
"max_concurrent_queries", None
max_ongoing_requests = deployment_config.pop(
"max_ongoing_requests", None
) or (user_config["model_config"]["generation"].get("max_batch_size", 1) if user_config["model_config"]["generation"] else 1)

deployment = LLMDeployment.options( # pylint:disable=no-member
name=serve_conf["name"],
max_concurrent_queries=max_concurrent_queries,
max_ongoing_requests=max_ongoing_requests,
user_config=user_config,
**deployment_config,
).bind()
Expand All @@ -1074,37 +1040,44 @@ def load_model_for_comparation(self, models: List[Union[ModelConfig, str]]):
self.compare_model_configs = {}

for model in models:
logger.info(model)
logger.info(f"load model: {model}")
parsed_models = []
template = []
if isinstance(model, str):
logger.info(f"parse model string: {model}")
parsed_models = parse_args(model)
else:
if model.is_oob:
logger.info(f"parse mode_id: {model.model_id}")
logger.info(f"parse oob model_id: {model.model_id}")
parsed_models = parse_args(model.model_id)
else:
logger.info(f"parse non-oob model_id: {model.model_id}")
template = CONFIG.COMPARATION_LLMTEMPLATE
parsed_model = copy.deepcopy(template)
if model.scaling_config:
for key, value in model.scaling_config.__dict__.items():
setattr(parsed_model.scaling_config, key, value)
parsed_model.model_config.model_id = model.model_id
parsed_models.append(parsed_model)
# set scaling_config
if model.scaling_config:
for key, value in model.scaling_config.__dict__.items():
setattr(parsed_models[0].scaling_config, key, value)

for md in parsed_models:
user_config = md.dict()
if model.is_oob:
deployment_config = md.deployment_config.dict()
else:
deployment_config = md.deployment_config
deployment_config = deployment_config.copy()
max_concurrent_queries = deployment_config.pop(
"max_concurrent_queries", None
max_ongoing_requests = deployment_config.pop(
"max_ongoing_requests", None
) or (user_config["model_config"]["generation"].get("max_batch_size", 1) if user_config["model_config"]["generation"] else 1)

name = _reverse_prefix(md.model_config.model_id)
logger.info(f"LLMDeployment.options for {name} with deployment_config={deployment_config}")
logger.info(f"LLMDeployment.options for {name} with user_config={user_config}")
deployment = LLMDeployment.options( # pylint:disable=no-member
name=name,
max_concurrent_queries=max_concurrent_queries,
max_ongoing_requests=max_ongoing_requests,
user_config=user_config,
**deployment_config,
).bind()
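The hunk above implements the "Update model scaling config" item from the commit message: the `scaling_config` override is moved out of the non-out-of-box branch, so whatever fields the caller sets are copied onto `parsed_models[0]` for both out-of-box and templated models. A small sketch of that copy-fields-if-set pattern, using simplified stand-in dataclasses rather than the project's Pydantic models:

```
from dataclasses import dataclass, field


@dataclass
class ScalingConfig:
    """Simplified stand-in for the project's scaling config model."""
    num_workers: int = 1
    num_gpus_per_worker: float = 0.0


@dataclass
class ParsedModel:
    model_id: str
    scaling_config: ScalingConfig = field(default_factory=ScalingConfig)


def apply_scaling_override(parsed: ParsedModel, override: ScalingConfig) -> None:
    # Copy every field of the override onto the parsed model's scaling_config,
    # mirroring the setattr loop in load_model_for_comparation.
    for key, value in override.__dict__.items():
        setattr(parsed.scaling_config, key, value)


model = ParsedModel(model_id="facebook/opt-125m")
apply_scaling_override(model, ScalingConfig(num_workers=2, num_gpus_per_worker=1.0))
print(model.scaling_config)
```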
@@ -1218,3 +1191,34 @@ async def delete_app(self, names: List[str] = Body(..., description="model id or
serve.delete("cmp_models_" + name, _blocking=False)
serve.delete("cmp_" + name, _blocking=False)
return {"comparation": "Delete" + name + "Successful"}

# @app.get("/serving_status")
# async def serving_status(self, models: List[str] = Body(..., description="models name", embed=True)) -> Dict[str, Any]:
# serve_details = ServeInstanceDetails(
# **ServeSubmissionClient(CONFIG.RAY_AGENT_ADDRESS).get_serve_details())
# serving_status = {}
# for model in models:
# model_id = _reverse_prefix(model)
# app_status = ""
# deployment_status = {}
# if model_id in serve_details.applications.keys():
# apps = serve_details.applications[model_id].dict()
# app_status = apps.get("status").value
# for k, v in apps.get("deployments").items():
# deployment_status[k] = v.get("status").value
# serving_status[model_id] = {"application_status": app_status, "deployments_status": deployment_status}
# return serving_status

# @app.get("/metadata")
# async def metadata(self, models: List[str] = Body(..., description="models name", embed=True)) -> Dict[str, Any]:
# metadata = {}
# for model in models:
# #model = _replace_prefix(model)
# metadata = self.model_configs[model].dict(
# exclude={
# "model_config": {"initialization": {"s3_mirror_config", "runtime_env"}}
# }
# )
# logger.info(metadata)
# metadata[model] = metadata
# return metadata