当前位置:   article > 正文

0.FastApi+WebSocket框架_fastapi websocket headers

fastapi websocket headers

XActivity

from fastapi import FastAPI, WebSocket,WebSocketDisconnect
from starlette.middleware.cors import CORSMiddleware
# Origins handed to the CORS middleware as allow_origins; "*" is the wildcard entry.
# NOTE(review): the middleware is also configured with allow_credentials=True —
# confirm that a wildcard origin together with credentials is really intended.
origins = ["*", ]
class XActivity:
    """Owner of the FastAPI application object.

    Constructing an instance creates ``self.app``, installs the CORS
    middleware and then calls ``init_base``, ``init_utils`` and
    ``init_fragment`` in that order. All three hooks are no-ops here;
    presumably subclasses override them to wire up the real application.
    """

    def __init__(self):
        self.app = FastAPI()

        # NOTE(review): every origin, method and header is allowed while
        # credentials are enabled — confirm this is acceptable outside of
        # local development.
        cors_options = {
            "allow_origins": origins,
            "allow_credentials": True,
            "allow_methods": ["*"],
            "allow_headers": ["*"],
        }
        self.app.add_middleware(CORSMiddleware, **cors_options)

        # The hook order is part of the contract: base, utils, fragment.
        for hook_name in ("init_base", "init_utils", "init_fragment"):
            getattr(self, hook_name)()

    def init_base(self):
        """First start-up hook; does nothing by default."""
        return None

    def init_utils(self):
        """Second start-up hook; does nothing by default."""
        return None

    def init_fragment(self):
        """Third start-up hook; does nothing by default."""
        return None
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

XFragment

class XFragment:
    """Base class for a named group of routes attached to an application.

    The constructor stores ``name`` and immediately calls ``run(app)``, so a
    subclass registers its routes as part of being constructed.
    """

    def __init__(self, app, name):
        """Remember the fragment name and register the fragment on ``app``.

        Args:
            app: application object forwarded unchanged to ``run``.
            name: prefix used by ``get_name`` when building route paths.
        """
        self.name = name
        # run() executes before any attribute a subclass assigns after this
        # call, so it must not depend on such attributes at registration time.
        self.run(app)

    def get_name(self, tag):
        """Return the route path ``<name>/<tag>`` with a leading slash.

        Args:
            tag: last path segment of the route.

        Returns:
            The joined path; a ``/`` is prepended unless the result already
            starts with one.
        """
        path = f"{self.name}/{tag}"
        return path if path.startswith("/") else "/" + path

    def run(self, app):
        """Register this fragment's routes on ``app``; subclasses must override.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        # NotImplementedError is still an Exception, so existing handlers that
        # catch Exception keep working while the intent becomes explicit.
        raise NotImplementedError(
            f"{type(self).__name__} must implement run(app)"
        )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
'
运行

XMaster

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from redis import Redis
import asyncio, time, gc, sys, gc
from .XFragment import XFragment
from pydantic import BaseModel
from dataclasses import dataclass
from ..XUtils.XUtils import XUta


# Request body of the login endpoint. Documented with comments rather than a
# class docstring so the generated schema of the model stays unchanged.
class LoginItem(BaseModel):
    user: str  # forwarded as the first argument of XMaster.login
    password: str  # forwarded as the second argument of XMaster.login


class R:
    """Redis handle bound to a single logical database.

    Attributes:
        r: client created with ``decode_responses=True``.
        p: pipeline obtained from that client.

    Note that creating an instance flushes the selected database.
    """

    def __init__(self, db):
        """Connect to database number ``db`` and empty it."""
        client = Redis(db=db, decode_responses=True)
        self.r = client
        self.p = client.pipeline()
        # Every key currently stored in this database is dropped here.
        client.flushdb()


class R0(R):
    """Redis handle fixed to database 5."""

    def __init__(self):
        super().__init__(5)


class R1(R):
    """Redis handle fixed to database 4, holding short-lived hash records."""

    def __init__(self):
        R.__init__(self, 4)

    def iscrea(self, aka, dt, cand):
        """Store a ``MasterItem`` hash under key ``aka`` for 60 seconds.

        Args:
            aka: Redis key of the record; also stored as a field.
            dt: value stored in the ``dt`` field.
            cand: value stored in the ``cand`` field.

        Returns:
            The ``show()`` mapping of ``SlaveItem(aka, dt)``, i.e. the stored
            record without its ``cand`` field.
        """
        data = MasterItem(aka, dt, cand).show()
        # Write and expiry are queued on the shared pipeline and sent together
        # by execute().
        self.p.multi()
        # hset(..., mapping=...) replaces hmset, which redis-py has deprecated
        # (requires redis-py >= 3.5).
        self.p.hset(aka, mapping=data)
        self.p.expire(aka, 60)
        self.p.execute()
        return SlaveItem(aka, dt).show()

    def iscand(self, aka):
        """Return the ``cand`` field of the hash stored under ``aka``."""
        return self.r.hget(aka, "cand")


class Mind:
    """Mixin that exposes an object's attributes as a mapping."""

    def show(self):
        """Return the instance's own attribute dictionary (not a copy)."""
        return vars(self)


