赞
踩
XActivity
from fastapi import FastAPI, WebSocket,WebSocketDisconnect from starlette.middleware.cors import CORSMiddleware origins = ["*", ] class XActivity: def __init__(self): self.app = FastAPI() self.app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) self.init_base() self.init_utils() self.init_fragment() def init_fragment(self): pass def init_base(self): pass def init_utils(self): pass
XFragment
class XFragment:
def __init__(self,app,name):
self.name = name
self.run(app)
def get_name(self, tag):
mind = "{}/{}".format(self.name, tag)
if mind[0] != "/":
mind = "/" + mind
return mind
def run(self, app):
raise Exception
XMaster
from fastapi import FastAPI, WebSocket, WebSocketDisconnect from redis import Redis import asyncio, time, gc, sys, gc from .XFragment import XFragment from pydantic import BaseModel from dataclasses import dataclass from ..XUtils.XUtils import XUta class LoginItem(BaseModel): user: str password: str class R: def __init__(self, db): self.r = Redis(db=db, decode_responses=True) self.p = self.r.pipeline() self.r.flushdb() class R0(R): def __init__(self): R.__init__(self, 5) class R1(R): def __init__(self): R.__init__(self, 4) def iscrea(self, aka, dt, cand): data = MasterItem(aka, dt, cand).show() self.p.multi() self.p.hmset(aka, data) self.p.expire(aka, 60) self.p.execute() return SlaveItem(aka, dt).show() def iscand(self, aka): return self.r.hget(aka, "cand") class Mind: def show(self): return self.__dict__ @dataclass() class SlaveItem(Mind): aka: str dt: str @dataclass() class MasterItem(SlaveItem): cand: str @dataclass() class AntItem(Mind): cand: str dt: str por: str class XMaster(XFragment, XUta): def __init__(self, app: FastAPI): XFragment.__init__(self, app, "main") self.r0 = R0() self.r1 = R1() def run(self, app): self._websocket(app) self._login(app) def _login(self, app): @app.post(self.get_name("login")) async def login(item: LoginItem): return self.login(item.user, item.password) def add(self, func, id, seconds=1): self.s0.add_job(func, 'interval', seconds=seconds, id=id) def login(self, user, password): raise Exception def _websocket(self, app: FastAPI): raise Exception def keeplive(self, cand, por): return self.r0.r.hget(cand, "por") == por def crea(self, cand, aka): if self.r0.r.exists(cand) == 1: self.kill(cand) self.r1.r.delete(aka) por = self.get_uid() data = AntItem(cand, self.get_dt(), por).show() self.r0.p.multi() self.r0.p.hmset(cand, data) self.r0.p.expire(cand, 60) self.r0.p.execute() return por
XSlave
from fastapi import WebSocket,WebSocketDisconnect import asyncio,os,sys from ..XUtils.XUtils import XUta class XSlave(XUta): def __init__(self,websocket: WebSocket,aka,cand,por): XUta.__init__(self) self.websocket = websocket self.cand = cand self.aka = aka self.por = por self.keep = True def shark(self,msg): raise Exception def startlight(self): pass def shark(self,msg): print(msg) def keeplive(self,cand,por): raise Exception
XUtils
import time, datetime, os, shutil from uuid import uuid4 class XUta: def get_dt(self, t=True): dt = datetime.datetime.now().replace(microsecond=0) if t: dt = str(dt) return dt def get_uid(self): return uuid4().hex def make_dt(self, date): return datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S") class XUtb: def new_bag(self, path): if os.path.exists(path) == False: os.makedirs(path) return path def new_file(self, old_path, new_path, cur_path): if os.path.exists(old_path): os.remove(old_path) shutil.copy(cur_path, new_path) def __ispath(self, path, na): ge = os.path.exists(path) if ge: return path else: if na: raise Exception def hard_path(self, path): self.__ispath(path, True) def simp_path(self, path): return self.__ispath(path, False) class XUtils(XUta, XUtb): def __init__(self): XUta.__init__(self) XUtb.__init__(self)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。