赞
踩
1.前端网站模板 http://www.cssmoban.com/ 2.前端开源项目CDN https://www.bootcdn.cn/ 3.前端W3C https://www.w3school.com.cn/ 4.HTML、CSS 和 JS 框架 https://v3.bootcss.com/ 5.字体图标,和bootcss完美兼容 http://www.fontawesome.com.cn/ 6.按钮弹出框,与bootcss完美兼容 https://lipis.github.io/bootstrap-sweetalert/ 7. web 前端学习网站:https://segmentfault.com/a/1190000033134496 7. 黑马视频库 http://yun.itheima.com/ 8. AI开放平台(代码开源) https://ai.baidu.com/ 9.讯飞开放平台(语音识别做的好) https://www.xfyun.cn/ 10.图灵机器人 http://www.turingapi.com/ 11.软件项目的托管平台 https://github.com 12.码云 https://gitee.com 10程序员临时在线工具箱 https://tool.lu/ 11.程序员客栈 https://www.proginn.com/ 12.阿里七牛云(图片、视频图床) https://www.qiniu.com/ 13.支付宝开发者中心(收钱接口)
(1) 处理一:
netstat -tlnp|grep 8004 kill -9 进程id
(2) 处理二:
lsof -i :5000 # 这个命令针对我用的port 5000
kill -9 进程id
结束占用某个端口的进程
(1)、打开cmd命令窗口,输入命令:netstat -ano | findstr 8080,根据端口号查找对应的PID
(2)、根据PID找进程名称,输入命令:tasklist | findstr 9268,发现是占用8080端口的进程为:javaw.exe。
(3)、根据PID结束对应进程。输入命令taskkill -PID 2188 -F,强制关闭PID为2188的进程。
redis-cli -h 192.168.2.1 -p 26379
auth 密码
安装ssh服务
sudo apt-get install openssh-server
启动ssh服务
sudo /etc/init.d/ssh start
设置开机自启动
sudo systemctl enable ssh
关闭ssh开机自动启动命令
sudo systemctl disable ssh
单次开启ssh
sudo systemctl start ssh
单次关闭ssh
sudo systemctl stop ssh
设置好后重启
reboot
ssh 到指定端口 ssh -p xx user@ip xx 为 端口号 user为用户名 ip为要登陆的ip
sudo apt-get autoremove mongod
sudo apt-get autoclean mongodb #清除残留数据
dpkg -l |grep ^rc|awk '{print $2}' |tr ["\n"] [" "]|sudo xargs dpkg -P
1.查询版本
aptitude show 软件名 或者 dpkg -l软件名
2.查询安装路径
dpkg -L 软件名 或者 whereis 软件名
indicator-sysmonitor &
bin/elasticsearch -d
linux终端实现分屏 终端安装terminator:sudo apt-get install terminator 安装完成后重启终端 终端输入:terminator 垂直分屏:Ctrl + shift + O 水平分屏:Ctrl + shift + E Ctrl+Shift+E 垂直分割窗口 Ctrl+Shift+O 水平分割窗口 F11 全屏 Ctrl+Shift+C 复制 Ctrl+Shift+V 粘贴 Ctrl+Shift+N 或者 Ctrl+Tab 在分割的各窗口之间切换 Ctrl+Shift+X 将分割的某一个窗口放大至全屏使用 Ctrl+Shift+Z 从放大至全屏的某一窗口回到多窗格界面 Ctrl+Shift+o 水平分割终端(分成上下两个窗口) Ctrl+Shift+e 垂直分割终端(分成左右两个窗口) Ctrl+Shift+w 关闭当前终端 Ctrl+Shift+q 关闭所有终端(退出程序) Ctrl+Shift+x 放大(还原)当前终端 Ctrl+Shift+g 清屏(全清) Ctrl+l 清屏(留最新一行) Ctrl+Tab 在不同的工作区间循环 Ctrl+p 在不同的工作区间循环 Alt+上/下/左/右 在不同的工作区间移动 Ctrl+Shift+左/右 在垂直分割的终端中将分割条向左/右移动 Ctrl+Shift+上/下 在垂直分割的终端中将分割条向上/下移动 Ctrl+Shift+s 隐藏/显示滚动条 F11 全屏/退出全屏
curl host:port/
sudo ufw disable ufw 3.开启/禁用 端口 sudo ufw allow|deny [service] 打开或关闭某个端口,例如: sudo ufw allow smtp 允许所有的外部IP访问本机的25/tcp (smtp)端口 sudo ufw allow 22/tcp 允许所有的外部IP访问本机的22/tcp (ssh)端口 sudo ufw allow 53 允许外部访问53端口(tcp/udp) sudo ufw allow from 192.168.1.100 允许此IP访问所有的本机端口 sudo ufw allow proto udp 192.168.0.1 port 53 to 192.168.0.2 port 53 sudo ufw deny smtp 禁止外部访问smtp服务 sudo ufw delete allow smtp 删除上面建立的某条规则 4.查看防火墙状态 sudo ufw status 一般用户,只需如下设置: sudo apt-get install ufw sudo ufw enable sudo ufw default deny 以上三条命令已经足够安全了,如果你需要开放某些服务,再使用sudo ufw allow开启。 开启/关闭防火墙 (默认设置是’disable’) sudo ufw enable|disable
方法一:Edit -> Convert Indents -> To Spaces
方法二:ctrl + shift + A => 在弹出的窗口中输入“To Spaces”就可以将所有的tab转为space
docker stats --format "table{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}" (格式化输出的结果) .Container 根据用户指定的名称显示容器的名称或 ID。 .Name 容器名称。 .ID 容器 ID。 .CPUPerc CPU 使用率。 .MemUsage 内存使用量。 .NetIO 网络 I/O。 .BlockIO 磁盘 I/O。 .MemPerc 内存使用率。 .PIDs PID 号。 ———————————————— 原文链接:https://blog.csdn.net/QMW19910301/article/details/88058769
https://blog.csdn.net/yjk13703623757/article/details/80283729
import grequests
req_list = [ # 请求列表
grequests.get('http://httpbin.org/get?a=1&b=2'),
grequests.post('http://httpbin.org/post', data={'a':1,'b':2}),
grequests.put('http://httpbin.org/post', json={'a': 1, 'b': 2}),
]
res_list = grequests.map(req_list) # 并行发送,等最后一个运行完后返回
print(res_list[0].text) # 打印第一个请求的响应文本
du -h --max-depth=1
pip install --default-timeout=1000 --no-cache-dir -r requirements.txt pip3 install fitz-i https://pypi.doubanio.com/simple/ pip install -i https://pypi.doubanio.com/simple/ fitz 切换镜像源: 豆瓣:http://pypi.douban.com/simple/ 清华:https://pypi.tuna.tsinghua.edu.cn/simple 阿里云:http://mirrors.aliyun.com/pypi/simple/ 山东理工大学:http://pypi.sdutlinux.org/ 中国科技大学 https://pypi.mirrors.ustc.edu.cn/simple/ 华中科技大学:http://pypi.hustunique.com 在或者: pip3 --default-timeout=1688 install 包名称 -f https://download.pytorch.org/whl/cu113/torch_stable.html -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com --default-timeout=1688其实意思就是让其检测延迟的时间变长,以防止因为你的网络问题而直接报错
ps -ef |grep pycharm |awk '{print $2}'|xargs kill -9
1, 安装ANT
sudo apt install ant
(1), http://mirrors.hust.edu.cn/apache/lucene/pylucene/pylucene-7.7.1-src.tar.gz
下面版本错误
whereis+文件名
find / -name +文件名
locate+文件名
which+可执行文件名
redis-cli -h 127.0.0.1 -p 6379
config set requirepass 123456
"""
Args:
origin_authorization: 回调时请求Header中的Authorization字段
url: 回调请求的url
body: 回调请求的body
content_type: 回调请求body的Content-Type
Returns:
返回true表示验证成功,返回false表示验证失败
"""
curl -XPOST http:///api/v1/appid
unzip -O GBK/GB18030CP936 xx.zip
conda create -n py_flask_small python=3.7
conda create -n 环境名称 python=python版本号
(1), 安装
pip install virtualenv
(2), 查找python3解释器
# find / -name python3
(3), 环境创建
# virtualenv --no-site-packages -p /usr/local/bin/python3 环境名
(4),激活
source 环境名/bin/activate
(5), 退出环境
deactivate
安装字体
sudo apt-get install fonts-droid-fallback ttf-wqy-zenhei ttf-wqy-microhei fonts-arphic-ukai fonts-arphic-uming
参考链接:
https://blog.csdn.net/aimill/article/details/82152173
(1),安装alembic
pip install alembic
(2), alembic 具体使用
todo
pytest -s -n auto --cov=./ --cov-report=html --cov-config .coveragerc
-s 打印输出
-n auto 可以自动检测到系统的CPU核数;从测试结果来看,检测到的是逻辑处理器的数量,即假12核
使用auto等于利用了所有CPU来跑用例,此时CPU占用率会特别高
-n 电脑核心数
--cov=./ --cov-report=html --cov-config .coveragerc cov 配置
模块级别:setup_module、teardown_module
函数级别:setup_function、teardown_function,不在类中的方法
类级别:setup_class、teardown_class
方法级别:setup_method、teardown_method
方法细化级别:setup、teardown
import time import datetime # 把datetime转成字符串 def datetime_toString(dt, format="%Y-%m-%d %H:%M:%S"): return dt.strftime(format) # 把字符串转成datetime def string_toDatetime(string, format="%Y-%m-%d %H:%M:%S"): return datetime.datetime.strptime(string, format) # 把重新格式化时间字符串 def string_time_format(string, srcformat="%Y%m%d", target_format="%Y-%m-%d %H:%M:%S"): dt = string_toDatetime(string, srcformat) target_string = datetime_toString(dt, target_format) return target_string # 把字符串转成时间戳形式 def string_toTimestamp(strTime): return time.mktime(string_toDatetime(strTime).timetuple()) # 把时间戳转成字符串形式 def timestamp_toString(stamp): return time.strftime("%Y-%m-%d-%H", time.localtime(stamp)) TODAY = datetime.datetime.today() YESTERDAY = TODAY - datetime.timedelta(days=1) TOMORROW = TODAY + datetime.timedelta(days=1)
'//ul[@class="dropdown-menu"]/li[@role="option"]'
driver.find_elements_by_xpath(‘//ul[@class=“dropdown-menu”]/li[@role=“option”]’)
$x(‘//li[contains(text(),“我的待办”)]’)
url_list = html.xpath(“//ul[@class=‘imgList’]/li/a/@href”)
3、根据XPath来选择元素 :
*xpath=//table[@id=‘table1’]//tr[4]/td[2]
xpath=//a[contains(@href,‘#id1’)]
xpath=//a[contains(@href,‘#id1’)]/@class
xpath=(//table[@class=‘stylee’])//th[text()=‘theHeaderText’]/…/td
xpath=//input[@name=‘name2’ and @value=‘yes’]
xpath=//*[text()=“right”]
https://blog.csdn.net/crystal_ooo/article/details/9312769
nohup jupyter notebook --allow-root > jupyter.log 2>&1 &
参考: https://blog.csdn.net/u012206617/article/details/92797037?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-0&spm=1001.2101.3001.4242
···
import glob, os
directory = r"D:\Desktop\44"
res_paths = os.path.join(directory, ‘/.pdf’)
glob.glob(res_paths)
···
https://blog.csdn.net/zhangjunli/article/details/104092847
https://docs.python.org/zh-cn/3/py-modindex.html
rm -rf /usr/local/python
删除 默认 python 链接
ln -s /usr/local/python3/bin/python3 /usr/bin/python
建立 python3 软连接
Tab 自动路径补全 Ctrl+T 建立新页签 Ctrl+W 关闭页签 Ctrl+Tab 切换页签 Alt+F4 关闭所有页签 Alt+Shift+1 开启cmd.exe Alt+Shift+2 开启powershell.exe Alt+Shift+3 开启powershell.exe (系统管理员权限) Ctrl+1 快速切换到第1个页签 Ctrl+n 快速切换到第n个页签( n值无上限) Alt + enter 切换到全屏状态 Ctr+r 历史命令搜索 Tab 自动路径补全 Ctrl+T 建立新页签 Ctrl+W 关闭页签 Ctrl+Tab 切换页签 Alt+F4 关闭所有页签 Alt+Shift+1 开启cmd.exe Alt+Shift+2 开启powershell.exe Alt+Shift+3 开启powershell.exe (系统管理员权限) Ctrl+1 快速切换到第1个页签 Ctrl+n 快速切换到第n个页签( n值无上限) Alt + enter 切换到全屏状态 Ctr+r 历史命令搜索 Win+Alt+P 开启工具选项视窗 安装及中文乱码解决参考: https://www.jianshu.com/p/ef116c6953f6
https://github.com/miaomiaosoft/PandaOCR
def deal_xlsx_to_xls():
import openpyxl as xl
suffix = os.path.split(self.export_path)[-1].split('.')[-1]
if suffix == "xlsx":
wb = xl.load_workbook(self.export_path)
export_path = os.path.join(
os.path.split(self.export_path)[0], f"{os.path.split(self.export_path)[-1].split('.')[0]}.xls"
)
wb.save(export_path)
return export_path
return self.export_path
nginx.exe -s quit #停止服务
nginx.exe -s stop #停止服务
nginx.exe -s reload #重启服务
参考:
from tornado.httpserver import HTTPServer from tornado.ioloop import IOLoop from tornado.wsgi import WSGIContainer from application import app def main(): http_server = HTTPServer(WSGIContainer(app)) http_server.listen(5000) # 开启多线程,但start需要调用os.fork()函数,windows下os没有该函数 # http_server.bind(5000) # http_server.start(0) IOLoop.instance().start() """ 单进程: http_server = HTTPServer(WSGIContainer(app)) http_server.listen(5000) IOLoop.instance().start() +++++++++++++++++++++++++++++ 多进程 http_server = HTTPServer(WSGIContainer(app)) # 开启多线程,但start需要调用os.fork()函数,windows下os没有该函数 # http_server.bind(5000) # http_server.start(0) # 指定fork子进程数量 IOLoop.instance().start() +++++++++++++++++++++++++++++ 高级多进程 sockets = tornado.netutil.bind_sockets(8888) tornado.process.fork_processes(0) server = HTTPServer(app) server.add_sockets(sockets) IOLoop.current().start() """ if __name__ == '__main__': main()
tpis:
https://segmentfault.com/a/1190000018117085
https://www.cnblogs.com/jie-fang/p/13437335.html https://blog.csdn.net/qimowei/article/details/119517167
yum install wget jq psmisc vim net-tools telnet yum-utils device-mapper-persistent-data lvm2 git -y
设置yum源为阿里云
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
解决pip安装库报错 ImportError: cannot import name 'SCHEME_KEYS'
解决"pip Fatal error in launcher: Unable to create process using … "的错误
// 先打开管理员cmd:
python -m ensurepip
// 找到pip位置然后删除文件夹,重新安装pip:
python -m ensurepip
// 更新pip:
python -m pip install --upgrade pip
`
当单元格公式过长时,openpyxl 会进行截断操作,并只将一部分写入单元格中,从而造成数据丢失的情况。这通常发生在公式中引用了大量单元格,并且这些单元格的地址以字符串常量的形式直接嵌入了公式中。
为了避免这种情况的发生,可以使用openpyxl提供的openpyxl.utils.formula模块中的range_boundaries函数,该函数可以将一个范围字符串转换成一个二元组,表示范围的起始行和结束行。使用这个函数可以避免在公式中直接写入引用单元格的地址,而是使用起始行和结束行来替代这些地址,从而减少公式长度。
from openpyxl import Workbook from openpyxl.utils.formula import range_boundaries def write_formula(sheet, formula, cell_range): # 获取单元格范围的起始行和结束行 start_row, end_row, _start_col, _end_col = range_boundaries(cell_range) # 构造新的单元格范围字符串,使用起始行和结束行 new_range = f"A{start_row}:A{end_row}" # 将新的单元格范围字符串替换公式中原来的范围字符串 formula = formula.replace(cell_range, new_range) # 在公式中使用替换后的范围字符串,并写入单元格 sheet.cell(row=start_row, column=1).value = formula wb = Workbook() ws = wb.active # 首先写入数值 for i in range(1, 11): ws.cell(row=i, column=1).value = i # 然后写入公式,引用前10行单元格之和 write_formula(ws, "=SUM({A1}:{A10})", 'A1:A10') wb.save("test.xlsx") 使用range_boundaries函数获得了单元格范围起始行和结束行,然后构造了一个新的单元格范围字符串并替换了原来的范围字符串。在构造公式时,使用新的单元格范围字符串代替了原来的范围字符串,从而减少了公式长度。 这样就可以避免公式过长而导致截断的问题了。
Target database is not up to date
解决方式:
https://blog.csdn.net/weixin_43573931/article/details/130478235
netstat -tlnp|grep 8004
kill -9 进程id
redis-cli -h 192.168.2.1 -p 26379
uth 密码
ssh root@192.168.1.100 # 利用远程机的用户登录
#上传
scp` /home/omd/h.txt root@192.168.25.137:``/home/omd/
scp /home/omd/h.txt root@192.168.25.137:``/home/omd/
scp ./redis-5.0.4.tar.gz root@172.16.5.123 /home/
#下载
scp [-r] jiangzhaowei@211.154.xxx.xxx:/kk/jiangzhaowei/share/webCompileOut.sql ./
ls -lrt
ls -lt
chmod -R shart_server.sh 777
文件或目录的权限又分为3种:只读、只写、可执行。
权限 | 权限数值 | 具体作用 |
---|---|---|
r | 4 | read,读取。当前用户可以读取文件内容,当前用户可以浏览目录。 |
w | 2 | write,写入。当前用户可以新增或修改文件内容,当前用户可以删除、移动目录或目录内文件。 |
x | 1 | execute,执行。当前用户可以执行文件,当前用户可以进入目录。 |
依照上面的表格,权限组合就是对应权限值求和,如下:
7 = 4 + 2 + 1 读写运行权限
5 = 4 + 1 读和运行权限
4 = 4 只读权限
(1) 更改shell
chsh -s /bin/zsh
(2) zsh 的配置
export ZSH="/home/duanweiye/.oh-my-zsh"
ZSH_THEME="robbyrussell"
plugins=(
git
zsh-syntax-highlighting
zsh-autosuggestions
)
source $ZSH/oh-my-zsh.sh
. /usr/share/autojump/autojump.sh
export PATH="/Users/�username/anaconda3/bin:$PATH"
export PATH="/Users/�username/anaconda3/bin:$PATH"
export PATH="/Users/�username/anaconda3/bin:$PATH"
export PATH="/home/duanweiye/anaconda3/bin:$PATH"
source ~/.oh-my-zsh/custom/plugins/zsh-syntax-highlighting/zsh-syntax-highlighting.zsh
https://ywnz.com/linuxyffq/4913.html
~/.ssh/config
添加所有远程主机详细信息,如下所示:
Host webserver
HostName 192.168.2.199
Port 4567(默认 22 可以不写)
User aikaka
alias cls='clear'
alias tlog='tail -f *.log'
10、 获取当前文件夹下面文件数量大小
ls -l | grep "^-" | wc -l
主分支 master 主分支,所有提供给用户使用的正式版本,都在这个主分支上发布
开发分支 dev 开发分支,永远是功能最新最全的分支
功能分支 feature-* 新功能分支,某个功能点正在开发阶段
发布版本 release-* 发布定期要上线的功能
修复分支 bug-* 修复线上代码的 bug
常用指令:
""" 1.创建分支 >: git branch 分支名 2.查看分支 >: git branch 3.切换分支 >: git checkout 分支名 4.创建并切换到分支 >: git checkout -b 分支名 5.删除分支 >: git branch -d 分支名 6.查看远程分支 >: git branch -a 7.合并分支 先到主分支master,再把次分支dev合并到主分支 >: git merge 分支名 8.回滚本地分支 git reset --hard 版本号 9.本地版本强行提交给服务器 git push origin dev -f """
git 分支命名:缩写+release + 版本号
Git冲突:commit your changes or stash them before you can merge.
第一种方法:(简单易懂)
1、git add .(后面有一个点,意思是将你本地所有修改了的文件添加到暂存区)
2、git commit -m""(引号里面是你的介绍,就是你的这次的提交是什么内容,便于你以后查看,这个是将索引的当前内容与描述更改的用户和日志消息一起存储在新的提交中)
3、git pull origin master 这是下拉代码,将远程最新的代码先跟你本地的代码合并一下,如果确定远程没有更新,可以不用这个,最好是每次都执行以下,完成之后打开代码查看有没有冲突,并解决,如果有冲突解决完成以后再次执行1跟2的操作
4、git push origin master 将代码推至远程就可以了
第二种方法: 1、git stash (这是将本地代码回滚值至上一次提交的时候,就是没有你新改的代码) 2、git pull origin master(将远程的拉下来) 3、git stash pop(将第一步回滚的代码释放出来,相等于将你修改的代码与下拉的代码合并) 然后解决冲突,你本地的代码将会是最新的代码 4、git add . 5、git commit -m"" 6、git push origin master 这几步将代码推至了远程 最后再git pull origin master 一下,确保远程的全部拉下来,有的你刚提交完有人又提交了,你再拉一下会避免比的不是最新的问题 git stash 详细介绍: 接下来diff一下此文件看看自动合并的情况,并作出相应的修改。 git stash:备份当前的工作区,从最近一次提交中读取相关内容,让工作区保持和上一次提交的内容一致。同时,将工作区的内容保存到git栈中。 git stash pop:从git栈中读取最近一次保存的内容,恢复工作区的相关内容。由于可能存在多个stash的内容,所以用栈来管理,pop会从最近一个stash中读取内容并恢复到工作区。 git stash list:显示git栈内的所有备份,可以利用这个列表来决定从那个地方恢复。 git stash clear:情况git栈。
git tag -a tagName -m "my tag"
git tag -a v1.2 9fceb02 -m "my tag"
git tag -a dev_v_1.0 -m "初始化项目"
git push origin dev_v_1.0
打tag不必要在head之上,也可在之前的版本上打,这需要你知道某个提交对象的校验和(通过git log获取,取校验和的前几位数字即可)。
同提交代码后,使用git push
来推送到远程服务器一样,tag
也需要进行推送才能到远端服务器。
使用git push origin [tagName]
推送单个分支。
git push origin v1.0
推送本地所有tag,使用git push origin --tags
。
跟分支一样,可以直接切换到某个tag去。这个时候不位于任何分支,处于游离状态,可以考虑基于这个tag创建一个分支。
git checkout 版本号
本地删除
git tag -d v0.1.2
远端删除
git push origin :refs/tags/
git push origin :refs/tags/v0.1.2
查询 tag 下面信息
git show 标签名称 查询标签的具体信息
git checkout dev
git pull
git checkout master
git merge dev
git push -u origin master
git checkout master
git pull
git checkout dev
git merge master
git push -u origin dev
git reset 文件
第一种方法:git 命令
找到commit id
456dcfaa55823476b30b6b2e5cbbb9c00bbcbf56
Git命令 回退到某个版本命令
git reset --hard 456dcfaa55823476b30b6b2e5cbbb9c00bbcbf56
强制提交到dev分支
git push -f -u origin dev
git push origin --delete [branch_name]
删除本地分支
git branch -D <BranchName>
本地回滚之后,
$ git push --force origin
在本地工作区做了无用的修改后,可以用远程remote的代码强制将覆盖本地代码,操作如下
git fetch –all
git reset –hard origin/master
merge request 出現diff 處理
解决方式一: git check master git pull git checkout local #切换到local分支后, 就是修改代码 #修改完了, 就正常提交代码-------git commit #如果有多次local分支的提交,就合并,只有一次可以不合并 git rebase -i HEAD~2 //合并提交 --- 2表示合并两个 #将master内容合并到local git rebase master---->解决冲突--->git rebase --continue #再起切换到master或其他目标分支 git checkout master #将local合并到master git merge local #推送到远程仓库 git push 解决方式二: git fetch 远程库 git rebase upstream/master 解决冲突 git rebase --continue #同步起点 git push -f # 提交
git checkout -f newBranch
修改已经push 到仓库的commit
1, git commit --amend
//修改提交信息操作
2, git push --force-with-lease origin master
修改本地 commit
1, git commit --amend
2, 正常git push
1. 本地分支重命名(还没有推送到远程)
git branch -m oldname newname
2. 远程分支重命名 (已经推送远程-假设本地分支和远程对应分支名称相同)
a. 重命名远程分支对应的本地分支
git branch -m oldName newName
b. 删除远程分支
git push --delete origin old
c. 上传新命名的本地分支
git push oringin newName
d.把修改后的本地分支与远程分支关联
git branch --set-upstream-to origin newName
error: failed to push some refs to 'github.com:17865135532/leetcode_.git'
hint: Updates were rejected because the tip of your current branch is behind
hint: its remote counterpart. Integrate the remote changes (e.g.
hint: 'git pull ...') before pushing again.
hint: See the 'Note about fast-forwards' in 'git push --help' for details.
**原因:**本地库与代码库未同步
解决办法:
git pull --rebase origin main
git push origin main
feat: 新功能(feature)
fix: 修补bug
docs: 文档(documentation)
style: 格式(不影响代码运行的变动)
refactor: 重构(即不是新增功能,也不是修改bug的代码变动)
test: 增加测试
chore: 构建过程或辅助工具的变动
https://www.jianshu.com/p/beee3e89f163
参考: https://www.cnblogs.com/ruanraun/p/supervisor.html
pip install supervisor #因为supervisor目前只支持py2,所以不能用pip3进行安装 emmmmmm
echo_supervisord_conf > supervisor.conf # 导出配置模板
然后supervisor.conf 尾部添加内容如下:
[program:hello_world] ;hello_word 是自己给进程取的名,随意
command=gunicorn -c gunc.py hello:app ; supervisor启动命令
directory=/www/Flask-Learning/hello_world ; 项目的文件夹路径
startsecs=0 ; 启动时间
stopwaitsecs=0 ; 终止等待时间
autostart=false ; 是否自动启动
autorestart=false ; 是否自动重启
stdout_logfile=/www/log/hello.log ; log 日志
stderr_logfile=/www/log/hello.err ; 错误日志
#关闭所有任务
supervisorctl shutdown
# 启动某个进程
supervisorctl start programxxx
# 重启某个进程
supervisorctl restart programxxx
# 停止全部进程 注:start、restart、stop都不会载入最新的配置文件
supervisorctl stop all
# 载入最新的配置文件,停止原有进程并按新的配置启动、管理所有进程。
supervisorctl reload
# 根据最新的配置文件,启动新配置或有改动的进程,配置没有改动的进程不会受影响而重启。
supervisorctl update
注意:显式用stop停止掉的进程,用reload或者update都不会自动重启
运行创建配置文件
echo_supervisord_conf>/etc/supervisord.conf # /etc 目录 需要 root 权限
echo_supervisord_conf > supervisord.conf # 如果没有root权限,选择当前项目目录
参考: https://zhuanlan.zhihu.com/p/271730138
# 先启动supervisor
supervisord -c supervisor.conf
# 在 restart all
错误:
Starting supervisor: Error: Another program is already listening on a port that one of our HTTP servers is configured to use. Shut this program down first before starting supervisord.
For help, use /usr/bin/supervisord -h
解决办法:
ps -ef | grep supervisord
kill -9 pid
supervisord -c /etc/supervisor/supervisord.conf
参考
supervisor安装配置以及常见的错误
https://blog.csdn.net/qq_38234594/article/details/89923583
错误:
unix:///tmp/supervisor.sock no such file
原因:
linux自动清掉 /tmp 文件夹
解决方案:
修改supervisord的配置文件
/tmp/supervisor.sock 改成 /var/run/supervisor.sock,
/tmp/supervisord.log 改成 /var/log/supervisor.log,
/tmp/supervisord.pid 改成 /var/run/supervisor.pid
serverurl=unix:///tmp/supervisor.sock 改成serverurl=unix:///var/run/supervisor.sock,
IOError: [Errno 13] Permission denied: '/var/log/supervisor/supervisord.log'
原因: /var/log/supervisor/supervisord.log没有写权限,赋予权限即可:
sudo chmod -R 777 /var/log/supervisor/supervisord.log
Error: Cannot open an HTTP server: socket.error reported errno.EACCES (13)
配置文件中 /var/run 文件夹,没有授予启动 supervisord 的相应用户的写权限。/var/run 文件夹实际上是链接到 /run,因此我们修改 /run 的权限
sudo chmod 777 /run
一般情况下,我们可以用 root 用户启动 supervisord 进程,然后在其所管理的进程中,再具体指定需要以那个用户启动这些进程。
'INFO spawnerr: unknown error making dispatchers for 'app_name': EACCES'
修改日志文件的权限
sudo chmod 777 /usr/log/supervisor/supervisor.log
sudo chmod 777 /usr/log/supervisor/youAppName.log
Exited too quickly (process log may have details)
有可能是当前文件已经运行
kill 调当前的进程,再试试运行
执行指令
supervisord -c /etc/supervisord.conf
supervisorctl start all
supervisor 日志异常及对应解决方案参考:
https://blog.csdn.net/JineD/article/details/109748436
使用supervisorctl 命令报错unix:///var/run/supervisor.sock no such file
其实是因为你的supervisorctl没有启动。
执行指令:
supervisord -c /etc/supervisord.conf
杀死所有正在运行的容器 docker kill $(docker ps -a -q) 删除所有已经停止的容器 docker rm $(docker ps -a -q) 1 批量删除容器 docker rm docker ps -a -q || docker rm $(docker ps -a -q) 删除所有未打 dangling 标签的镜 docker rmi $(docker images -q -f dangling=true) 删除所有镜像 docker rmi $(docker images -q) 强制删除 无法删除的镜像 docker rmi -f <IMAGE_ID> docker rmi -f $(docker images -q) 2 批量删除镜像 docker rmi docker images -q 3 按条件过滤删除 docker rmi -f docker images | grep '<none>' | awk '{print $3}' #删除名称或标签为none的镜像 4 查看所有容器id docker ps -a -q 5 停止所有容器 docker stop $(docker ps -a -q)
MongoEngine是基于Python的对象系统设计的MongoDB专用的ORM框架。与SQLAlchemy不同的是,MongoEngine会自动生成一个唯一的标识,用ID属性表示。当然MongoEngine与SQLAlchemy还有很对不同的地方,比如字段类型等。
操作符的表示形式为:加在关键字后面使用"+操作符"(此处是两个" _ "),例如:publish_datagt
ne:不等于
lt:小于
lte:小于或等于
gt:大于
gte:大于或等于
not:对一个操作符取否,例如publish_data__not__gt
in:值在列表中
nin:值不在列表中
mod:值%a==b,a和b用(a,b)的方式传递
all:列表中的所有值都在该字段中
size:列表的大小
existes:在该字段中存在这个值
exact:字符串相等
iexact:字符串相等(大小写不敏感)
contains:字符串包含该值
icontains:字符串包含该值(大小写不敏感)
startswith:字符串以该值开始
istartswith:字符串以该值开始(大小写不敏感)
endswith:字符串以该值结束
iendswith:字符串以该值结束(大小写不敏感)
set:设置一个值
unset:删除一个值
inc:将值自增
dec:将值自减
push:把一个值加到列表的末尾
push_all:把几个值加到列表的末尾
pop:移除列表中的第一个或者是最后一个值
pull:移除列表中的值
pull_all:移除列表中的几个值
add_to_set:当且晋档某值不在列表中时,将其添加进列表
(1)查找表的所有 collection name conn.list_collection_names(session=None) (2)按条件查找数据 conn.ocrRecognizesData.find({"date":{'$gt':"2019-10-23 13:41"}}, {"字段1":1, "字段2":1}) (3)按条件删除数据 conn.ocrRecognizesData.remove({"date":{'$gt':"2019-10-23 17:26:38"}}) (4)按条件更新数据 #新增字段 conn.image_log.update({}, {'$set': {"rollInvoice":{}}}) (5)按条件删除数据 #删除字段 unset conn.plane_log.update({},{"$unset":{'detail':1}}) (6)更新字段名称 conn.collectionName.update_many({}, {'$rename': {'oldname': 'newname'}}) (7)对content字段里的title/caseType/judgementType建立索引 db.getCollection('chongqing').createIndex({"content.title":1,"content.judgementType":1, "content.caseType":1}) (8)查询集合索引 db.getCollection('chongqing').getIndexes() (9)查看索引集合大小 db.getCollection('chongqing').totalIndexSize() (10)删除集合所有索引 db.getCollection('chongqing').dropIndexes() (11) 删除集合指定索引 db.getCollection('chongqing').dropIndex('索引名')
(1)data = db.find({"COMP_NAME":{"$in":comp_name_list},"REPORT_PERIOD":{"$in":report_time_list}},dict(zip(dic.keys(),[1]*len(dic.keys()))))
(2) sql_yzh = 'select * from risk_ratmatrix_threshold_formula_plus'
df_yzh = pd.read_sql(sql=sql_yzh,con=mysql_conn)
单行 apply 操作 传参
(1) df_fenshu[col] = df[col].apply(util.get_score_by_get_rule,**{"zhi_list":val_list,"formula_list":formula_panduan,"flag":flag})
from mongoengine import
from datetime import datetime
连接数据库
connect('blog') # 连接本地blog数据库**
如需验证和指定主机名
connect('blog', host='192.168.3.1', username='root', password='1234')
(1) 定义模型 class Categories(Document): ' 继承Document类,为普通文档 ' name = StringField(max_length=30, required=True) artnum = IntField(default=0, required=True) date = DateTimeField(default=datetime.now(), required=True) (2)插入 cate = Categories(name="Linux") # 如果required为True则必须赋予初始值,如果有default,赋予初始值则使用默认值 cate.save() # 保存到数据库 (3)查询和更新 返回集合里的所有文档对象的列表 cate = Categories.objects.all() 返回所有符合查询条件的结果的文档对象列表 cate = Categories.objects(name="Python") 更新查询到的文档: cate.name = "LinuxZen" cate.update()
1, 建立运行脚本:
cd /home/erick/xmind-8-update7-linux/XMind_amd64/
转到指定的解压目录下。
sudo gedit run.sh
建立脚本文件
在文件中输入下面的内容:
cd /home/erick/xmind-8-update7-linux/XMind_amd64/` #安装目录
`/home/erick/xmind-8-update7-linux/XMind_amd64/XMind #启动指令
我们必须转到XMind运行文件所在的目录,才能正确运行,否则会报错!!
在完成文件的编辑后,输入sudo chmod +x ./run.sh
2, 建立.desktop文件*
建立文件之前,自己去百度一个喜欢的图标,作为XMind快捷方式的图标。
输入命名:
cd /usr/share/applications
转到建立运行程序的目录
sudo gedit xmind.desktop
建立图标,xmind是可以自己命名的
在文件中输入:
[Desktop Entry]
Name=XMind
Exec=/home/erick/xmind-8-update7-linux/XMind_amd64/run.sh
Icon=/home/erick/Pictures/xmind.jpg
Type=Application
Categories=GTK;GNOME;Office;
Exec=后面是我们之前建立脚本的目录
Icon=后面是自己定义的图标的目录
Type=Application 说明这是一个应用程序
Categories=GTK;GNOME;Office; Office表示所属的大目录是Office的分类,大家可以根据实际情况具体更改
最后,就可以再Office的目录下看到图标了,也可以加入快速启动栏,vim
示例:
as_views(Templates)
/docx-templates post 上传生成文档
/docx-templates get 条件查询接口
as_views(Template)
/docx-template/<id> post 更新单一文档
/docx-template/<id> get id查询单一文档
/docx-template/<id> delete 删除单一文档
Sanic 使用技巧
https://github.com/howie6879/Sanic-For-Pythoneer/blob/master/docs/part1/6.%E5%B8%B8%E7%94%A8%E7%9A%84%E6%8A%80%E5%B7%A7.md
参考url:https://www.cnblogs.com/haozi0804/p/12534884.html
(1), Celery是Python开发的分布式任务调度模块,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:
异步发送验证邮件
定时每晚统计报表
Celery 在执行任务时需要通过一个消息中间件(RabbitMQ或者Redis)来接收和发送任务消息,以及存储任务结果(RabbitMQ、Redis、MySQL、MongoDB等)
(2),Celery有以下优点:
(3), Celery基本工作流程图
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5Q1ZV0Kr-1608184987606)(/home/duanweiye/图片/2020-05-03 16-25-40 的屏幕截图.png)]
这里采用Redis作为消息中间件,MongoDB作为任务结果存储
$ sudo pip install -U "celery[redis]"
$ sudo pip install celery # celery版本4.1或以上
from celery import Celery # configs是自己定义的一字典,包含数据库的密码等等信息 class CeleryConfig: # 任务消息队列 BROKER_URL = "redis://:{0}@{1}:{2}/{3}".format( configs["redis"]["mydb"]["PASSWORD"], configs["redis"]["mydb"]["HOST"], configs["redis"]["mydb"]["PORT"], configs["redis"]["mydb"]["DB_CELERY"], ) # 序列化方式 CELERY_TASK_SERIALIZER = 'msgpack' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # backend 可用 mq, reddis, mogodb 都可以 # 结果保存, 这里也可以继续保存到redis,但是断电结果会消失 CELERY_RESULT_BACKEND = 'mongodb://{0}:{1}/{2}'.format( configs["mongodb"]["mydb"]["HOST"], configs["mongodb"]["mydb"]["PORT"], configs["mongodb"]["mydb"]["DB"], ) CELERY_MONGODB_BACKEND_SETTINGS = { "taskmeta_collection": "celery" # 表 # "user": , # "password": } # 完成一定数量后重启该worker CELERY_WORKER_MAX_TASKS_PER_CHILD = 2000 CELERY_TIMEZONE = "Asia/Shanghai" # 第二步 ctask = Celery("lawplatform") ctask.config_from_object(CeleryConfig) #第三步 @ctask.task(name="test") def add(x, y): return x + y # 第四步 from flask_celery import add app = Flask(__name__) @app.route("/test") def test(): add.delay(3, 4) return ''
celery worker -l INFO -A celery_tasks
这里是两种不同的定时方式,crontab是基于时间(所以最好要先指定好时区),timedelta是基于时间周期 from celery.schedules import crontab from datetime import timedelta CeleryConfig.CELERYBEAT_SCHEDULE = { # 1 'every-3am': { 'task': 'test', # 这里就是task的name属性 "schedule": crontab(minute=0, hour=3), # 每天的凌晨3点执行 "args": (1, 3) # 如果任务不需要参数,这里可以省略 }, # 2 "test": { "task": 'test', "schedule": timedelta(seconds=5), "args": (1, 3) } } ctask = Celery("lawplatform") ctask.config_from_object(CeleryConfig)
celery worker -l DEBUG -A celery_tasks -B # 多了一个-B的参数
celery的使用方式就是这么简单,其他还有更高级的方法(bind、retry等等)
https://docs.celeryproject.org/en/latest/
Example | Meaning |
---|---|
crontab() | Execute every minute. |
crontab(minute=0, hour=0) | Execute daily at midnight. |
crontab(minute=0, hour='*/3' ) | Execute every three hours: 3am, 6am, 9am, noon, 3pm, 6pm, 9pm. |
crontab(minute=0,hour=’0,3,6,9,12,15,18,21’) | Same as previous. |
crontab(minute='*/15' ) | Execute every 15 minutes. |
crontab(day_of_week=’sunday’) | Execute every minute (!) at Sundays. |
crontab(minute='*' ,hour='*' , day_of_week=’sun’) | Same as previous. |
crontab(minute='*/10' ,hour=’3,17,22’, day_of_week=’thu,fri’) | Execute every ten minutes, but only between 3-4 am, 5-6 pm and 10-11 pm on Thursdays or Fridays. |
crontab(minute=0, hour='*/2,*/3' ) | Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm |
crontab(minute=0, hour='*/5' ) | Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5). |
crontab(minute=0, hour='*/3,8-17' ) | Execute every hour divisible by 3, and every hour during office hours (8am-5pm). |
crontab(day_of_month=’2’) | Execute on the second day of every month. |
crontab(day_of_month=’2-30/3’) | Execute on every even numbered day. |
crontab(day_of_month=’1-7,15-21’) | Execute on the first and third weeks of the month. |
crontab(day_of_month=’11’,month_of_year=’5’) | Execute on 11th of May every year. |
crontab(month_of_year='*/3' ) | Execute on the first month of every quarter. |
往桶更新文件
import boto3 def upload_file(file_name, bucket): """ Function to upload a file to an S3 bucket """ object_name = file_name # s3_client = boto3.client('s3') s3_client = boto3.client( 's3', aws_access_key_id="AKIA5ZE6DXVFYBDBB4NP", #设置accesskey、secretkey、sessiontoken aws_secret_access_key="ZhNueULyW1HFPcFD32Ub/co/7UOIea8S3pYYUWY6", # aws_session_token="sessionToken", region_name="ap-northeast-1", # endpoint_url="theast-1.compute.amazonaws.com # 13.115.91.185" ) response = s3_client.upload_file(file_name, bucket, object_name) return response
删除文件
s3_dest_client.delete_object(
Bucket=DesBucket,
Key=prefix_and_key
)
从桶中下载文件
def download_file(file_name, bucket): """ Function to download a given file from an S3 bucket """ # s3 = boto3.resource('s3') s3 = boto3.resource( 's3', aws_access_key_id="AKIA5ZE6DXVFYBDBB4NP", #设置accesskey、secretkey、sessiontoken aws_secret_access_key="ZhNueULyW1HFPcFD32Ub/co/7UOIea8S3pYYUWY6", # aws_session_token="sessionToken", region_name="ap-northeast-1", # endpoint_url="theast-1.compute.amazonaws.com # 13.115.91.185" ) output = f"downloads/{file_name}" # s3.Bucket(bucket).download_file(file_name, output) # s3.Bucket(bucket).download_file(file_name, output) s3.Object(bucket, file_name).download_file(output) return output
5, 压力测试
说完了如何对Sanic编写的服务进行单元测试,接下来稍微讲下如何进行压力测试,压力测试最好在内外网都进行测试下,当然服务器配置是你定,然后在多个服务器上部署好服务,启动起来,利用负载均衡给压测代码一个固定的ip,这样对于服务的水平扩展测试就会很方便。
压力测试 locust
test 下的目录结构
├── locust_rss
│ ├── __init__.py
│ ├── action.py
│ ├── locust_rss_http.py
│ ├── locustfile.py
│ └── utils.py
├── setting.py
└── test_rss.py
新增了locust_rss
文件夹,首先在action.py
定义好请求地址与请求方式:
HTTP_URL = "http://0.0.0.0:8000/v1/post/rss/"
GRPC_URL = "0.0.0.0:8990"
def json_requests(client, data, url):
func_name = inspect.stack()[1][3]
headers = {'content-type': 'application/json'}
return post_request(client, data=json.dumps(data), url=url, func_name=func_name, headers=headers)
#
def action_rss(client):
data = {
"name": "howie6879"
}
json_requests(client, data, HTTP_URL)
压测怎么个压测法,请求哪些接口,接口请求怎么分配,都在locust_rss_http.py
里定好了:
class RssBehavior(TaskSet):
@task(1)
def interface_rss(self):
action.action_rss(self.client)
然后需要发送请求给目标,还需要判断是否请求成功,这里将其封装成函数,放在utils.py
里,比如post_request
函数:
def post_request(client, data, url, func_name=None, **kw):
"""
发起post请求
"""
func_name = func_name if func_name else inspect.stack()[1][3]
with client.post(url, data=data, name=func_name, catch_response=True, timeout=2, **kw) as response:
result = response.content
res = to_json(result)
if res['status'] == 1:
response.success()
else:
response.failure("%s-> %s" % ('error', result))
return result
locustfile.py
是压测的启动文件,必不可少,我们先请求一次,看看能不能请求成功,如果成功了再将其正式运行起来:
cd Sanic-For-Pythoneer/examples/demo06/sample/tests/locust_rss # 只想跑一次看看有没有问题 记得先将你编写的服务启动起来哦 locust -f locustfile.py --no-web -c 1 -n 1 # Output: 表示没毛病 [2018-01-14 14:54:30,119] 192.168.2.100/INFO/locust.main: Shutting down (exit code 0), bye. Name # reqs # fails Avg Min Max | Median req/s -------------------------------------------------------------------------------------------------------------------------------------------- POST action_rss 1 0(0.00%) 1756 1756 1756 | 1800 0.00 -------------------------------------------------------------------------------------------------------------------------------------------- Total 1 0(0.00%) 0.00 Percentage of the requests completed within given times Name # reqs 50% 66% 75% 80% 90% 95% 98% 99% 100% -------------------------------------------------------------------------------------------------------------------------------------------- POST action_rss 1 1800 1800 1800 1800 1800 1800 1800 1800 1756 --------------------------------------------------------------------------------------------------------------------------------------------
好了,没问题了,可以执行locust -f locustfile.py
,然后访问http://0.0.0.0:8089/
,如下图:
我在项目中主要使用redis作为缓存,使用aiocache很方便就完成了我需要的功能,当然自己利用aioredis编写也不会复杂到哪里去。
比如上面的例子,每次访问http://0.0.0.0:8000/v1/get/rss/howie6879,都要请求一次对应的rss资源,如果做个缓存那岂不是简单很多?
修改:
@cached(ttl=1000, cache=RedisCache, key="rss", serializer=PickleSerializer(), port=6379, namespace="main") async def get_rss(): print("第一次请求休眠3秒...") await asyncio.sleep(3) url = "http://blog.howie6879.cn/atom.xml" feed = parse(url) articles = feed['entries'] data = [] for article in articles: data.append({"title": article["title_detail"]["value"], "link": article["link"]}) return data @api_bp.route("/get/rss/<name>") async def get_rss_json(request, name): if name == 'howie6879': data = await get_rss() return json(data) else: return json({'info': '请访问 http://0.0.0.0:8000/v1/get/rss/howie6879'})
为了体现缓存的速度,首次请求休眠3秒,请求过后,redis中就会将此次json数据缓存进去了,下次去请求就会直接冲redis读取数据。
带上装饰器,什么都解决了。
在web中使用时,可以同一个进程共享所有的redis连接线程,并且自己设置最大连接数,防止不够或者创建太多连接,代码如下:
# coding: utf-8 import redis class RedisPool(object): __pool = None def __new__(cls, cfs, *args, **kargs): if not cls.__pool: cls.__pool = super(RedisPool, cls).__new__(cls, *args, **kargs) cls.__pool.pool = redis.ConnectionPool( host=cfs.get("HOST"), port=int(cfs.get("PORT")), db=cfs.get("DB"), password=cfs.get("PASSWORD"), max_connections=int(cfs.get("MAX_CONNECTIONS")) ) return cls.__pool def __init__(self, cfs): super(RedisPool, self).__init__(cfs) def get_connection(self): return redis.StrictRedis(connection_pool=RedisPool.__pool.pool) @property def conn(self): '''官方推荐使用StrictReids而不推荐使用Redis''' return redis.StrictRedis(connection_pool=RedisPool.__pool.pool) if __name__ = "__main__": redis_config = { "HOST": "192.168.10.54", "PORT": 6002, "DB": 3, "MAX_CONNECTIONS": 20, "PASSWORD": "6752"} redis = RedisPool(redis_config) # 这里conn和redis.conn是等价的,用完之后都会被redispool 自动回收 conn = redis.get_connection() print(redis.conn.dbsize()) print(conn.dbsize())
pip install flask_migrate flask_script
from flask import Flask from flask_script import Manager from flask_migrate import Migrate, MigrateCommand from exts.db import db from config import Config app = Flask(__name__) app.config.from_object(Config) db.init_app(app) # 初始化 migrate # 两个参数一个是 Flask 的 app,一个是数据库 db migrate = Migrate(app, db) # 初始化管理器 manager = Manager(app) # 添加 db 命令,并与 MigrateCommand 绑定 manager.add_command('db', MigrateCommand) # 导入model from models.user import User # 打开注释,重新migrate,查看效果 #from models.post import Post if __name__ == '__main__': manager.run()
1. python migrate.py db init # 初始化版本库 2. python migrate.py db migrate -m '第一个版本' 3. python migrate.py db upgrade # 提交版本 4. 修改models中的属性,再重复2和3 先查看版本号 python xxx.py db history 还原版本 python xxx.py db downgrade(upgrade) 版本号 5. python migrate.py db downgrade # 回退版本 可以看到很方便的控制model的更新 修改model 或者新增model 修改完成后,继续创建新的迁移脚本 python 文件 db migrate -m"新版本名(注释)" 更新数据库 python3 xxx.py db upgrade 更新完之后,其实就是提交操作,类似于 git 添加一个新的版本。
from flask_restful.reqparse import RequestParser 1创建对象:reqser = ReqserParser() 2添加参数声明: reqser.add_argument(‘name’) required 是否必须存在 True 必须 False 默认 help 错误时返回错误信息 type 参数类型 系统类型: int float , file, 使用: inputs type=inputs.int_range(1,100) 参数位置 location 指定从哪里获取参数 locaton=‘args’/'forme 2.1 choices: 这个参数用来检查输入参数的范围 parser.add_argument(’-u’,type=int,choices=[1,3,5]) 2.2 parser.parse_args(’-u 3’.split()) Namespace(u=3) 如果你要接受一个键有多个值的话,你可以传入 action='append' parser.add_argument('name', type=str, action='append')
单位置: location 指定从哪里获取参数
# Look only in the POST body
parser.add_argument('name', type=int, location='form')
# Look only in the querystring
parser.add_argument('PageSize', type=int, location='args')
# From the request headers
parser.add_argument('User-Agent', type=str, location='headers')
# From http cookies
parser.add_argument('session_id', type=str, location='cookies')
# From file uploads
parser.add_argument('picture', type=werkzeug.datastructures.FileStorage, location='files')
通过传入一个列表到 location
中可以指定 多个 参数位置:
parser.add_argument('text', location=['headers', 'values'])
flask_restx 接口
(1), 定义接口描述
from flask_restx import Resource, Namespace, fields
docx_template_api = Namespace('docx-templates', description='模板相关API')
(2), 定义 model 返回字段
model:定义标准化请求内容,或者是响应内容
请求内容与@docx_template_api.doc(body=crud_resp)联用
响应内容与@docx_template_api.marshal_with(crud_resp) 联用
DocxTemplateBody model 名称
base_resp 抽取基础字段
crud_resp = docx_template_api.clone("DocxTemplateBody", marshalling_models.base_resp,
{
"data": fields.Nested(marshalling_models.template_model, skip_none=True),
}
(3)详解示例
@acs.route('/accounts') # 是路由地址
@acs.expect(parser) # 这里就是添加请求头参数了, 装饰在class上面就是所有的方法都需要这个参数
@acs.response(400, 'params error') # 响应内容,装饰在class上面就是所有的方法都回有这个相应
class CreateAccounts(Resource):
@acs.doc('A description of what this function does') # post 接口描述
@acs.doc(body=acc_obj) # 把acc_obj这个model放入请求的body中,它是可以做validate的,只要在model后面加上validate=True。
@acs.marshal_with(token) 把token这个model放入相应体中,也是可以做validate的,如果响应体写了,那么最后return的时候必须以键值对的形式return该model的参数名
def post(self):
...
return {'accessToken': b}
变量名 | 上下文 | 说明 |
---|---|---|
current_app | 程序上下文 | 当前激活程序的程序实例 |
g | 程序上下文 | 处理请求时用作临时存储的对象,每次请求都会重设这个变量 |
Session | 请求上下文 | 请求对象,封装了客户端发出的HTTP请求中的内容 |
request | 请求上下文 | 用户会话,用于存储请求之间需要记住的值的词典 |
Flask 在分发请求之前激活(或推送)程序和请求上下文,请求处理完成后再将其删除。程 序上下文被推送后,就可以在线程中使用 current_app 和 g 变量。类似地,请求上下文被 推送后,就可以使用 request 和 session 变量。如果使用这些变量时我们没有激活程序上 下文或请求上下文,就会导致错误。
作者:小小看护
链接:https://www.jianshu.com/p/dfe1ee1dc1ec
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='s8day127db')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# cursor.execute("select id,name from users where name=%s and pwd=%s",['lqz','123',])
cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s",{'user':'lqz','pwd':'123'})
obj = cursor.fetchone()
conn.commit()
cursor.close()
conn.close()
print(obj)
from datetime import timedelta from redis import Redis import pymysql from DBUtils.PooledDB import PooledDB, SharedDBConnection class Config(object): DEBUG = True SECRET_KEY = "umsuldfsdflskjdf" PERMANENT_SESSION_LIFETIME = timedelta(minutes=20) SESSION_REFRESH_EACH_REQUEST= True SESSION_TYPE = "redis" PYMYSQL_POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123456', database='s8day127db', charset='utf8' ) class ProductionConfig(Config): SESSION_REDIS = Redis(host='192.168.0.94', port='6379') class DevelopmentConfig(Config): SESSION_REDIS = Redis(host='127.0.0.1', port='6379') class TestingConfig(Config): pass
import pymysql from settings import Config class SQLHelper(object): @staticmethod def open(cursor): POOL = Config.PYMYSQL_POOL conn = POOL.connection() cursor = conn.cursor(cursor=cursor) return conn,cursor @staticmethod def close(conn,cursor): conn.commit() cursor.close() conn.close() @classmethod def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor): conn,cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchone() cls.close(conn,cursor) return obj @classmethod def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor): conn, cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchall() cls.close(conn, cursor) return obj
AttributeError 试图访问一个对象没有的树形,比如foo.x,但是foo没有属性x
IOError 输入/输出异常;基本上是无法打开文件
ImportError 无法引入模块或包;基本上是路径问题或名称错误
IndentationError 语法错误(的子类) ;代码没有正确对齐
IndexError 下标索引超出序列边界,比如当x只有三个元素,却试图访问x[5]
KeyError 试图访问字典里不存在的键
KeyboardInterrupt Ctrl+C被按下
NameError 使用一个还未被赋予对象的变量
SyntaxError Python代码非法,代码不能编译(个人认为这是语法错误,写错了)
TypeError 传入对象类型与要求的不符合
UnboundLocalError 试图访问一个还未被设置的局部变量,基本上是由于另有一个同名的全局变量,
导致你以为正在访问它
ValueError 传入一个调用者不期望的值,即使值的类型是正确的
为了保证程序的健壮性与容错性,即在遇到错误时程序不会崩溃,我们需要对异常进行处理
try:
被检测的代码块
except 异常类型 as f: #将错误类型赋值给f
try中检测到异常就执行这句
else:
print('没有任何报错是触发')
所有异常报错都接收
1.except Exception as f:
2.except BaseException as f:
ry:
被检测的代码块
except Exception as f: #万能异常
try中检测到异常就执行这句
try:
被检测的代码块
except BaseException as f: #万能异常
try中检测到异常就执行这句
try:
被检测的代码块
except Exception as f: #万能异常
try中检测到异常就执行这句
finally:
print('无论是否发生异常,都执行finally内部代码')
关键字:raise
raise 随便写的纠错类型(内容)
a = 12
if a == 123:
pass
else:
raise bushi('不对')
断言 assert
猜某个数据状态,猜对正常走,猜错报错
驼峰命名法
func.__dict__
由对象来调用类内部的函数,称之为对象的绑定方法。
对象的绑定方法特殊之处: 会将对象当做第一个参数传给该方法。
- 新式类:广度优先
1.凡是继承object的类或子孙类都是新式类。
2.在python3中所有的类都默认继承object,都是新式类。
- 经典类:深度优先
1.在python2中才会有经典类与新式类之分。
2.在python2中,凡是没有继承object的类,都是经典类。
继承是一种创建新类的方式,在python中,新建的类可以继承一个或多个父类,父类又可称为基类或超类(super),新建的类称为派生类或子类
**继承:是基于抽象的结果,通过编程语言去实现它,肯定是先经历抽象这个过程,才能通过继承的方式去表达出抽象的结构。**
抽象只是分析和设计的过程中,一个动作或者说一种技巧,通过抽象可以得到类
在python2经典类中继承的顺序是: 深度优先 在python3新式类中继承的顺序是: 广度优先 多继承, 继承顺序为从左至右 检查super的继承顺序mro()方法 在python3中提供了一个查找新式类查找顺序的内置方法. mro(): 会把当前类的继承关系列出来 多继承的情况下: 从左到右 class A: def test(self): print('from A.test') super().test() class B: def test(self): print('from B.test') class C(A, B): pass c = C() # 检查super的继承顺序mro()方法 print(C.mro()) # 结果: [<class '__main__.C'>, <class '__main__.A'>, <class '__main__.B'>, <class 'object'>]
issubclass(a,B):判断a是不是B的子类
参数a:子类
参数B:父类
issubclass 判断传入的obj是不是Animal的子类
def manage(obj):
if issubclass(type(obj), Animal):
obj.eat()
else:
print('不是动物')
封装:
1.面向对象最重要的是封装
2.隐藏对象的属性和实现细节,仅对外提供公共访问方式
1.将不需要对外提供的内容都隐藏起来
2.把属性都隐藏,提供公共方法对其访问。
1.讲变化隔离
2.便于使用
3.提高复用性
4.提高安全性
私有属性、私有方法:1.让一些关键的数据,变成私有更加的安全
2.不是随意可以更改的
3.在属性,和方法前面加’__‘,变成私有,那么外界就不可以直接调用修改。
4.但是:在类的内部可以定义一个函数,方法调用修改。使用者直接调用这个函数就可以了。这个函数就是接口
5.可以在这个函数、方法加条件限制,而不是任意的改动
*在python中用双下划线开头的方式将属性隐藏起来(设置成私有的)
原理: 替换变量名称 方法名 替换为:_类名__方法名
#其实这仅仅这是一种变形操作
#类中所有双下划线开头的名称如__x都会自动变形成:_类名__x的形式
在继承中,父类如果不想让子类覆盖自己的方法,可以将方法定义为私有的
#把fa定义成私有的,即__fa
class A:
def __fa(self): #在定义时就变形为_A__fa
print('from A')
def test(self):
self.__fa() #只会与自己所在的类为准,即调用_A__fa
class B(A):
def __fa(self):
print('from B')
b=B()
b.test()
from A
(3)、访问限制机制
把内部方法、属性私有化,让外面不能直接调用,而是通过内部给的一个接口调用。这个接口可以控制调用方法的顺序,条件等
(4)、封装与扩展性
(5)、组合
- 继承:
继承是类与类的关系,子类继承父类的属性/方法,子类与父类是一种 “从属” 关系。
- 组合:
组合是对象与对象的关系,一个对象拥有另一个对象中的属性/方法,是一种什么有什么的关系。
classmethod:
是一个装饰器,给在类内部定义方法中装饰,将类内部的方法变为 “类的绑定方法”。
staticmethod:
翻译: 静态方法
是一个装饰器,给在类内部定义方法中装饰,将类内部的方法变为 “非绑定方法”。
- 对象的绑定方法:
- 由对象来调用,由谁来调用,会将谁(对象)当做第一个参数传入。
- 类的绑定方法:
- 由类来调用,由谁来调用,会将谁(类)当做第一个参数传入。
- 非绑定方法:
- 可以由对象或类来调用,谁来调用都是一个普通方法(普通函数),方法需要传入几个参数,就得传入几个。
# 对象绑定方法,需要实例化出一个对象
keji = Dog()
keji.eat()
# 类绑定方法,不需要对象,直接通过类
zhu = Pig.eat()
装饰器:为了调用方式一致,方便使用者
@property首先创建一个age对象,所以@setter、@deleter要加函数名
:这里的age是装饰方法的名称
@property(获取私有) :把一个方法伪装成普通属性,通常函数名和属性名保持一致(方便使用者调用)
@函数名.setter(修改私有):函数名和属性名保持一致
@函数名.deleter(控制删除):
反射使用场景:
1.反射就是对属性的增删改查,但是如果直接使用内置的 dict来操作,语法繁琐,不好理解
2.如果对象是别人提供的,判断这个对象是否满足要求
hasattr(p,'name'):查找p对象中是否存在name属性
getattr(p,'name'):取值,p对象中name属性
setattr(p,'name','jeff'):添加,为p对象中添加name属性jeff
delattr(p,'name'):删除,删除p对象中name属性
判断 class 中 x 某些属性是否存在 class 中
class Foo:
def __init__(self, x, y):
self.x = x
self.y = y
foo_obj = Foo(10, 20)
# 通过字符串x 判断对象中是否有 x属性
print(hasattr(foo_obj, 'x')) # True
print(hasattr(foo_obj, 'z')) # False
获取 class 中属性的值
class Foo:
def __init__(self, x, y):
self.x = x
self.y = y
foo_obj = Foo(10, 20)
res = getattr(foo_obj, 'x')
print(res) # 10
设置 class 属性值
class Foo:
def __init__(self, x, y):
self.x = x
self.y = y
foo_obj = Foo(10, 20)
setattr(foo_obj, 'z', 30) # 为对象添加z属性值为30
print(hasattr(foo_obj, 'z')) # True
删除 class 中 某属性
class Foo:
def __init__(self, x, y):
self.x = x
self.y = y
foo_obj = Foo(10, 20)
delattr(foo_obj, 'x') # 删除对象中x属性
print(hasattr(foo_obj, 'x'))
通过用户输入的字符串,判断是否存在,getattr取值,加括号调用相应的功能函数
class FileControl: def run(self): while True: # 让用户输入上传或下载功能的命令: user_input = input('请输入 上传(upload) 或 下载(download) 功能:').strip() # 通过用户输入的字符串判断方法是否存在,然后调用相应的方法 if hasattr(self, user_input): func = getattr(self, user_input) func() else: print('输入有误!') def upload(self): print('文件正在上传...') def download(self): print('文件正在下载...') file = FileControl() file.run()
类的内置方法(魔法方法): 凡是在类内部定义,以__开头__结尾的方法,都是类的内置方法,也称之为魔法方法。 类的内置方法,会在某种条件满足下自动触发。 内置方法如下: __new__: 在__init__触发前,自动触发。 调用该类时,内部会通过__new__产生一个新的对象。 __init__: 在调用类时自动触发。 通过产生的对象自动调用__init__() __getattr__: 在 “对象.属性” 获取属性时,若 “属性没有” 时触发。 __getattribute__: 在 “对象.属性” 获取属性时,无论 "属性有没有" 都会触发。 # 注意: 只要__getattr__ 与 __getattribute__ 同时存在类的内部,只会触发__getattribute__。 __setattr__:当 “对象.属性 = 属性值” , 添加或修改属性时触发 __call__ : 在调用对象 “对象 + ()” 时触发。 即:对象() 或者 类()() __str__ : 在打印对象时触发。 # 注意: 该方法必须要有一个 “字符串” 返回值。 __getitem__: 在对象通过 “对象[key]” 获取属性时触发。 __setitem__: 在对象通过 “对象[key]=value值” 设置属性时触发。 __gt__,__lt__,__eq__:自定义比较对象大小双下:gt、lt、eq __enter__: 进入文件时,开打文件时执行。返回值:self __exit__: 退出文件时,报错中断、或者代码执行完时执行。 返回值:可以有返回值,是bool类型 __del__ : 手动删除时立马执行,或者程序运行结束时自动执行 使用场景:当你的对象使用过程中,打开了不属于解释器的资源;例如,文件,网络端口 __slots__:原理,给对象声明只有某些属性,从而删除不必要的内存,不能添加新属性 使用场景:1.优化对象内存 2.限制属性数量
因为类继承了object,所有在调用类之前会自动先执行双下new来创建对象。这里的双下new重写了object中的双下new,所以此时不能创建出对象。
slots:原理,给对象声明只有某些属性,从而删除不必要的内存,不能添加新属性
1.优化对象内存
2.限制属性数量
__slots__案例:slots使用 # 第一种:不优化内存 import sys class Person: slots = ['name'] # 声明名称空间只有name属性 def __init__(self, name): self.name = name p = Person('jeff') print(sys.getsizeof(p)) # 结果:56 # 第二种:优化内存 import sys class Person: __slots__ = ['name'] # 声明名称空间只有name属性 def __init__(self, name): self.name = name p = Person('jeff') print(sys.getsizeof(p)) #结果:48 #####内存减少了8
单例:一个对象,多次使用相同的。节约内存空间
单例模式:
指的是在确定 "类中的属性与方法" 不变时,需要反复调用该类,
产生不同的对象,会产生不同的内存地址,造成资源的浪费。
让所有类在实例化时,指向同一个内存地址,称之为单例模式。 ----> 无论产生多个对象,都会指向 单个 实例。
- 单例的优点:
节省内存空间。
1.通过classmethod 类绑定方法
2.通过装饰器实现
3.通过__new__实现
4.通过导入模块时实现
5.通过元类实现。
(1) 通过classmethod 类绑定方法
# 第一种:不用单例 # 两个不同的对象,地址不一样 class MySQL(): def __init__(self, host, port): self.host = host self.port = port def singleton(self): pass obj1 = MySQL('192.168.0.101',3306) obj2 = MySQL('192.168.0.101',3306) print(obj1) print(obj2) # 结果: <__main__.MySQL object at 0x000001FE458A8470> <__main__.MySQL object at 0x000001FE458A8518> # 第二种:用单例 # 一个对象重复使用,指向同一个地址,节约内存 class MySQL(): __instance = None # 标识对象是否已经存在 def __init__(self, host, port): self.host = host self.port = port @classmethod def singleton(cls, host, port): # 单例方法 if not cls.__instance: obj = cls(host,port) cls.__instance = obj # 如果__instance有值,证明对象已经存在,则直接返回该对象 return cls.__instance obj1 = MySQL.singleton('192.168.0.101',3306) obj2 = MySQL.singleton('192.168.0.101',3306) print(obj1) print(obj2) # 结果: <__main__.MySQL object at 0x000001D2B79D8518> <__main__.MySQL object at 0x000001D2B79D8518>
class Singleton(object): def __new__(cls, *args, **kwargs): if not hasattr(cls, '_instance'): res = super(Singleton, cls) cls._instance = res.__new__(cls, *args, **kwargs) return cls._instance class A(Singleton): pass obj1 = A() obj2 = A() print(obj1) print(obj2) # 结果: <__main__.A object at 0x000001F81A3B8518> <__main__.A object at 0x000001F81A3B8518>
class Singleton:
__instance = None
def __new__(cls, *args, **kwargs):
if not cls.__instance:
# 造一个空对象
cls.__instance = object.__new__(cls)
return cls.__instance
# 方式三: 装饰器 def singleton(cls): # cls---> Father _instance = {} def inner(*args, **kwargs): if cls not in _instance: obj = cls(*args, **kwargs) _instance[cls] = obj return _instance[cls] return inner @singleton class Father: pass print(Father()) print(Father())
Singleton.py文件中:
class SingletonCls:
pass
obj = SingletonCls()
# 另一个文件
from Singleton import obj
print(obj)
from Singleton import obj
print(obj)
from Singleton import obj
print(obj)
注意:只要继承了type 那么这个类就变成了一个元类
__call__在调用对象时触发
在python中一切皆对象,那么我们用class关键字定义的类本身也是一个对象,负责产生该对象的类称之为元类,即元类可以简称为类的类
元类是负责产生类的,所以我们学习元类或者自定义元类的目的:是为了控制类的产生过程,还可以控制对象的产生过程
创建类的方法有两种:
大前提:如果说类也是对象的话,那么用class关键字去创建类的过程也是一个实例化的过程
该实例化的目的是为了得到一个类,调用的是元类
方式一:用的默认的元类type
方式二:自定义元类继承type
class Mate(type):
def __call__(self, *args, **kwargs):
if args:
raise Exception('不允许')
return super().__call__(*args,**kwargs)
# 定义了一个元类
class A(metaclass=Mate):
def __init__(self, name):
self.name = name
a = A(name='jeff')
print(a.name)
十四、 常用功能
(1)查找单词/字符串的相近匹配
现在,关于 Python 标准库中一些晦涩难懂的特性。如果你发现自己需要使用Levenshtein distance 【2】之类的东西,来查找某些输入字符串的相似单词,那么 Python 的 difflib 会为你提供支持。
import difflib
difflib.get_close_matches('appel', ['ape', 'apple', 'peach', 'puppy'], n=2)
# returns ['apple', 'ape']
(2)使用IP地址
如果你必须使用 Python 做网络开发,你可能会发现 ipaddress 模块非常有用。一种场景是从 CIDR(无类别域间路由 Classless Inter-Domain Routing)生成一系列 IP 地址:
import ipaddress
net = ipaddress.ip_network('74.125.227.0/29') # Works for IPv6 too
# IPv4Network('74.125.227.0/29')
for addr in net:
print(addr)
# 74.125.227.0
# 74.125.227.1
# 74.125.227.2
# 74.125.227.3
# ...
ip = ipaddress.ip_address("74.125.227.3")
ip in net
# True
ip = ipaddress.ip_address("74.125.227.12")
ip in net
# False
参考链接:http://www.voidcn.com/article/p-ydjlyqdz-bce.html
常用的十大 python 图像处理工具:
https://baijiahao.baidu.com/s?id=1637822659435117454&wfr=spider&for=pc
二进制文件转base64
image_path = '/home/duanweiye/work_dirs/MQ-dev-3-17/MQ/tests/img/111.jpg'
img_content = open(image_path, 'rb').read()
# 转64
b64_data= base64.b64encode(img_content).decode(encoding='utf-8')
----------------------------------------------------------------------
二进制文件 转 array
image = imageio.imread(img_content)
h, w = image.shape[:2]
Bs64 转 bytes
# 64 转 bytes
base.b64decode(b64_data)
detection_img = imageio.imread(base64.b64decode(b64_direction_fixed_image))
import os import io from PIL import Image, ImageEnhance import base64 import imageio qrcode = data["img_data"] qrcode = base64.b64decode(qrcode) # 将字节对象转为Byte字节流数据,供Image.open使用 byte_stream = io.BytesIO(qrcode) roiImg = Image.open(byte_stream) # imageio.open(roiImg) # imageio.show() from PIL import Image import matplotlib.pyplot as plt # img=Image.open('d:/dog.png') plt.figure("dog") # plt.figure(num=1, figsize=(8,5),) # plt.title('The image title') plt.axis('off') # 不显示坐标轴 print(type(roiImg)) # print(type(img)) # print(img.size) #图片的尺寸 # print(img.mode) #图片的模式 # print(img.format) #图片的格式 plt.imshow(roiImg) plt.show()
import matplotlib.pyplot as plt # plt 用于显示图片
from PIL import Image, ImageEnhance
byte_stream = io.BytesIO(img_content)
roiImg = Image.open(byte_stream)
plt.imshow(roiImg) # 显示图片
plt.show()
image = imageio.imread(img_content)
h, w = image.shape[:2]
import cv2
import numpy as np
detection_img = image.copy()
for box in position:
x1, y1 = box["left_top"][0] * w, box["left_top"][1] * h
x2, y2 = box["right_top"][0] * w, box["right_top"][1] * h
x3, y3 = box["right_bottom"][0] * w, box["right_bottom"][1] * h
x4, y4 = box["left_bottom"][0] * w, box["left_bottom"][1] * h
pts = np.array([[x1, y1], [x2, y2], [x3, y3], [x4, y4]])
cv2.polylines(detection_img, [pts.reshape(-1, 2).astype(np.int32)], True, (255, 255, 0), 2)
cv2.imshow("test", cv2.cvtColor(detection_img, cv2.COLOR_RGB2BGR))
cv2.waitKey()
cv2.destroyAllWindows()
# ocr_img = image.copy()
# for box in json_data["bbox_info_list"]:
# x1, y1 = box["left_top"][0] * w, box["left_top"][1] * h
# x2, y2 = box["right_top"][0] * w, box["right_top"][1] * h
# x3, y3 = box["right_bottom"][0] * w, box["right_bottom"][1] * h
# x4, y4 = box["left_bottom"][0] * w, box["left_bottom"][1] * h
#
# pts = np.array([[x1, y1], [x2, y2], [x3, y3], [x4, y4]])
# cv2.polylines(ocr_img, [pts.reshape(-1, 2).astype(np.int32)], True, (255, 255, 0), 2)
# cv2.imshow("test", cv2.cvtColor(ocr_img, cv2.COLOR_RGB2BGR))
# cv2.waitKey()
# cv2.destroyAllWindows()
# h, w = image.shape[:2]
# ocr_img = image.copy()
# for box in recognition_result.get('bboxes_info_list'):
# if box.get("property") in ['qrcode']:
# x1, y1 = box["left_top"][0] * w, box["left_top"][1] * h
# x2, y2 = box["right_top"][0] * w, box["right_top"][1] * h
# x3, y3 = box["right_bottom"][0] * w, box["right_bottom"][1] * h
# x4, y4 = box["left_bottom"][0] * w, box["left_bottom"][1] * h
# qrcode_box = [[x1, y1], [x2, y2], [x3, y3], [x4, y4]]
# xs = [int(x[0]) for x in qrcode_box]
# ys = [int(x[1]) for x in qrcode_box]
# cropimage = ocr_img[min(xs):max(xs), min(ys):max(ys)]
img_array = imageio.imread(cropimage.encode())
detection_img = imageio.imread(base64.b64decode(b64_direction_fixed_image))
for box in position:
x1, y1 = box["left_top"][0] * w, box["left_top"][1] * h
x2, y2 = box["right_top"][0] * w, box["right_top"][1] * h
x3, y3 = box["right_bottom"][0] * w, box["right_bottom"][1] * h
x4, y4 = box["left_bottom"][0] * w, box["left_bottom"][1] * h
pts = np.array([[x1, y1], [x2, y2], [x3, y3], [x4, y4]])
cv2.polylines(detection_img, [pts.reshape(-1, 2).astype(np.int32)], True, (255, 255, 0), 2)
# cv2.resizeWindow('detection_img', int(w), int(h))
cv2.imwrite('TEST.JPG', detection_img)
cv2.imshow("test", cv2.cvtColor(detection_img, cv2.COLOR_RGB2BGR))
cv2.waitKey()
cv2.destroyAllWindows()
img = base64.decodebytes(img_base64.encode())
# 链接亚马逊
ssh -i "adminduan.pem" ubuntu@ec2-13-115-91-185.ap-northeast-1.compute.amazonaws.com
User name:
Password: vc%UNdUGaPE]
Access key ID:
Secret access key: bK/ssNl/+3YffLcUHH6k
region_name: # 地区
SDK boto3
全屏:F11 zoomIn/zoomOut:Ctrl +/- 侧边栏显/隐:Ctrl+B 控制台终端显示与隐藏:ctrl + ~ 显示资源管理器 :Ctrl+Shift+E 自动换行 : alt + z 快速回到顶部 : ctrl + home 快速回到底部 : ctrl + end 显示搜索: Ctrl+Shift+F 显示 Git: Ctrl+Shift+G 显示 Debug :Ctrl+Shift+D 显示 Output :Ctrl+Shift+U 全局查找 :Ctrl+Shift+F 万能键:F1 找到所有的引用: Shift+F12 同时修改本文件中所有匹配的: Ctrl+F12 重命名:比如要修改一个方法名,可以选中后按 F2,输入新的名字 回车,会发现所有的文件都修改了 跳转到下一个 Error 或 Warning:当有多个错误时可以按 F8 逐个跳转 Shift + Alt + F 格式化代码 F12 跳转到定义处 Ctrl + ` 打开集成终端 Ctrl + Shift + ` 创建一个新的终端
ab命令会创建很多的并发访问线程,模拟多个访问者同时对某一URL地址进行访问。它的测试目标是基于URL的,因此,既可以用来测试Apache的负载压力,也可以测试nginx、lighthttp、tomcat、IIS等其它Web服务器的压力。
ab命令对发出负载的计算机要求很低,既不会占用很高CPU,也不会占用很多内存,但却会给目标服务器造成巨大的负载,其原理类似CC攻击。自己测试使用也须注意,否则一次上太多的负载,可能造成目标服务器因资源耗完,严重时甚至导致死机。
2, 参数及使用方法
格式:ab [options] [http://]hostname[:port]/path 参数说明: -n requests Number of requests to perform //在测试会话中所执行的请求个数(本次测试总共要访问页面的次数)。默认时,仅执行一个请求。 -c concurrency Number of multiple requests to make //一次产生的请求个数(并发数)。默认是一次一个。 -t timelimit Seconds to max. wait for responses //测试所进行的最大秒数。其内部隐含值是-n 50000。它可以使对服务器的测试限制在一个固定的总时间以内。默认时,没有时间限制。 -p postfile File containing data to POST //包含了需要POST的数据的文件,文件格式如“p1=1&p2=2”.使用方法是 -p 111.txt 。 (配合-T) -T content-type Content-type header for POSTing //POST数据所使用的Content-type头信息,如 -T “application/x-www-form-urlencoded” 。 (配合-p) -v verbosity How much troubleshooting info to print //设置显示信息的详细程度 – 4或更大值会显示头信息, 3或更大值可以显示响应代码(404, 200等), 2或更大值可以显示警告和其他信息。 -V 显示版本号并退出。 -w Print out results in HTML tables //以HTML表的格式输出结果。默认时,它是白色背景的两列宽度的一张表。 -i Use HEAD instead of GET // 执行HEAD请求,而不是GET。 -x attributes String to insert as table attributes -y attributes String to insert as tr attributes -z attributes String to insert as td or th attributes -C attribute Add cookie, eg. -C “c1=1234,c2=2,c3=3” (repeatable) //-C cookie-name=value 对请求附加一个Cookie:行。 其典型形式是name=value的一个参数对。此参数可以重复,用逗号分割。 提示:可以借助session实现原理传递 JSESSIONID参数, 实现保持会话的功能,如 -C ” c1=1234,c2=2,c3=3, JSESSIONID=FF056CD16DA9D71CB131C1D56F0319F8″ 。 -H attribute Add Arbitrary header line, eg. ‘Accept-Encoding: gzip’ Inserted after all normal header lines. (repeatable) -A attribute Add Basic WWW Authentication, the attributes are a colon separated username and password. -P attribute Add Basic Proxy Authentication, the attributes are a colon separated username and password. //-P proxy-auth-username:password 对一个中转代理提供BASIC认证信任。用户名和密码由一个:隔开,并以base64编码形式发送。无论服务器是否需要(即, 是否发送了401认证需求代码),此字符串都会被发送。 -X proxy:port Proxyserver and port number to use -V Print version number and exit -k Use HTTP KeepAlive feature -d Do not show percentiles served table. -S Do not show confidence estimators and warnings. -g filename Output collected data to gnuplot format file. -e filename Output CSV file with percentages served -h Display usage information (this message) //-attributes 设置属性的字符串. 缺陷程序中有各种静态声明的固定长度的缓冲区。另外,对命令行参数、服务器的响应头和其他外部输入的解析也很简单,这可能会有不良后果。它没有完整地实现 HTTP/1.x; 仅接受某些’预想’的响应格式。 strstr(3)的频繁使用可能会带来性能问题,即你可能是在测试ab而不是服务器的性能。 参数很多,一般我们用 -c 和 -n 参数就可以了。例如: # ab -c 5000 -n 600 http://127.0.0.1/index.php
用例:
ab -n 4000 -c 1000 http://www.ha97.com/
-n后面的4000代表总共发出4000个请求;-c后面的1000表示采用1000个并发(模拟1000个人同时访问),后面的网址表示测试的目标URL。
This is ApacheBench, Version 2.3 Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/ Benchmarking 192.168.80.157 (be patient) Completed 400 requests Completed 800 requests Completed 1200 requests Completed 1600 requests Completed 2000 requests Completed 2400 requests Completed 2800 requests Completed 3200 requests Completed 3600 requests Completed 4000 requests Finished 4000 requests Server Software: Apache/2.2.15 Server Hostname: 192.168.80.157 Server Port: 80 Document Path: /phpinfo.php #测试的页面 Document Length: 50797 bytes #页面大小 Concurrency Level: 1000 #测试的并发数 Time taken for tests: 11.846 seconds #整个测试持续的时间 Complete requests: 4000 #完成的请求数量 Failed requests: 0 #失败的请求数量 Write errors: 0 Total transferred: 204586997 bytes #整个过程中的网络传输量 HTML transferred: 203479961 bytes #整个过程中的HTML内容传输量 Requests per second: 337.67 [#/sec] (mean) #最重要的指标之一,相当于LR中的每秒事务数,后面括号中的mean表示这是一个平均值 Time per request: 2961.449 [ms] (mean) #最重要的指标之二,相当于LR中的平均事务响应时间,后面括号中的mean表示这是一个平均值 Time per request: 2.961 [ms] (mean, across all concurrent requests) #每个连接请求实际运行时间的平均值 Transfer rate: 16866.07 [Kbytes/sec] received #平均每秒网络上的流量,可以帮助排除是否存在网络流量过大导致响应时间延长的问题 Connection Times (ms) min mean[+/-sd] median max Connect: 0 483 1773.5 11 9052 Processing: 2 556 1459.1 255 11763 Waiting: 1 515 1459.8 220 11756 Total: 139 1039 2296.6 275 11843 #网络上消耗的时间的分解,各项数据的具体算法还不是很清楚 Percentage of the requests served within a certain time (ms) 50% 275 66% 298 75% 328 80% 373 90% 3260 95% 9075 98% 9267 99% 11713 100% 11843 (longest request) #整个场景中所有请求的响应情况。在场景中每个请求都有一个响应时间,其中50%的用户响应时间小于275毫秒,66%的用户响应时间小于298毫秒,最大的响应时间小于11843毫秒。对于并发请求,cpu实际上并不是同时处理的,而是按照每个请求获得的时间片逐个轮转处理的,所以基本上第一个Time per request时间约等于第二个Time per request时间乘以并发请求数。
总结:在远程对web服务器进行压力测试,往往效果不理想(因为网络延时过大),建议使用内网的另一台或者多台服务器通过内网进行测试,这样得出的数据,准确度会高很多。如果只有单独的一台服务器,可以直接本地测试,比远程测试效果要准确。
ab -c 100 -n 1000 127.0.0.1:80/
# 日志文件输出, 限制文件大小
import logging
from logging.handlers import RotatingFileHandler
log_formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s(%(lineno)d) %(message)s')
logFile = 'lable_error_img_log.txt'
my_handler = RotatingFileHandler(logFile, mode='a', maxBytes=5*1024*1024,
backupCount=2, encoding=None, delay=0)
my_handler.setFormatter(log_formatter)
my_handler.setLevel(logging.INFO)
logger = logging.getLogger('root')
logger.setLevel(logging.INFO)
logger.addHandler(my_handler)
# 日志文件输出, 终端输出
# logger = logging.getLogger(__name__)
# logger.setLevel(level=logging.INFO)
# handler = logging.FileHandler("log.txt")
# handler.setLevel(logging.INFO)
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# handler.setFormatter(formatter)
# console = logging.StreamHandler()
# console.setLevel(logging.INFO)
# logger.addHandler(handler)
# logger.addHandler(console)
# import sched # import time # from datetime import datetime # # 初始化sched模块的 scheduler 类 # # 第一个参数是一个可以返回时间戳的函数,第二个参数可以在定时未到达之前阻塞。 # schedule = sched.scheduler(time.time, time.sleep) # # # 被周期性调度触发的函数 # def printTime(inc): # print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # schedule.enter(inc, 0, printTime, (inc,)) # # # 默认参数60s # def main(inc=6): # # enter四个参数分别为:间隔事件、优先级(用于同时间到达的两个事件同时执行时定序)、被调用触发的函数, # # 给该触发函数的参数(tuple形式) # schedule.enter(0, 0, printTime, (inc,))
# def run():
# # schedule.every(10).seconds.do(job1)
# # schedule.every(10).seconds.do(job2)
#
# # schedule.every().days.do(job_func=job1)
#
# # schedule.every().day.at("23:50").do(job, name)
#
# schedule.every(10).minutes.do(job, name)
#
# while True:
# schedule.run_pending()
# time.sleep(1)
官网:https://python-docx.readthedocs.io/en/latest/
需求分析:
1, 数据源 2, 数据样式 3,数据字段规则配置
2, 数据样式
后来决定采取修改的方式, 先以一个word文件作为模板,在模板中定义好上面提到的“样式”,然后在模板中做一个个标记,然后将数据按照规则更新到对应的标记。
3,数据字段规则配置
字段
文字
table
image
生成的word转html
定制化word模板
python-docx
python-docx是一个用于创建和更新Microsoft Word(.docx)文件的Python库。
docxtpl
主要包含两个包,分别是python-docx用于读取,编写和创建子文档。jinja2用于管理插入模板docx的标签。通过预先设定的模板文件,生成需要的文件。
官方文档:
https://docxtpl.readthedocs.io/en/latest/
jinjia2 模板语法
https://jinja.palletsprojects.com/en/2.10.x/
中文文档:
http://docs.jinkan.org/docs/jinja2/templates.html#import-visibility
操作 word文档 实则替换xml, 通过占位符进行替换 jinjia中变量名称
1.jinja2使用{{…}}声明模版中的变量,我们将docx模版中需要替换的内容使用{{…}}手动标注起来。
2.从xls中读取需要替换的值,并与docx模版中预设的变量名对应起来。
3.使用docxtpl库中的DocxTemplate.render完成模板替换。
4.输出替换后的docx。
将需要替换的位置使用双大括号进行标准,并添加变量名。
这里需要注意的是这里应该对{{var}}本身的文本格式完成调整,这样后面替换时就 不需要再单独对文本格式进行处理了。
需要单独调整的可以通过docxtpl库使用富文本的方式操作。
二十二, 时间处理
def valid_date(expired_at): today = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') today_time = time.mktime(time.strptime(datetime.datetime.now().strftime('%Y-%m-%d') , "%Y-%m-%d")) try: expired_at_time = '' if isinstance(expired_at, datetime.datetime): expired_at_time = time.mktime(expired_at.timetuple()) elif isinstance(expired_at, str): expired_at_time = time.mktime(time.strptime(expired_at, '%Y-%m-%d')) diff_dt = int(today_time) - int(expired_at_time) logger.info(f"today: {today} diff_dt: {diff_dt}") if diff_dt > 0: return True else: return False except Exception as e: logger.error(f'today: {today} valid_date error:{e}', exc_info=e)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。