@dataclass
class SlaveItem(Mind):
    """Record with the two fields ``aka`` and ``dt``."""

    # Used as the Redis key of the record in R1.iscrea.
    aka: str
    # Second field of the record; supplied by the caller of R1.iscrea.
    dt: str


@dataclass
class MasterItem(SlaveItem):
    """``SlaveItem`` extended with a ``cand`` field."""

    # Field read back by R1.iscand.
    cand: str


@dataclass
class AntItem(Mind):
    """Record written by ``XMaster.crea`` under the key ``cand``."""

    # Key of the record in Redis; also stored as a field.
    cand: str
    # Filled from get_dt() when the record is created.
    dt: str
    # Filled from get_uid() when the record is created; compared in keeplive.
    por: str


class XMaster(XFragment, XUta):
    """Route fragment named ``main`` providing login and websocket wiring.

    Two Redis handles back the fragment: ``r0`` (records keyed by ``cand``)
    and ``r1`` (records keyed by ``aka``). ``login`` and ``_websocket`` are
    abstract and must be supplied by a subclass.
    """

    def __init__(self, app: FastAPI):
        """Create the Redis handles and register the routes on ``app``."""
        # XFragment.__init__ calls self.run(app) straight away, so the Redis
        # handles are created first and are available to whatever run() does.
        self.r0 = R0()
        self.r1 = R1()
        XFragment.__init__(self, app, "main")

    def run(self, app):
        """Register the websocket route and then the login route."""
        self._websocket(app)
        self._login(app)

    def _login(self, app):
        """Register ``POST /main/login``, which delegates to ``self.login``."""
        @app.post(self.get_name("login"))
        async def login(item: LoginItem):
            return self.login(item.user, item.password)

    def add(self, func, id, seconds=1):
        """Schedule ``func`` as an interval job on ``self.s0``.

        Args:
            func: callable to schedule.
            id: job identifier passed through to ``add_job``.
            seconds: interval between runs.
        """
        # NOTE(review): self.s0 is not assigned anywhere in this class —
        # presumably a scheduler provided by a subclass; confirm.
        self.s0.add_job(func, 'interval', seconds=seconds, id=id)

    def login(self, user, password):
        """Handle a login request; subclasses must override.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement login(user, password)"
        )

    def _websocket(self, app: FastAPI):
        """Register the websocket route; subclasses must override.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement _websocket(app)"
        )

    def keeplive(self, cand, por):
        """Return True when the ``por`` stored for ``cand`` in r0 equals ``por``."""
        return self.r0.r.hget(cand, "por") == por

    def crea(self, cand, aka):
        """Create a fresh 60-second record for ``cand`` and return its ``por``.

        An existing record for ``cand`` is passed to ``self.kill`` first, and
        the ``aka`` key is removed from r1.

        Args:
            cand: key of the record in r0.
            aka: key deleted from r1.

        Returns:
            The newly generated ``por`` value stored in the record.
        """
        if self.r0.r.exists(cand) == 1:
            # NOTE(review): kill() is not defined in this class — presumably
            # supplied by a subclass; confirm.
            self.kill(cand)
        self.r1.r.delete(aka)
        por = self.get_uid()
        data = AntItem(cand, self.get_dt(), por).show()
        self.r0.p.multi()
        # hset(..., mapping=...) replaces hmset, which redis-py has deprecated
        # (requires redis-py >= 3.5).
        self.r0.p.hset(cand, mapping=data)
        self.r0.p.expire(cand, 60)
        self.r0.p.execute()
        return por
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103

