
Getting Started with the Distributed Execution Engine Ray (5): Ray Serve

A minimal Ray Serve application: define a deployment, deploy it locally, and query it over HTTP.

import requests
from starlette.requests import Request
from typing import Dict

from ray import serve


# 1: Define a Ray Serve application.
@serve.deployment
class MyModelDeployment:
    def __init__(self, msg: str):
        # Initialize model state: could be very large neural net weights.
        self._msg = msg

    def __call__(self, request: Request) -> Dict:
        return {"result": self._msg}


app = MyModelDeployment.bind(msg="Hello world!")

# 2: Deploy the application locally.
serve.run(app, route_prefix="/")

# 3: Query the application and print the result.
print(requests.get("http://localhost:8000/").json())
# {'result': 'Hello world!'}
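The handle returned by serve.run can also be called directly from Python, without going through HTTP. A minimal sketch, assuming a recent Ray version (2.7+) where serve.run returns a DeploymentHandle:

# Sketch (assumption: Ray >= 2.7, where serve.run returns a DeploymentHandle).
handle = serve.run(app, route_prefix="/")
# __call__ ignores its request argument here, so passing None is fine.
response = handle.remote(None)  # non-blocking; returns a DeploymentResponse
print(response.result())        # blocks and prints {'result': 'Hello world!'}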

Server script:

# File name: serve_quickstart.py
from starlette.requests import Request

import ray
from ray import serve

from transformers import pipeline


@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.2, "num_gpus": 0})
class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation

    async def __call__(self, http_request: Request) -> str:
        english_text: str = await http_request.json()
        return self.translate(english_text)


translator_app = Translator.bind()
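Here num_replicas=2 starts two copies of the Translator actor to serve requests in parallel, and ray_actor_options reserves 0.2 CPU and no GPU per replica. These settings can also be overridden at bind time through the deployment's .options() method; a minimal sketch (scaled_app is a hypothetical name, not from the original script):

# Sketch: override the decorator's settings without editing the class.
scaled_app = Translator.options(
    num_replicas=4,                       # four parallel replicas
    ray_actor_options={"num_cpus": 0.5},  # half a CPU reserved per replica
).bind()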

Make sure the server script is running:

serve run serve_quickstart:translator_app

By default, the service runs at http://127.0.0.1:8000/.

Client script:

# File name: model_client.py
import requests

english_text = "Hello world!"
response = requests.post("http://127.0.0.1:8000/", json=english_text)
french_text = response.text

print(french_text)

Test:

python model_client.py

Composing deployments

Ray Serve deployments can be composed: one deployment takes a handle to another in its constructor and calls it while handling a request. Below, a Summarizer first summarizes the input, then forwards the summary to the Translator:

# File name: serve_quickstart_composed.py
from starlette.requests import Request

import ray
from ray import serve
from ray.serve.handle import DeploymentHandle

from transformers import pipeline


@serve.deployment
class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation


@serve.deployment
class Summarizer:
    def __init__(self, translator: DeploymentHandle):
        self.translator = translator

        # Load model.
        self.model = pipeline("summarization", model="t5-small")

    def summarize(self, text: str) -> str:
        # Run inference
        model_output = self.model(text, min_length=5, max_length=15)

        # Post-process output to return only the summary text
        summary = model_output[0]["summary_text"]

        return summary

    async def __call__(self, http_request: Request) -> str:
        english_text: str = await http_request.json()
        summary = self.summarize(english_text)
        translation = await self.translator.translate.remote(summary)
        return translation


app = Summarizer.bind(Translator.bind())
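The composition hinges on the DeploymentHandle that Summarizer receives in its constructor: a method call on the handle returns immediately, and the result can be awaited inside an async method. The pattern used in Summarizer.__call__ above, sketched with the same names:

# Sketch of the handle-call pattern from Summarizer.__call__ above.
response = self.translator.translate.remote(summary)  # returns immediately
translation = await response  # resolves to translate()'s return string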

Start the composed application:

serve run serve_quickstart_composed:app

Client script:

# File name: composed_client.py
import requests

english_text = (
    "It was the best of times, it was the worst of times, it was the age "
    "of wisdom, it was the age of foolishness, it was the epoch of belief"
)

response = requests.post("http://127.0.0.1:8000/", json=english_text)
french_text = response.text

print(french_text)

Test:

python composed_client.py

Result:

c'était le meilleur des temps, c'était le pire des temps .