基于oam和kfserving实现通用化云原生模型应用部署

背景

如何为算法团队提供高效的工程化上云支持是云原生时代一个很重要的也很有意义的课题,现在开源社区比较完善的应该是 Kubeflow —— 一系列 ML 实验部署环境工具的集合,不过整体来看比较笨重,不适合小团队生产环境快速落地,这里基于 kubevela 和 kfserving 实现一个算法标准化模型的例子,供参考。

项目介绍

项目地址:https://github.com/shikanon/vela-example/tree/main/example/sklearnserver

通过 kubevela 提供了三种对象 mpserver, hpa, httproute。

  • mpserver 主要负责生成 deployment 和 service 资源,是程序运行的主体
  • httroute 主要负责生成对外暴露的端口,访问 url
  • hpa 主要保证服务的可扩展性

部署前准备工作

由于使用到vela,所以需要先下载vela客户端

创建一个 sklearn 的服务

案例放在 exmaple/sklearnserver 下面。

  1. 本地镜像编译并运行:
# 编译
docker build -t swr.cn-north-4.myhuaweicloud.com/hw-zt-k8s-images/sklearnserver:demo-iris -f sklearn.Dockerfile .
  1. 上传到华为云镜像仓库
docker login swr.cn-north-4.myhuaweicloud.com
docker push swr.cn-north-4.myhuaweicloud.com/hw-zt-k8s-images/sklearnserver:demo-iris
  1. 创建一个demo-iris-01.yaml的应用文件
name: demo-iris-01
services:
demo-iris:
type: mpserver
image: swr.cn-north-4.myhuaweicloud.com/hw-zt-k8s-images/sklearnserver:demo-iris
ports: [8080]
cpu: "200m"
memory: "250Mi"
httproute:
gateways: ["external-gateway"]
hosts: ["demo-iris.rcmd.testing.mpengine"]
servernamespace: rcmd
serverport: 8080
hpa:
min: 1
max: 1
cpuPercent: 60

因为这里使用的是rcmd命名空间,在创建的时候需要切换,可以通过vela dashboard 通过可视化界面创建一个 rcmd 命名空间的环境:

vela dashboard

成功后可以通过vela env查看:

$ vela env ls
NAME CURRENT NAMESPACE EMAIL DOMAIN
default default
rcmd * rcmd

  1. 在云原生环境运行应用
$ vela up -f demo-iris-01.yaml
Parsing vela appfile ...
Load Template ...

Rendering configs for service (demo-iris)...
Writing deploy config to (.vela/deploy.yaml)

Applying application ...
Checking if app has been deployed...
App has not been deployed, creating a new deployment...
✅ App has been deployed 🚀🚀🚀
Port forward: vela port-forward demo-iris-01
SSH: vela exec demo-iris-01
Logging: vela logs demo-iris-01
App status: vela status demo-iris-01
Service status: vela status demo-iris-01 --svc demo-iris

测试

部署好后可以测试:

$ curl -i -d '{"instances":[[5.1, 3.5, 1.4, 0.2]]}' -H "Content-Type: application/json" -X POST demo-iris.rcmd.testing.mpengine:8000/v1/models/model:predict
{"predictions": [0]}

实现说明

kfserver 开发算法 server

kfserver 提供了多种常用框架的 server,比如 sklearn, lgb, xgb, pytorch 等多种服务的 server 框架, kfserver 基于 tornado 框架进行开发,其提供了 模型加载,接口健康检测,预测及 参考解释等多个抽象接口,详细见kfserving/kfserving/kfserver.py:

...
def create_application(self):
return tornado.web.Application([
# Server Liveness API returns 200 if server is alive.
(r"/", LivenessHandler),
(r"/v2/health/live", LivenessHandler),
(r"/v1/models",
ListHandler, dict(models=self.registered_models)),
(r"/v2/models",
ListHandler, dict(models=self.registered_models)),
# Model Health API returns 200 if model is ready to serve.
(r"/v1/models/([a-zA-Z0-9_-]+)",
HealthHandler, dict(models=self.registered_models)),
(r"/v2/models/([a-zA-Z0-9_-]+)/status",
HealthHandler, dict(models=self.registered_models)),
(r"/v1/models/([a-zA-Z0-9_-]+):predict",
PredictHandler, dict(models=self.registered_models)),
(r"/v2/models/([a-zA-Z0-9_-]+)/infer",
PredictHandler, dict(models=self.registered_models)),
(r"/v1/models/([a-zA-Z0-9_-]+):explain",
ExplainHandler, dict(models=self.registered_models)),
(r"/v2/models/([a-zA-Z0-9_-]+)/explain",
ExplainHandler, dict(models=self.registered_models)),
(r"/v2/repository/models/([a-zA-Z0-9_-]+)/load",
LoadHandler, dict(models=self.registered_models)),
(r"/v2/repository/models/([a-zA-Z0-9_-]+)/unload",
UnloadHandler, dict(models=self.registered_models)),
])
...

这里我们使用的 sklearn server 的案例主要实现了 predict 接口:

import kfserving
import joblib
import numpy as np
import os
from typing import Dict

MODEL_BASENAME = "model"
MODEL_EXTENSIONS = [".joblib", ".pkl", ".pickle"]


class SKLearnModel(kfserving.KFModel): # pylint:disable=c-extension-no-member
def __init__(self, name: str, model_dir: str):
super().__init__(name)
self.name = name
self.model_dir = model_dir
self.ready = False

def load(self) -> bool:
model_path = kfserving.Storage.download(self.model_dir)
paths = [os.path.join(model_path, MODEL_BASENAME + model_extension)
for model_extension in MODEL_EXTENSIONS]
for path in paths:
if os.path.exists(path):
self._model = joblib.load(path)
self.ready = True
break
return self.ready

def predict(self, request: Dict) -> Dict:
instances = request["instances"]
try:
inputs = np.array(instances)
except Exception as e:
raise Exception(
"Failed to initialize NumPy array from inputs: %s, %s" % (e, instances))
try:
result = self._model.predict(inputs).tolist()
return {"predictions": result}
except Exception as e:
raise Exception("Failed to predict %s" % e)
shikanon wechat
欢迎您扫一扫,订阅我滴↑↑↑的微信公众号!