XSlave

from fastapi import WebSocket,WebSocketDisconnect
import asyncio,os,sys
from ..XUtils.XUtils import XUta
class XSlave(XUta):
    """Per-connection state for one websocket client.

    Attributes:
        websocket: the connection this object wraps.
        cand, aka, por: identifiers supplied by the creator of the object.
        keep: flag initialised to True.
    """

    def __init__(self, websocket: WebSocket, aka, cand, por):
        XUta.__init__(self)
        self.websocket = websocket
        self.cand = cand
        self.aka = aka
        self.por = por
        self.keep = True

    def startlight(self):
        """Hook that does nothing by default."""
        pass

    def shark(self, msg):
        """Print ``msg``.

        The class previously also carried an earlier ``shark`` that raised;
        it was shadowed by this definition and never ran, so it was removed.
        """
        print(msg)

    def keeplive(self, cand, por):
        """Liveness check; subclasses must override.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement keeplive(cand, por)"
        )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

XUtils

import time, datetime, os, shutil
from uuid import uuid4
class XUta:
    """Small helpers for timestamps and unique identifiers."""

    def get_dt(self, t=True):
        """Return the current local time truncated to whole seconds.

        Args:
            t: when truthy the value is returned as a string, otherwise as a
                ``datetime.datetime`` object.
        """
        now = datetime.datetime.now().replace(microsecond=0)
        return str(now) if t else now

    def get_uid(self):
        """Return a random UUID4 as a hex string without dashes."""
        fresh = uuid4()
        return fresh.hex

    def make_dt(self, date):
        """Parse a ``YYYY-MM-DD HH:MM:SS`` string into a ``datetime``."""
        layout = "%Y-%m-%d %H:%M:%S"
        return datetime.datetime.strptime(date, layout)


class XUtb:
    """Filesystem helpers: directory creation, file replacement, path checks."""

    def new_bag(self, path):
        """Create directory ``path`` (including parents) if nothing exists there.

        Returns:
            ``path`` unchanged.
        """
        if not os.path.exists(path):
            # exist_ok tolerates the directory appearing between the check
            # above and this call.
            os.makedirs(path, exist_ok=True)
        return path

    def new_file(self, old_path, new_path, cur_path):
        """Delete ``old_path`` if it exists, then copy ``cur_path`` to ``new_path``."""
        if os.path.exists(old_path):
            os.remove(old_path)
        shutil.copy(cur_path, new_path)

    def __ispath(self, path, na):
        """Return ``path`` when it exists.

        Args:
            path: filesystem path to check.
            na: when truthy a missing path is an error; otherwise ``None`` is
                returned for a missing path.

        Raises:
            FileNotFoundError: ``path`` is missing and ``na`` is truthy.
        """
        if os.path.exists(path):
            return path
        if na:
            # Still an Exception subclass, so handlers catching Exception
            # keep working, but the failure now names the missing path.
            raise FileNotFoundError(f"path does not exist: {path}")
        return None

    def hard_path(self, path):
        """Return ``path`` if it exists, otherwise raise ``FileNotFoundError``."""
        # The checked path is handed back so this mirrors simp_path; the
        # result used to be dropped, making the method always return None.
        return self.__ispath(path, True)

    def simp_path(self, path):
        """Return ``path`` if it exists, otherwise ``None``."""
        return self.__ispath(path, False)


class XUtils(XUta, XUtb):
    """Convenience class combining the ``XUta`` and ``XUtb`` helpers."""

    def __init__(self):
        # Initialise each helper base explicitly, XUta first and then XUtb.
        for helper_base in (XUta, XUtb):
            helper_base.__init__(self)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
'
运行
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/869457
推荐阅读
相关标签
  

闽ICP备14008679号