当前位置:   article > 正文

easy_sanic更便捷实用sanic,支持orm、restful_sanic orm

sanic orm

我们知道python的异步web框架sanic 的高性能,并发特性甚至接近golang, 底层使用uvloop。https://github.com/huge-success/sanic

 

虽然sanic写简单的web应用特别容易,但python3 异步语法和 async 需要配套相关的数据库异步库使用起来不方便,同时满足使用django等的习惯,由于目前没看见sanic有orm相关处理,以及restful 也不符合使用习惯。因此决定自定义一个:https://github.com/laoyin/easy_sanic

 

a、支持简单orm

b、异步pg数据库支持

c、支持restful

d、支持redis等操作

 

TODO:目前easy_sanic 已经支持上述,接下来将进行swagger支持。

 

如何安装:

pip install easy_sanic

 

如何使用:

github上已经进行了easy sanic使用说明,再次进行描述,希望对sanic感兴趣的人有帮助。

easy sanic 框架,集成了sanic,同时自定义async orm, (目前支持postgres) easyrestful。简单好用,你可以完全不用掌握python3 aysncio相关知识 也能写出高性能服务。

easy sanic 目标是 快速打造微服务。

easy sanic framework.

创建项目入口:app.py

  1. import asyncio
  2. import opentracing
  3. from sanic import Sanic
  4. from aioredis import create_redis_pool
  5. from easy_sanic.utils.ascci_helper import init_text
  6. from easy_sanic.utils.aio_redis import SanicRedis
  7. from easy_sanic.db.db import ConnectionPool
  8. from url import app_url # 此处url 为之定义url文件,需要自己添加,文档有介绍如何引用
  9. app = Sanic(__name__)
  10. app.config.update({
  11. 'DB_CONFIG':{
  12. 'user':'postgres',
  13. 'password':'password',
  14. 'database':'',
  15. 'host':'',
  16. 'port':''
  17. }
  18. })
  19. redis_conf = {
  20. 'REDIS':{
  21. 'address': ("REDIS_HOST", "REDIS_PORT"),
  22. 'db': 1,
  23. }
  24. }
  25. redis_conf['REDIS']['password'] = "REDIS_PASSWORD"
  26. app.config.update(redis_conf)
  27. @app.listener('before_server_start')
  28. async def before_server_start(app, loop):
  29. app_url(app)# 引用url
  30. queue = asyncio.Queue()
  31. app.queue = queue
  32. app.db = await ConnectionPool(loop=loop).init(app.config['DB_CONFIG'])
  33. _c = dict(loop=loop)
  34. config = app.config.get('REDIS')
  35. for key in ['address', 'db', 'password', 'ssl', 'encoding', 'minsize',
  36. 'maxsize', 'create_connection_timeout']:
  37. if key in config:
  38. _c.update({key: config.get(key)})
  39. _redis = await create_redis_pool(**_c)
  40. app.redis = _redis
  41. app.conn = _redis
  42. @app.listener('before_server_stop')
  43. async def before_server_stop(app, loop):
  44. app.redis.close()
  45. await app.redis.wait_closed()
  46. await app.service.deregister()
  47. app.queue.join()
  48. if __name__ == '__main__':
  49. print(init_text)
  50. app.run(host='0.0.0.0', port=7001)
 

如何定义url:

url:

  1. from yourview.py import YourClass
  2. def app_url(app):
  3. app.router.add(uri='/hello', methods=['GET'], handler=YourClass().as_views)
  1. #yourviews.py
  1. from sanic.response import json
  2. from easy_sanic.restful.operation_handler import ResourceBase, operation
  3. class RestStatus:
  4. @classmethod
  5. def response_status(cls, ret, message, data=""):
  6. return json({"ret": ret, "message": message, "data":data})
  7. class YourClass(ResourceBase):
  8. async def get(self, request):
  9. return RestStatus.response_status(200, "ok", data=data)
  10. async def post(self, request):
  11. request_data = request.form
  12. return RestStatus.response_status(200, "ok", data=data)
  13. def delete(self, request):
  14. print("i am delete")
  15. return RestStatus.response_status(400, "request method error")
  16. @operation(flag=True)
  17. def custom_url(self, request):
  18. print("i am print hh")
  19. return RestStatus.response_status(400, "request method error")
  20. @operation(flag=False)
  21. def hello(self, request):
  22. print("afwefaewfaw")
  23. return RestStatus.response_status(200, "pengfeng")
现在你可以通过url 进行 get、post、delete 访问了,支持http(get、post、delete、put) 同时可以自定义方法

使用operation, flag=True 为get方法, False 为 post方法,使用如下:

http://127.0.0.1:port/hello?operation=custom_url

如何定义orm models: orm: models.py

  1. from easy_sanic.db.orm import SqlObject, FieldObject, TableName
  2. #User message
  3. class User(metaclass=SqlObject):
  4. id = FieldObject('id', 'varchar(200) primary key')
  5. name = FieldObject('name', 'varchar(200)')
  6. password = FieldObject('password', 'varchar(200)')
  7. table_name = TableName('users')

如何使用model orm

在view 里面

  1. from easy_sanic.restful.operation_headler import ResourceBase, operation
  2. class ProvilegeRole(ResourceBase):
  3. async def get(self, request):
  4. data = await User.filter(request, id='yinxingpan')
  5. new_obj = User(id="yinxingpan", name="haha2", password="123")
  6. result = await new_obj.save(request)
  7. print(data)
  8. return RestStatus.response_status(200, "ok", data=data)
其中 model.filter、model.save 必须传递request方法

目前支持postgres,redis

redis的使用:

  1. with await request.app.conn as conn:
  2. # await conn.get("NOT_RESTRICT_URL") restrict
  3. url_status = await conn.execute('SISMEMBER', "key", "value")
部署: gunicorn app:app --bind 0.0.0.0:7001 --worker-class sanic.worker.GunicornWorker -w 2
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/136928?site
推荐阅读
相关标签
  

闽ICP备14008679号