下的多个内文本同时读出
赞
踩
当我们使用python构建AI模型算法的过程中,经常会遇到如下的问题:
本项目通过一个实际的例子,展示如何基于FastAPI实现AI模型服务与nacos的完美融合,从而实现AI模型服务与其他微服务的互通、负载均衡、以及故障转移。本项目实现以下四个目标:
服务接口的输入参数:{"points":[[x1,y1],[x2,y2],[x3,y3],......,[xn,yn]]}
服务接口的输出:{"results": [center1,center1,center2,......,centerx]}
输出每个point值对应的center编号
FastAPI是一个现代的,快速(高性能)python web框架。基于标准的python类型提示,使用python3.6+构建API的Web框架。
FastAPI的主要特点如下:
快速:非常高的性能,与NodeJS和Go相当(这个要感谢Starlette和Pydantic),是最快的Python框架之一。
快速编码:将开发速度提高约200%到300%。
更少的bug:减少大约40%的开发人员人为引起的错误。
直观:强大的编辑器支持,调试时间更短。
简单:易于使用和学习。减少阅读文档的时间。
代码简洁:尽量减少代码重复。每个参数可以声明多个功能,减少程序的bug。
健壮:生产代码会自动生成交互式文档。
基于标准:基于并完全兼容API的开放标准:OpenAPI和JSON模式。
Nacos 是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。
Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施
Nacos 支持如下核心特性:
- from sklearn.cluster import KMeans
-
- class SklKMeans:
- def __init__(self, clusters):
- self.model = KMeans(n_clusters=clusters, random_state=9)
-
- def fit(self, inputs):
- y_pred = self.model.fit_predict(inputs)
- return y_pred
基于sklearn实现,clusters作为入参,聚类的中心点个数。
实际模型初始化是,这个参数是从nacos配置中心同步过来的。
资源地址为:'/api/skl-kmeans'
访问参数为:{points:[ [1,2],[2,3],[3,5],[4,6],[8,8],[10,11],[12,12],[12,15],[10,12]]}
返回结果为:{ "results": [1, 1, 1, 1, 2, 0, 0, 0, 0]}
默认聚类中心点的个数:3
- from fastapi import APIRouter
- from pydantic import BaseModel
- from skl_kmeans import SklKMeans
- import settings
- import numpy as np
-
- router = APIRouter()
-
- class ClusterRequest(BaseModel):
- points: list
-
- @router.post('/api/skl-kmeans')
- async def api_skl_kmeans(req: ClusterRequest):
- try:
- model = SklKMeans(settings.clusters)
- response = model.fit(np.asarray(req.points))
- response = {"results": response.tolist()}
- except Exception as ex:
- response = None
- return response
服务注册到nacos,服务名称为“skl-kmeans”,服务端口为9999,心跳间隔为30s
- import nacos
- from net_utils import net_helper
-
- class NacosHelper:
- service_name = None
- service_port = None
- service_group = None
-
- def __init__(self, server_endpoint, namespace_id, username=None, password=None):
- self.client = nacos.NacosClient(server_endpoint,
- namespace=namespace_id,
- username=username,
- password=password)
- self.endpoint = server_endpoint
- self.service_ip = net_helper.get_host_ip()
-
- def register(self):
- self.client.add_naming_instance(self.service_name,
- self.service_ip,
- self.service_port,
- group_name=self.service_group)
-
- def unregister(self):
- self.client.remove_naming_instance(self.service_name,
- self.service_ip,
- self.service_port)
-
- def set_service(self, service_name, service_port, service_group):
- self.service_name = service_name
- self.service_port = service_port
- self.service_group = service_group
-
- async def beat_callback(self):
- self.client.send_heartbeat(self.service_name,
- self.service_ip,
- self.service_port)
-
- def load_conf(self, data_id, group):
- return self.client.get_config(data_id=data_id, group=group, no_snapshot=True)
-
- def add_conf_watcher(self, data_id, group, callback):
- self.client.add_config_watcher(data_id=data_id, group=group, cb=callback)
nacos必备的参数有endpoint、namespace_id,group_name, username, password, data_id
- nacos_endpoint = '192.168.1.238:8848'
- nacos_namespace_id = '1bc91fa5-37e3-4269-8e41-3f4e4efb7633'
- nacos_group_name = 'DEFAULT_GROUP'
- nacos_username = 'nacos'
- nacos_password = 'nacos'
- nacos_data_id = 'skl-kmeans'
-
- service_name = 'skl-kmeans'
- service_port = 9999
- beat_interval = 30
-
- application.include_router(api_skl_kmeans.router, tags=['skl-kmeans'])
-
- nacos = NacosHelper(nacos_endpoint, nacos_namespace_id, nacos_username, nacos_password)
- nacos.set_service(service_name, service_port, nacos_group_name)
- nacos.register()
-
- @application.on_event('startup')
- def init_scheduler():
- scheduler = AsyncIOScheduler()
- scheduler.add_job(nacos.beat_callback, 'interval', seconds=beat_interval)
- scheduler.start()
配置中心中的配置文件data_id为“skl-kmeans”,项目启动时,强制同步一次配置;更改配置文件时,程序能够自动监控配置文件变化,并自动更新到内存中。
- service_name = 'skl-kmeans'
- service_port = 9999
- beat_interval = 30
-
- nacos_endpoint = '192.168.1.238:8848'
- nacos_namespace_id = '1bc91fa5-37e3-4269-8e41-3f4e4efb7633'
- nacos_group_name = 'DEFAULT_GROUP'
- nacos_username = 'nacos'
- nacos_password = 'nacos'
- nacos_data_id = 'skl-kmeans'
-
- def load_config(content):
- yaml_config = yaml.full_load(content)
- clusters = yaml_config['clusters']
- settings.clusters = clusters
-
- def nacos_config_callback(args):
- content = args['raw_content']
- load_config(content)
-
- # 注册配置变更监控回调
- nacos.add_conf_watcher(nacos_data_id, nacos_group_name, nacos_config_callback)
-
- # 启动时,强制同步一次配置
- data_stream = nacos.load_conf(nacos_data_id,nacos_group_name)
- load_config(data_stream)
配置文件的data_id为skl-kmeans
内容为:
查看skl-kmeans算法服务在nacos的注册情况
已经注册成功,下面通过postman调用(也可以通过java微服务调用),构造请求参数points,实现points的聚类
{"points":[[1,2],[2,3],[3,5],[4,6],[8,8],[10,11],[12,12],[12,15],[10,12]]}
- {
- "results": [
- 1,
- 1,
- 1,
- 1,
- 2,
- 0,
- 0,
- 0,
- 0
- ]
- }
动态调整nacos中的clusters参数,将3个中心点改为4个中心点,调整完成,kmeans算法已经自动同步了配置变化。通过postman再次调用接口,返回结果已经聚类到4个点
从而说明kmeans算法能够实时监控nacos中配置的变化,实现动态聚类
代码地址:完整code
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。