赞
踩
百年沉浮困低谷,莫以今朝度兴衰, 人生终有高飞日,傲振雄翅过沧海.
2022-10-24
celery
django-redis
祝各位程序员节快乐
Redis 由 Vmware 公司开发, (卧槽 虚拟机)! 因为考虑有没有学习Linux的同学 基础篇使用Windows的Redis入门 后期会进入Linux的Redis学习阶段
有些数据用Mysql这种数据库存储很不nice!譬如一些图片 秒杀活动之类的数据 这个时候就可用用到Redis类似的NoSql来存储了
用内存存储数据的技术 从而提高curd(增删改查)速度
扯句题外话kodachi是一个存储在内存上的系统 黑客们很喜欢使用它 毕竟一个关机键就可以完全清除他们的犯罪历史 遇见危险 直接关机处理 有哪个小可爱会不喜欢呢
运存(内存)空间很小 如何去保存数据呢? 关机就清除数据的话 不就都没了吗?
- 我们先要明白 数据使用频率很高的时候我们称之为 热数据 ,举个栗子, 淘宝的商品 steam的热销 哔咔的推荐本 都算是一种热数据
- 如果我们每次都想看这些数据,我们就不用把他们放在Mysql这样存储大量数据的地方了(虽然Mysql有索引这样的机制 但还是麻烦!) 毕竟放在硬盘的数据怎么都比不过存在内存上的数据
- 反而呢 内存非常贵 (吃鸡火热的时候 一个内存买500了解一下) 为了节约成本 不经常使用叫做冷数据 存放在硬盘里这是很不错的选择
万一哪一天停电没有存储在硬盘的热数据岂不是直接GameOver, 所以我们需要了解一个点 热数据其实不但保存在内存 他也有机会在硬盘中被存储 这里存储在硬盘的热数据就是备份备份
所以加入高速缓存的存储机制是这样的:
如果缓存数据发生变化记得及时 更新到数据库里哦!
如果大量数据只靠一台服务器做告诉缓存似乎没那么靠谱 我们就可以搭建集群 类似HDFS叫做 LAZY_PERSIST 的玩意儿. 一般我们在Docker这种容器中搭建
是一个NoSQL数据库产品之一 以键值对的形式进行存储 像JSON 像DICT 像Mapper 由C语言编写(源码开源) 理论可以达到10个w的QPS QPS指的是每秒查询的次数
X轴是链接数 Y轴是一秒查询的次数
下载链接https://github.com/tporadowski/redis/releases,下载ZIP即可
现在之后不要打开任何程序 我们需要让redis-server.exe
启动的时候redis.windows.conf
, 但是 exe 默认不会加载任何文件 我们需要写一个Bat文件来实现这样的效果
新建记事本txt:
redis-server redis.windows.conf
写完之后 存储为.bat文件即可
打开bat弹出的cmd类似于下图
启动客户端的话可以直接点击redis-cli.exe
运行 界面简陋 见谅 服务器端口默认 6379
默认, Redis有16个逻辑库(0~15) 都是空的 可以存储数据 使用 select <id>
切换到指定id的逻辑库 如
select 0
切换到 0 号逻辑库
还有以下命令:
set <key> <value> 新增键值对
get <key> 获取指定key的value
del <key> 删除
clear 清空控制台
flushall 删除所有数据
虽然好用但是简陋!高端程序员的美工都差劲(暴论)
安装 RedisDesktopManager
优化你的眼睛查看更高级的界面
官网收费从别人那里借来的免费网盘链接
百度网盘:https://pan.baidu.com/s/15xVRpCT8mkP2uT8PoBHT3g 提取码:v727
为了不让突然断电的事故导致Redis数据丢失 Redis提供了俩不错的方案
对Redis本身的配置
- port 端口 默认 6379
- bind 默认被注释 表示可以连接数据库的IP地址 推荐是0.0.0.0 表示全部
- 删除空格的时候记得删除空格!
- timeout 超时时间 默认是0 表示没有超时时间
- 长时间不用不关闭也不好 timeout最好设置个数字
- logfile 日志输出的文件名称
- 填写之后记得找到注释并删除注释: syslog-enabled
- 值设置为no就行
- databases 逻辑库的数量
- 默认16
- requirepass
- 密码 默认被注释 建议打开
rdb是内存数据同步到硬盘的数据库文件 有如下操作
- save 同步频率
- 写入包括增删改
- save 60 10000 指的是 在 60s 之内 写入了10000条数据就进行同步
- save 900 1 指的是900内写入了一条数据都可以进行同步
- rdbcompression
- 同步数据采用压缩
- rdbchecksum
- 同步数据校验
- rdbfilename
- rdb文件的名称
- dir
- redis所在的目录、
RDB 同步方案似乎会导致数据的丢失? 我们不妨了解一下AOF的备份方案吧
- maxclients 最大连接数
- 默认无限制 实际环境推荐设置一个大数字
- maxmemory 占用内存的大小
- 默认无限制 实际环境推荐设置一个70%的内存
- appendonly 开始AOF模式
- 默认关闭
- 开启AOF RDB就要关闭 我们只需要删除save命令存在的配置行即可
- 需要指定appendfilename 指定aof
- appendfsync 同步频率
- no - 有系统决定何时存储
- everysec - 每秒都会把数据写入到硬盘 不可靠就是了
- always - 每次内存写入 都会写入硬盘
前排提示:
这一章出现的指令很多! 当我们常用的数据指令了 太多了 记不住?
多用就是哈哈
刚开始用记不住,不妨下载一个 Utools 安装插件redis文档 在开发的时候使用Alt+空格输入redis可以快速查找你的指令
比如我要查询列表删除的数据 我就这样:
然后使用多了你就可以得心应手了!
不是广告!!!
redis的数据类型分为 5 种 - 字符串 哈希 列表 集合 有序集合
可以保存文字数字 也可以保存图片等多媒体文件数据
如何保存多媒体文件?
将文件转化为二进制 序列化 即可存储
但是String并不是无穷大的 最大可以保存512MB数据
set <key> <value>
可以设置数据 也可以对已有key进行修改 不用给value引号 Redis不需要他get <key>
获取字符串的数据del <key>
删除数据getrange <key> <index_min> <index_max>
strlen <key>
获取字符串的长度setex <key> <sec> <value>
设置带有过期时间的kv对(单位秒)psetex <key> <sec> <value>
设置带有过期时间的kv对(单位毫秒)mset <k1> <v1>...<kn> <vn>
同时设置多个kv对mget <k1> <k2>
获取多个k的valueappend <k> <v>
向 k 对应的 value 追加内容 内容为vincr <key>
数字+1incrby <key> <number>
数字+number number∈intincrbyfloat <key> <number>
数字+number number∈floatdecr <key>
数字-1decrby <key> <number>
数字-number number∈intdecrbyfloat
浮点计算不大准确 还是少用最好
hash说白了就是字典这种结构化的数据 在Redis使用hash 有种字典套字典的感觉
使用hash 可以一次性存储多个数据 比如对象的全部属性
hset <key> <hash_key> <hash_value>
-- 在key里定义一个hash的value 并给这个hash的hash_key 设置一个hash_value
hmset -- 类似于mset 同时给一个hash设置多个key
hget <key> <hash_key> -- 获取hash的hash_key的value
hmget -- 类似于hmget
hgetall <key> -- 获取hash所有的kv
hkeys <key> -- 获取hash所有的key
hlen <key> -- 获取hash的key数量
hexists <key> <hkey> -- 是否存在key字段 (0False 1True)
hvals <key> -- 获取字段值
hdel <key> <hkey>...<hkeyn> -- 删除多个hash的kv对
其实我们发现了一个规律只要在String的指令前面加上h 在添加一个hash_key的参数基本可以复刻string的操作
猜猜hincrby 和 hincrbyfloat 是什么? 如何用?
列表可以存储多个重复的值 他和Java的数组(但是你不用定义长度) python的列表有些类似
猜猜 现在dname列表种存储的元素是哪些 顺序如何
你可以使用lrange查看列表数据
类似于Python的集合 和列表一致 不过不能存储重复字段而且无序
顾名思义 可以排序的集合
5 10 表示 5<= x <= 10
(10 表示 小于10
+inf 表示无穷大
-inf 表示负无穷大
同理, ZrevrangebyScore表示降序 参数和升序一致
使用 pip install redis
下载redis库
import redis
r = redis.Redis(
host='127.0.0.1', # IP
port=6379, # 端口
db=0, # 指定逻辑库
# password="我没有哦" 密码没有的话可以咬打火机
)
print(r.keys("*")) # 获取逻辑库0的所有keys
print(r.type('number')) # 获取逻辑库0中number值类型
print(r.type('id'))
r.sadd('id',"24","4")
print(r.smembers('id'))
还记得Mysql的事务吗,记不得了? 没事 redis的事务机制和Mysql不同
但是还是推荐你去复习一下Mysql的事务…
mysql这一类 的数据库的事务是为了防止数据在进行操作的过程中发生了意外的宕机而出现的数据错乱问题所以引入的事务机制 undo
redo
redis本身不需要对数据进行持久化,所以redis也不需要向mysql这样的事务机制,在redis中失败就是失败成功就是成功 没有原子性的说法。
redis的事务可以这么理解
Redis 不会给数据来个锁来保证数据被多个请求所修改的事情发生,而是使用一个数据监视来监视数据,如果事务在执行过程中,其他的客户端修改了监视的记录那么当前的事务会自动的关闭。只有监视的数据没有被其他客户端修改的时候,事务才会执行完成。
watch <key> <key2> ...
监视的数据可以是一条可以是多条
还是那个案例:小红给小明转账
如果 命令在中途出现宕机的问题 小红的钱没了 小明的钱没到账
为了避免这样的问题 我们推荐了事务 要么全部成功执行 要么全部执行失败
我们操作数据的类型发生错误的时候 可能会出现中间状态的问题(事务执行到一半不执行的问题),但是我们执行错误的指令的时候会自动discard
使用 multi
启动事务 使用 exec
提交事务
开启事务后操作不会立即执行 而是在提交事务时候一同执行(类似批处理)
redis的事务没有回滚机制 所以不存在原子性 当事务提交之后不能进行回滚的操作,但是在事务提交之前我们随时可以使用discard取消事务。
当一致性持久性 隔离性 原子性都实现的事务就是强事务 如果没有实现其中的某个功能就是弱事务
? 为什么不实现回滚呢
因为回滚的实现需要使用磁盘执行因为redis基于内存 所以作者舍去了回滚机制
一般的, redis对事务的执行机制是在服务端对发来的数据指令进行积攒,直到exec在进行执行.
现在我们可以将指令的积攒到本地然后再发送给服务器,也就是将积攒命令的工作移交给客户端.
流水线就是实现指令积攒的客户端技术 从而将发送1000条指令的次数 降低为 发送指令集也就是一次的次数 但是在这个一组命令中 有一个命令需要一个命令的执行结果(就是返回值) 的时候就不能使用该技术了 而且当本地的指令集非常庞大的时候一次性传送会给服务器很大的压力
pipeline技术实现需要使用连接池:
import redis pool = redis.ConnectionPool( host='127.0.0.1', port=6379, db=0 ) r = redis.Redis(connection_pool=pool) r.set("money", 2) pipe = r.pipeline() pipe.incr("money") # pipe.incr('number') print(pipe.execute()) # 会返回所有指令执行的全部返回值
import redis
pool = redis.ConnectionPool(
host='127.0.0.1',
port=6379,
db=0
)
r = redis.Redis(connection_pool=pool)
with r.pipeline(transaction=1) as pip:
pip.multi()
#TODO 这里执行事务的命令
ret = pip.execute()
Redis 事务机制基于乐观锁实现
事务执行中可以对key进行监听 命令提交的时候 被监听的key对应的值没有被修改 事务提交成功 否则失败
相反的有个技术叫做悲观锁, 这个机制会认为数据是及其不安全的 所以每时每刻都会给给有竞争的资源进行上锁(有点像是进程锁与线程锁,在Java中有个单独的实现叫做
同步代码块 synchronized
)
watch books; # 对 books 进行乐观锁判断
multi # 开启事务
incr books # 修改值
exec # 提交事务
-- 同时在另一边
incr books # 我们在watch之后 exec提交之前执行这个数据
在乐观锁的思想下我们在wacth后记录了books的状态,我们在另个客户端对其进行了修改,这个时候事务进行提交会发现数据的值被修改了,这当然不行,所以事务执行失败。
如果是悲观锁的思想下我们在另一边的客户端(就叫B端), 修改值 因为books加了锁 那这个B就执行失败了。
redis的watch基于的是乐观锁 所以事务部分会执行失败 exec返回的是(nil)
简单的对 购买商品的实现 sleep期间会有其他客户端修改book_counter
import time import redis pool = redis.ConnectionPool( host='127.0.0.1', port=6379, db=0 ) r = redis.Redis(connection_pool=pool) key = 'book_counter' def buy_book(username): """ 购买书籍 :return: """ with r.pipeline(transaction=True) as pipe: while True: try: pipe.watch(key) # 对 book_counter 进行监听 # 虽然watch在流水线里面 # 理当在客户端整理完指令之后一并发出 # 但是watch是直接发送的额 time.sleep(10) pipe.multi() # 开启事务 pipe.decr(key) # 在事务中对数据进行修改pipe.decr(key) # 在事务中对数据进行修改 pipe.execute() # 在执行事务之前会看看监听的值是否被改变如果被改变就不执行了 except redis.WatchError: print(f'{username} 购买失败 重新购买中') continue else: print(f"{username} 购买成功, 当前存货量{r.get(key)}") break import multiprocessing if __name__ == '__main__': r.set(key, 1) # 设置一个 book_counter 现在还有一本了! print(f"当前存货量{r.get(key)}") buy_book("用户一")
精简版:
with r.pipeline(transaction=True) as pip:
try:
pip.watch(key) # 监听
time.sleep(10) # 等待
pip.multi()# 开始
pip.decr(key) # 我自己购买
pip.execute() # 提交给服务器
except:
print("购买失败 有人买了")
rdb的文件windows在redis的根目录下,linux在 /var/lib/redis/dump.rdb
或者是 /etc/dump.rdb
下,
我们可以使用指令 save
手动触发 reids的rdb 执行 save 的时候 服务器阻塞 无法处理客户端发送的命令请求。 这种技术我们也称之为 快照
为了避免阻塞导致服务器的redis无法工作 我们可以使用指令 bgsave
让我们redis在后台开启写磁盘的工作(Redis不是一个真正全部都在使用单进程单线程的程序 作者在某些重要的地方也被迫做了妥协)
bgsave执行流程:
save不消耗其他资源,bgsave不进行阻塞
aof记录得是命令而不是数据 每当服务器重启得时候重新执行一下命令即可
AOF 并不是来一个命令记录一次,因为频繁的写入命令实质会变得像是mysql类似的磁盘数据库了 而是redis利用系统开辟的一个缓冲区 在缓冲区中进行命令的记录(缓冲区是内存级别的) 等到缓冲满了才会写入硬盘中。所以AOF也会失去部分文件
因为是记录命令难免命令太多aof文件太大 为了避免命令的胡乱增长 redis提供了aof重写功能 比如多个命令发来的set我们可以变成一个mset 且过程不阻塞redis的正常运行
实现的方式有两种:
bgrewriteaof
和 配置 auto-aof-rewrite-percentage
和 auto-aof-rewrite-min-size
bgwrite和bgsave同理
AOF
既然学习了这么多的命令 不妨来做个小demo 假设设计一个网络购物平台 只有下单前10名的用户才可以拿到数据 记录他们的名字与购买
主从复制又名读写分离
架构设计考虑的地点 通常是设计如何去减少系统被阻塞的时间(不可服务的时间片要少)。
因为redis是单线程单进程的模式 如果redis挂掉 相关以来的服务就难以正常运行
类似于MasterSlave Master负责服务,而Salve是该服务的复制品 master会同步更新数据给slaves 保持主从同步 且master可以执行写读命令了 slave只能执行读命令
现在我们可以通过slave进行读 master进行写 将读写的命令尽量的分散开来 从而减去服务器的压力 即使master倒下了 slave
可以继任 master
成为 新的master
.
在linux下使用目录下 redis-server --slaveof <主服务器的IP> <主服务的PORT> --port <从服务器的端口> --masterauth <master的密码>
接着可以使用命令 redis-cli -p <端口号>
链接从服务器了 在客户端中执行info查看自己的master
当然我们可以正常启动一个redis-cli 然后使用指令 slaveof IP PORT
让我成为从, 或者是 slaveof no one
让自己与master
断绝关系
我们手写一个redis的conf
slaveof 0.0.0.0 6379
port 1333
然后我们使用命令: redis-server <配置文件目录>
即可开启从服务器
如果主服务器在运行的过程中驾崩了, 为了快速的在slave中找到master的替代品,这个时候我们就可以使用哨兵(sentine
)了 他会不断检查Master和Slave是否正常每一个哨兵可以监控多个Master和其Slaves. 当哨兵认为redis的主服务器死亡后 会挑选一个新的子服务器 并且推进为新主服务器, 自动切换完成。
使用 redis-server <SENTINEL_CONFIGER> --sentinel
使用哨兵
哨兵的配置文件如下:
port 2342341 端口
sentinel monitor <name> 127.0.0.1 4444 <count>
name: 监听的主服务器 name表示对主服务器所在的集体的名称可以随便取
count: 启动的哨兵投票多少数目才可以通过确认 哨兵可以是多个,在主服务器死亡之后投票选择新的从服务器进行继承
为什么引入投票机制:
可能哨兵在检测的时候 对某个从服务器的生死判断有一定的错误(他可能活着 但是哨兵认为死了) 多些哨兵可以让结果准确一些
这样的话我们可以根据判断主服务器是否死亡来进行新的子服务器的上位,即使未来旧主复活新主依旧不会让位。
这是哨兵的其他重要配置
import redis from redis.sentinel import Sentinel sentinel = Sentinel([('192.168.236.128', 11451)]) # 链接哨兵 可以获取多个 master = sentinel.master_for('Group1', db=1) # 获取指定某个组里的主 slave = sentinel.slave_for('Group1', db=1) # 获取指定某个组的从 master.set("test_value", "TEST") # master 读 print(slave.get("test_value")) # slave 写 number = "counter" master.set(number, 1) with master.pipeline() as pipe: while True: try: pipe.watch(number) pipe.multi() pipe.decr(number) pipe.execute() break except redis.WatchError: print("购买失败") continue print(slave.get(number))
如果请求量过大的时候,一个服务器很难抗住所有, 比如 春节订票 双十一 的时候,
集群是一组相互独立的 但是通过网络互连的计算机 他们构成了一个组 并以单一系统的模式加以管理
pip install django
之后 使用命令 django-admin startproject 项目名
创建项目 然后会出现类似的结构
使用命令: python manage.py runserver
启动服务器
Django开发中, 我们会对网站的功能进行不同模块的划分,一个模块一个Django
的应用 ,使用 python manger.py startapp 【应用名】
开启一个应用。
输入命令
python manager.py startapp studentapp
可以得到这些应用的文件有什么作用呢?
- admin
- apps
- models 写数据库相关的内容
- tests 测试类
- views 接受请求进行处理与MT交互 返回应答
创建好应用之后就可以在视图中实现代码了
from django.http import HttpResponse
def index(res):
"""
index主页视图
:param res:
:return:
"""
return HttpResponse("HelloWorld")
默认的 views.py 有一个render的引用现阶段可以删除他
视图写完了 但是无法看见他真正的效果,我们需要将一个 URL 映射到它——这就是我们需要 URLconf 的原因了。
在 应用文件夹 里创建一个 url.py
并做好注册工作
from django.urls import path
from . import views
urlpatterns = [
path('', views.index , name='index')
]
# 这里面的列表名称必须是urlpatterns
这一步的操作是给我们的 APP 中的view(第二个参数) 指定路由(第一个参数 ) 并给他命名(name参数)
但是我们应用并没有注册到服务器上 所以我们需要在Django的项目urls中进行注册
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('student/',include('studentapp.urls')),
# 给 studentapp.urls 里面注册的视图一个路由组 student/
path("admin/", admin.site.urls),
]
运行网站 访问IP:http://127.0.0.1:8000/student/
即可查看视图
在项目的setting中有一个DATABASES
列表我们可以轻松的找到里面字典ENGINE的值 默认是 SQLite 我们可以换成MySQL 或者是其他的DBMS.
ENGINE
– 可选值有'django.db.backends.sqlite3'
,'django.db.backends.postgresql'
,'django.db.backends.mysql'
,或'django.db.backends.oracle'
。其它 可用后端。
当我们使用的不是SQLite的时候我们可能会给连接的数据库获取他的密码或者是用户名称 可以这么做:
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'mydatabase',
'USER': 'mydatabaseuser',
'PASSWORD': 'mypassword',
'HOST': '127.0.0.1',
'PORT': '5432',
}
}
使用mysql的时候 直接启动程序会有问题:
Did you install mysqlclient?
你可以在项目的__init__
文件中输入下列代码import pymysql pymysql.install_as_MySQLdb()
- 1
- 2
在setting中设置
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
可以设置中文
我们可以在应用下的 models 里编辑
from django.db import models
# Create your models here.
from django.db.models import CASCADE
class TeacherModel(models.Model):
name : str = models.CharField(max_length=255)
_teacher_id : int = models.IntegerField()
class StudentModel(models.Model):
_id: int = models.AutoField(primary_key=True, null=False)
name: str = models.CharField(max_length=255)
_teacher_id: int = models.ForeignKey(TeacherModel, on_delete=models.CASCADE, db_column='_teacher_id') # 外键
我们再把眼光跳到应用的 apps.py
这是我们应用的一个AppConfig 比如: studentapps
的 AppConfig
就是StudentappConfig
这是应用的灵魂
Django 的应用是可以热插拔的 你可以在任何项目使用同一个应用 !
而我们的AppConfig相当于一个插头,可以使App插入Django项目 我们只需要在 项目的setting.py 中的 INSTALLED_APPS
进行注册即可
注册的方式如下:
在AppConfig 中 有两个属性 name 我们知道是 这个应用的名称 default_auto_field 是指如果模型没有指定主键 那么就会自创建一个主键 这个主键的类型是default_auto_field的值 一般是django.db.models.BigAutoField 指的是巨大数字
给一个Filed设置主键只需要设置参数:
primary_key=True
主键默认自动递增
迁移是对Django模型的定义变化的存储方式, 我们使用命令 python manage.py makemigrations 应用名称
执行之后我们可以在 应用/migrations/000x_initial.py
中找到迁移的数据,使用第二个命令: python manage.py sqlmigrate 应用名 000x
可以查看迁移的模型的建表SQL
使用命令:
python manager.py sqlmigrate
将模型在SQL语句中创建
使用 python manage.py shell
可以 快速操作模型哦
from StudentApp.models import StudentModel as sm
from TeacherApp.models import TeacherModel as tm
newdata = sm(
name='新增字段', # 普通字段
_teacher_id=tm(_teacher_id=1) # 外键
)
newdata.save()
# 插入成功
使用filter函数进行查找 他的参数是拼接而成的 比如 name__endwith
表示name字段 以参数值结尾的数据的查询
name__endswith
结尾__name__contains
包含name__isnull
为空id__in
等于列表元组里的某个值id__lt
小于 参数再加个e id__lte
小于等于id__gt
大于 参数再加个e id__gte
大于等于id__year
时间在某年id__gt
时间在之后id__iexact
大小写不敏感使用函数 exclude
进行反操作
还有很多… 更多查询
如果有查询出的结果 调用结果对象的delete
方法就是删除
a = sm.objects.all()
for i in a:
i.name= "23"
i.save()
或者是
Example.objects.filter(id=481).update(total_calories = 10)
总而言之 使用Django的模型创建表格需要以下几步
models.py
文件,改变模型。python manage.py makemigrations
为模型的改变生成迁移文件。python manage.py migrate
来应用数据库迁移。在后期的模型更新的时候你可以不在两个命令后加上应用名
有些时候指定了主键但是还是会出现 Django新建字段名称为:id列作为主键,这种情况在模型被其他模型做成了外键的时候会出现, 解决方式是在设置外键的模型字段添加参数db_column 指定列名。
from django.db import models
from teacherapp.models import TeacherModel
class StudentModel(models.Model):
_id: int = models.AutoField(primary_key=True, null=False)
name: str = models.CharField(max_length=255)
_teacher_id: int = models.ForeignKey(TeacherModel, on_delete=models.CASCADE, db_column='_teacher_id') # 外键
模型的其他参考文档:
首先,我们得创建一个能登录管理页面的用户。请运行下面的命令:
python manage.py createsuperuser
<input type="text" name="name" id="name">
$.ajax({
type: 'post',
url: '/withtag_ajax/',
data: {name: $("#name").val()}, // data直接是js对象
success: function (data) {
$('#info').text(data.msg)
}
}
)
from django.http import HttpRequest, JsonResponse from django.views import View from api_blog.models import BlogModels class api_get_page_blogs(View): def post(self, response: HttpRequest): """ Ajax 请求处理 DESCRIPT 返回指定页数的所有博客 CLient AJAX Format { page: xxx // 第几页 pre_page_count: xxx // 每页有X个博客需要展示 默认 5 order: READERCOUNTER | DEFAULT (READERCOUNTER可以缩写为1 表示阅读量 DEFAULT简写为0默认) } Server AJAX ForMat { data : [{ 'name' : ... , 'page': ... ,'author' : ...}...] // 博客数据 status: 1 | error | 3; 1 表示成功 2表示失败(页数不符合规范或者没有数据) 3表示请求不规范 } :param response: 浏览器的请求体 :return:Server AJAX JSON :author LiuBoyuan@qq.com """ json = {} ajax_data: dict = response.POST now_page = ajax_data.get('page') pre_page_count = ajax_data.get("pre_page_count") order = ajax_data.get("order") if now_page is None or now_page < 0: return JsonResponse({"status": 'error'}) pre_page_count = pre_page_count if pre_page_count is not None else 5 order_rule = order if order is not None else "DEFAULT" if order_rule == "READERCOUNTER" or order_rule == 1: BlogModels.objects.order("reader_count") slice_index = slice((now_page - 1) * pre_page_count, now_page * pre_page_count) all_das = BlogModels.objects.all() ret_das = all_das[slice_index] if ret_das == [] or not ret_das: return JsonResponse({"status": 2}) json['data'] = ret_das json['status'] = 1 return json
见:https://zhuanlan.zhihu.com/p/151855280
STATIC_URL = '/static/'
if DEBUG == False:
STATIC_ROOT = os.path.join(BASE_DIR, 'static')
else:
STATICFILES_DIRS = [
os.path.join(BASE_DIR,"static")
]
# setting.py
一般情况下 Session是不会串联的(每个客户单的session单独存在 除非两个客户端的session被同时设值等问题)
def view(req):
req.session['key'] = value
req.seesion.get('key')
...
api 就是一个接口 一个url
restful 中文叫做 资源状态转移
(表征状态转义)
实质上 restful 是面对资源进行开发的
我们理解的 Book/add Book/delete 这些路由是不符合restful 规范的
面向资源开发指的是指
针对资源本身 不加入任何动作
的规范 add delete属于资源的动作 所以不符合我们的要求。
restful规范是通过请求的类型来规定制定的动作不同
- post 添加数据
- get 获取数据
- get 获取id为pk的数据
- delete id为pk数据的删除
- put 修改一个学生的全部数据
- patch 修改一个学生部分的信息
实质上 FBV 就是函数视图 CBV 就是类视图
def student(req):
if req.method == 'GET':
return HttpResponse("Get")
elif req.method == 'POST':
return HttpResponse("Post")
以上就是使用 FBV 实现的RESTFUL的函数视图
CBV的实现就是
def StudentView(django.views.View):
def get(self, request):
return HttpResponse("Get")
def post(self, request):
return HttpResponse("Post")
在urls中的注册
urlpatterns = [
path("student/", StudentView.as_view(), name='...')
]
问题:
我写的 CBV 写好了 student 路由的 post 方法 为什么postman的时候获取不到呢?
127.0.0.1:8000/api/student
因为 student后面没有 / 也就是不是 127.0.0.1:8000/api/student/这么结束的时候 Django默认会补全一个/ 从而重定向一个get请求。
我们在Path中首先走了as_view() 方法:
@classonlymethod
def as_view(cls, **initkwargs):
"""Main entry point for a request-response process."""
....
$ def view(request, *args, **kwargs):
...
...
$ return view
as_view的大致结构是这样的 前后做了些操作 关键的是: 定义了一个inner函数view 然后将其返回
所以我们在 StudentView.as_view()
的部分就是as_view
返回的函数的调用
细看下as_view返回的函数
def view(request, *args, **kwargs):
self = cls(**initkwargs) # 将类进行实例化 谁调用的as_view谁是cls
...
return self.dispatch(request, *args, **kwargs)
view 函数将类进行实例化 然后使用实例化对象的dispatch方法 dispatch方法返回什么 view返回什么
我们细说下dispatch:
def dispatch(self, request, *args, **kwargs):
if request.method.lower() in self.http_method_names:
$ 如果请求在cls的请求列表里(请求类型存在)
handler = getattr(self, request.method.lower(), self.http_method_not_allowed)
$ 设置对象的指定请求名称的方法
else:
handler = self.http_method_not_allowed
$ 请求不存在 设置请求不存在方法
return handler(request, *args, **kwargs)
$ 调用设置好的方法 并将结果返回
pip install djangorestframework
我们将CBA视图的父类修改为APIView如:
from rest_framework.views import APIView
class BookView(APIView):
def get(self, request):
return HttpResponse("Get")
def post(self, request):
return HttpResponse("Post")
我们使用postman测试的时候,发现她返回的数据与功能和View大差不差 是一致的.
前往ApiView类之后我们可以看到该类重写了View的 as_view
类
def as_view(cls, **initkwargs):
if isinstance... $ 滤过
view = super().as_view(**initkwargs) # 调用了View原生的as_view 并接受她的返回值
$ ... 滤过
return csrf_exempt(view) $ 相当与返回view 而且这个API不受csrf保护
我们已经知道了APIView 无非是将View的返回值进行一层csrf_exempt
的包装 其余似乎什么也没有 但是我们知道在View中的as_view 他调用了self.dispatch() 这个方法有没有被重写呢 —— 重写了 这是dispatch
def dispatch(self, request, *args, **kwargs):
...
request = self.initialize_request(request, *args, **kwargs)
$ 构建了一个新的request对象
...
return self.response
封装前的request 和 封装后的request
封装之前的request只支持处理UrlEncoded的数据 封装之后便开始支持 Json格式的数据了
当然在此后它还做了认证 权限 限流等组件的初始化
在BookView具体的请求处理函数中 post 可以使用 request.data 来获取请求体 get 就是query_params
class ClassBook(APIView):
def get(req):
data = req.query_params
...
def post(req):
data = req.data
...
在网络中的序列化就是把模型对象转化为字典 经过response之后 变成Json对象就是序列化 反序列化就是将JSON转化为字典 列表之类的模型的
from django.http import HttpRequest, HttpResponse from rest_framework import serializers from rest_framework.response import Response # Create your views here. from rest_framework.views import APIView from sers.models import Book class BookSerializer(serializers.Serializer): """ 书籍模型的序列化器 """ title: str = serializers.CharField(max_length=32) price: int = serializers.IntegerField() date: str = serializers.DateField(source='pub_date') class SersView(APIView): def get(self, request: HttpRequest): """ 获取 全部的书籍 数据 :param request: :return: """ book_list = Book.objects.all() # 使用序列化对象进行序列化 bookSerializer = BookSerializer( instance=book_list, # 序列化的对象 many=True # 是否同时序列化多个对象 ) ret = bookSerializer.data # 获取序列化的结果 print(ret) return Response(ret) def post(self, request: HttpResponse): """ 添加数据 :param request: :return: """ datas = request.data bookSerializer = BookSerializer(data=datas) # 反序列化 print(bookSerializer.is_valid()) # 传过来的字段合法吗 """ 合法生成一个 validated_data 对象 里面存储着可以直接被数据库存储的对象数据 不合法生成一个 errors 对象 里面生成了 键值对 键是字段 值是错误的原因 """ if bookSerializer.is_valid(): print(bookSerializer.validated_data) Book.objects.create(**bookSerializer.validated_data) return Response(datas) else: return Response(bookSerializer.errors)
我们在对模型数据返回到前台的时候 或者是 需要将前台传来的数据进行模块话的时候 我们可以使用 rest_framework 进行序列化和反序列化的操作:
serializers.Serializer
类serializers.Serializer
放着我们想要对模型进行序列化的字段 以及它的数据类型serializers.Serializer
的派生类 传入不同的参数有不同的效果 从而进行序列数据的处理初始化
is_valid()
函数 如果True生成一个validated_data对象 里面存储着可以直接被数据库存储的对象 如果Flase 生成一个error对象 里面生成了 键值对 键是字段 值是错误的原因 你可以直接使用Response返回为了实现松耦合,我们可以不再添加数据的时候加入代码
Book.objects.create(**bookSerializer.validated_data)
而是使用序列器继承下来的save()函数 也就是
bookSerializer.save()
但是这样会报错 因为在源码中save调用类Serializer的基类的create,但是这个create可以理解成 抽象
的 需要去实现 而且create需要有返回值 这个返回值将会作为这个序列器对象的instance的值 也就默认帮我们将结果的数据进行了序列化 所以我们在Response传给前端的时候 可以直接使用 序列器对象的.data
获得 其 instance
的值。
也就是这样的:
... class BookSerializer(serializers.Serializer): ... def create(self, validated_data): """ 实现create方法从而默许调用 save :param validated_data: :return: """ return Book.objects.create(**validated_data)# 返回之后默认序列化 class SersView(APIView): def post(self, request: HttpResponse): """ 添加数据 :param request: :return: """ ... if bookSerializer.is_valid(): bookSerializer.save() return Response(datas) else: return Response(bookSerializer.errors)
当然除了create save还隐式调用了update 我们可以实现update再序列器中 然后进行调用
from django.http import HttpRequest, HttpResponse from rest_framework import serializers from rest_framework.response import Response # Create your views here. from rest_framework.views import APIView from sers.models import Book class BookSerializer(serializers.Serializer): """ 书籍模型的序列化器 """ title: str = serializers.CharField(max_length=32) price: int = serializers.IntegerField() date: str = serializers.DateField(source='pub_date') def create(self, validated_data): """ 实现create方法从而默许调用 save :param validated_data: :return: """ return Book.objects.create(**validated_data) def update(self, instance, validated_data): """ 实现 update 方法从而默许调用save :param instance: :param validated_data: :return: """ Book.objects.filter(id=instance.id).update(**validated_data) return Book.objects.get(id=instance.id) class SersView(APIView): def get(self, request: HttpRequest, id:int = None): """ 获取 全部的书籍 数据 :param request: :return: """ if id is None: book_list = Book.objects.all() else: book_list = Book.objects.get(id=int(id)) # 使用序列化对象进行序列化 bookSerializer = BookSerializer( instance=book_list, # 序列化的对象 many=True if id is None else False # 是否同时序列化多个对象 ) ret = bookSerializer.data # 获取序列化的结果 print(ret) return Response(ret) def post(self, request: HttpResponse): """ 添加数据 :param request: :return: """ datas = request.data bookSerializer = BookSerializer(data=datas) # 反序列化 print(bookSerializer.is_valid()) # 传过来的字段合法吗 if bookSerializer.is_valid(): print(bookSerializer.validated_data) bookSerializer.save() return Response(datas) else: return Response(bookSerializer.errors) class BookDetailView(APIView): def get(self, request, _id:str): if _id.isnumeric(): _id = int(_id) try: book = Book.objects.get(id=_id) except : return Response("NO Exist") bookSerializer = BookSerializer(instance=book, many=False) return Response(bookSerializer.data) else: return Response("error") def put(self, request, _id:str): """ 设置 :param request: :param _id: :return: """ request_json = request.data print(request_json) if _id.isnumeric(): _id = int(_id) try: book = Book.objects.get(id=_id) bookSerializer = BookSerializer(data=request_json, instance=book) if bookSerializer.is_valid(): bookSerializer.save() else: return Response(bookSerializer.errors) return Response(bookSerializer.validated_data) except Exception as e: return Response(f"error1{e}") return Response("error2")
顾名思义 ModelSerializer 是通过model自动实现序列器对象 不需要手动创建序列器的神器(甚至 create和update 都已经做好了)
class BookSerializer(serializers.Serializer): title: str = serializers.CharField(max_length=32) price: int = serializers.IntegerField() date: str = serializers.DateField(source='pub_date') def create(self, validated_data): return Book.objects.create(**validated_data) def update(self, instance, validated_data): Book.objects.filter(id=instance.id).update(**validated_data) return Book.objects.get(id=instance.id) $ 可以直接这么写: class BookSerializerAuto(serializers.ModelSerializer): date = serializers.DateField(source="pub_date") class Meta: model = Book # fields = ['title', "price"] # 对指定的字段进行序列化 # fields = "__all__" # 对所有字段进行序列化 exclude = ['pub_date'] # 哪些字段不用序列化 因为我们自定义了一个date 所以pub_date可以不要
其实还是基础的CBV开发模式 只不过在APIView上提供了新的方法而已 依旧采用的dispatch分发的流程,一个简单的增删改查查代码太多了! 我们需要使用一种方式来简化我们的代码 这个时候GenericApiView的好处就来了
以上四个方法的作用为调度
以我们的get请求为例
class SerailizerStudent(ModelSerializer): class Meta: model = Student fields = '__all__' class StudntView(GenericAPIView): queryset = Student.objects.all() # 设置查询集 serializer_class = SerailizerStudent # 设置序列器 def get(self, req): mydatas = self.get_queryset() # 获取自身的查询集 ret = self.get_serializer(instance=mydatas, many=True).data # 序列器的创建 return response.Response(ret) # 返回
我们有些时候并不会获取全部的数据而是获取指定id的数据 那么如何实现呢?
在GenericApiView中我们有一个叫做 lookup_field
的属性 通过 get_object
我们可以获取指定的
mixin类为 GenericApiView
进行了再一步的封装
from rest_framework.generics import GenericAPIView from rest_framework.mixins import * from rest_framework.serializers import ModelSerializer from Study.models import Student class SerailizerStudent(ModelSerializer): class Meta: model = Student fields = '__all__' class StudntView(GenericAPIView, ListModelMixin, CreateModelMixin): queryset = Student.objects.all() serializer_class = SerailizerStudent def get(self, req): return self.list(req) def post(self, req): return self.create(req) class StudentDetailView(GenericAPIView, RetrieveModelMixin, DestroyModelMixin, UpdateModelMixin): queryset = Student.objects.all() serializer_class = SerailizerStudent def get(self, req, pk): return self.retrieve(req) def put(self, req, pk): return self.update(req) def delete(self, req, pk): return self.destroy(req)
class StudntView(ListCreateAPIView):
queryset = Student.objects.all()
serializer_class = SerailizerStudent
class StudentDetailView(RetrieveUpdateDestroyAPIView):
queryset = Student.objects.all()
serializer_class = SerailizerStudent
viewset改变了原有的apiview分发机制的逻辑,将单一资源和多资源的查询结合在一起从而简化代码
class StudentView(ViewSet): def get_object(self, req, pk): return Response("单一查询") def get_all(self, req): return Response("多个查询") def add_object(self, req, pk): return Response("添加数据") def update_object(self, req, pk): return Response("单一添加") def delete_object(self, req, pk): return Response("单一删除")
如上 我们可以自定义方法的名称来区分 单一资源的面向和多资源的面向
对于他的分发点 我们可以在as_view中创建映射
urlpatterns = [ path("student/", StudentView.as_view( { "get": "get_all", "post": "add_object", } )), re_path(r"student/(?P<pk>\d+)", StudentView.as_view( { "get": "get_object", "delete": "delete_object", "put": "update_object" } )) ]
因为原生的ViewSet继承的是APIView没有GenericApiView的功能自然也用不了Mixin这些好玩意儿, 功能可以说是非常的鸡肋,所以我们推荐使用 GenericViewSet
进行开发 这样我们可以直接映射到Mixin方法里面
path("student/", StudentView.as_view(
{
"get": "list",
"post": "create",
}
))
通过这种方式我们可以实现这样的玩意儿
class StudentView(
ViewSet,
GenericAPIView,
ListModelMixin,
UpdateModelMixin,
CreateModelMixin,
RetrieveModelMixin,
DestroyModelMixin
):
queryset = Student.objects.all()
serializer_class = SerailizerStudent
urlpatterns = [ path("student/", StudentView.as_view( { "get": "list", "post": "create", } ), name='...'), re_path(r"student/(?P<pk>\d+)", StudentView.as_view( { "delete": "destroy", "update": "update", "get": "retrieve" } )) ]
继承的东西太多了! 直接
class StudentView(ModelViewSet):
queryset = Student.objects.all()
serializer_class = SerailizerStudent
针对Mixin和ViewSet django 有一套路由
from rest_framework.routers import DefaultRouter
from .views import StudentView
route = DefaultRouter()
route.register('student', StudentView)
urlpatterns = []
urlpatterns += route.urls
在drf的view的源码中,认证权限和限流是在执行dispatch前执行的三件套(在方法initial中执行)
def initial(self, request, *args, **kwargs):
...
self.perform_authentication(request)# 认证
self.check_permissions(request)# 权限武松和
self.check_throttles(request)# 限流
认证
完成用户信息的判断 确定是否为用户表中的注册用户
权限
判断用户的视图操作权限
from django.contrib.auth.models import User
user = User.objects.create_user('john', 'lennon@thebeatles.com', 'johnpassword')
# 可以进行修改
u = User.objects.get(username='john')
u.set_password('new password')
u.save()
我们需要用户来进行操作 所有的用户信息都在auth_user
里面存放着
admin用户的创建:python manage.py createsuperuser
然后会让你输入Password Email 和Username信息
username
必须。150 个字符或更少。用户名可能包含字母数字,_
,@
,+
,。和 -
字符。如果您使用的 MySQL,请指定 max_length=191
,因为默认情况下,MySQL 只能创建 191 个字符的唯一索引。
first_name
可选 (blank=True
)。 30 个字符或更少。
last_name
可选 (blank=True
)。 150 个字符或更少
email
可选 (blank=True
)。邮箱地址。
password
必须。密码的散列和元数据。(Django 不存储原始密码。)原始密码可以是任意长的,并且可以包含任何字符。
groups
多对多关系 Group
user_permissions
多对多关系 Permission
is_staff
Bollean 类型。指定此用户是否可以访问 admin 站点。
is_active
Bollean 类型。指定是否应将此用户帐户视为活动用户。我们建议您将此标志设置为 False
而不是删除帐户;这样,如果您的应用程序对用户有任何外键,外键不会中断。
is_superuser
Bollean 类型。指定该用户具有所有权限而不明确分配它们。
last_login
用户上次登录的日期时间。
date_joined
指定帐户何时创建的日期时间。在创建帐户时默认设置为当前日期/时间
class UserView(APIView): def get(self, request: Request): """ 登录 :param request: :return: """ user_info = request.data username, password = user_info.get("username"), \ user_info.get("password") print(username, password) ret = authenticate( username=username, password=password ) print(ret) return Response({"status": False if ret is None else True}) def post(self, request): """ 注册 :param request: :return: """ user_info = request.data username, password = user_info.get("username"), \ user_info.get("password") try: User.objects.get(username=username) return Response({"status": False}) except User.DoesNotExist: ret = User.objects.create_user( username=username, password=password, email="??@qq.com" ) return Response({"status": True})
踩坑记录:
创建用户使用的是create_user函数 而不是create
from django.contrib.auth import login, logout
user = User.objects.get(username="xxx")
user.set_password("2323") # 设置密码
user.save()
def lv(req):
user = authenticate(username=urn, password=pwd)# 首先校验
a = login(req, user) if user else user is None # 保持登录状态 存放在session中
@login_required # 判断当前有用户登录
def user_center(req):
login_user = req.user # 获取用户
logout(req) # 登出
我们可以内建抽象user模型类
前提 我们需要在一个没有进行migrate的情况下使用该方法
步骤:
python manage.py startapp myNewUser
AbstractUser
AUTH_USER_MODEL = "应用名.类名"
EG:
模型:
from django.contrib.auth.models import AbstractUser
from django.db import models
class MyNewUser(AbstractUser):
phone = models.CharField(max_length=11, default="")
# 在这里添加新的字段
设置:
AUTH_USER_MODEL = "myNewUser.MyNewUser"
def perform_authentication(self, request):
request.user
源码中就做了一件事调用了request的user属性方法
@property
def user(self):
if not hasattr(self, '_user'):
with wrap_attributeerrors():
self._authenticate()
return self._user
没有_user的时候我们就去_authenticate
里面一探究竟。
def _authenticate(self):
for authenticator in self.authenticators:
try:
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
self._not_authenticated()
raise
if user_auth_tuple is not None:
self._authenticator = authenticator
self.user, self.auth = user_auth_tuple
return
self._not_authenticated()
至于这个authenticators是什么我们还得回去看看我们dispatch创建request对象的地方:
file:view.py ... def initialize_request(self, request, *args, **kwargs): """ Returns the initial request object. """ parser_context = self.get_parser_context(request) return Request( request, parsers=self.get_parsers(), authenticators=self.get_authenticators(), negotiator=self.get_content_negotiator(), parser_context=parser_context ) ..... def get_authenticators(self): return [auth() for auth in self.authentication_classes]
至于这个authentication_classes是什么我们可以前往rest_framework的Setting(不是项目的Setting!)里面的 DEFAULTS
里面进行查看。
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication'
],
我们可以看到这里面是一个默认存储了Session的验证机制(Session验证机制是基于Basic的所以需要加入)
因为在perform_authentication
中我们调用了了Request的user 也就是获取属性方法 - user的返回值 其属性方法主要的内容就是self._authenticate
, 而 self._authenticate
做的主要的事情就是做的主要的事情就拿到authenticators
里面的对象的authenticate
方法的返回值 默认返回一个元组 格式: (user, token)
。
然后将返回值返回给 request
的 user
和auth
所以我们可以在自己的StudentView(自定义的视图里面进行 重写authentication_classes)
class StudentView(ModelViewSet):
authentication_classes = [
]
queryset = Student.objects.all()
serializer_class = SerailizerStudent
这是局部的配置 如果要做全局的配置那么就再Setting里面进行设置
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES":[]
}
我们可以先看看默认的用户是什么, 通过request.user
获取
class StudentView(ModelViewSet):
authentication_classes = [
]
queryset = Student.objects.all()
serializer_class = SerailizerStudent
def retrieve(self, reqest, *args, **kwargs):
print(reqest.user)
return super().retrieve(reqest, *args, **kwargs)
通过 http://127.0.0.1:8000/student/1
获得到了: AnonymousUser
也就是匿名用户 好的我们给这个Views一个局部的认证器罢:
class Authen(BaseAuthentication):
def authenticate(self, request):
return ("NoAdmin", None)
class StudentView(ModelViewSet):
authentication_classes = [
Authen
]
...
认证器可以返回3个值:
- 元组 (1,2) -> 表示认证通过 在视图中request中 request.user 就是第一个值,request.auth就是元组的第二个值
- 返回None 谁都不管 继续执行其他认证器
- 异常 结束执行
class Authen2(BaseAuthentication): def authenticate(self, request:Request): user = request.META.get('HTTP_X_USERNAME') # 获取请求头需要 在请求头前面加上HTTP_ print(user) if user: try: user_obj = User.objects.get(username=user) return (user_obj, None) except User.DoesNotExist: return None return None class Permissions(BasePermission): def has_permission(self, request, view): print(request.user.username) if request.user.is_superuser == 1: print("超级用户来啦") return True return False class StudentView(ModelViewSet): authentication_classes = [ Authen2 ] permission_classes = [ Permissions ] queryset = Student.objects.all() serializer_class = SerailizerStudent def retrieve(self, reqest, *args, **kwargs): print(reqest.user) print(reqest.auth) return super().retrieve(reqest, *args, **kwargs)
权限同理 到get_permissions
中获取由 self.permission_classes
构成的对象列表 默认是在DEFAULTS
里的PERMISSION_CLASSES
里面
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.AllowAny',
],
这个表示的是允许任何权限
class AllowAny(BasePermission):
def has_permission(self, request, view):
return True
AllowAny 就是无脑返回True
k def check_permissions(self, request):
for permission in self.get_permissions():
if not permission.has_permission(request, self):
# 在权限列表中,只要有一个权限对象返回的是 False 那么就是无权限
self.permission_denied(
request,
message=getattr(permission, 'message', None),
code=getattr(permission, 'code', None)
)
每次对于热数据的访问,都需要使用Mysql等数据库进行增删改查,为了减小数据库的压力我们通常将热数据放在缓存中进行操作
- 给定一个服务请求
- 查询页面
- 如果缓存命中: 返回缓存的页面
- 如果缓存不命中: 生成页面,将页面进行缓存(可以采用
lru
等算法) 并返回页面
unique-snowflake
雪花缓存算法进行内存地址寻址而不是指定地址先pip一个库: pip install python-memcached
CACHES = {
'default':{
"BACKEND": # 引擎策略
"django.core.cache.backends.memcached.MemcachedCache",
"LOCATION": "127.0.0.1:11211", # IP地址 可以是列表 代表一个集群
"TIMEOUT": 300, # 保存的过期时间,
"OPTIONS":{
"MAX_ENTRIES": 300, # 缓存最大的数据量
"CULL_FREQUENCY": 2 # 缓存条数达到最大值的时候 删除 1/2(2是字面量) 条数据
}
}
}
其中LOACTION可以是一个IP的列表 代表一个缓存的集群
配置:
CACHES = {
'default':{
"BACKEND":"django.core.cache.backends.db.DatabaseCache",
"LOCATION": "my_cache_table", # 表名称
"TIMEOUT": 300
}
}
使用命令 python manage.py createcachetable
在数据库中创建指定名称的缓存表 并且使用 mirate
进行数据表的迁移
创建好的数据表有三个字段: cache_key value expires 键 值 过期时间
将某个视图函数直接进行缓存
@cache_page(30)
def retrieve(self, reqest, *args, **kwargs):
if reqest.user.is_staff == 1:
cache1 = cache.get("adminUser")
print(cache1)
if cache1 is None:
print("缓存未命中")
cache.set('adminUser', reqest.user.username, 30)
cache1 = cache.get("adminUser")
print("cache", cache1)
return super().retrieve(reqest, *args, **kwargs)
@cache_page(30)
def get_html(req, username):
s = time.time()
user = User.objects.get(username=username)
return HttpResponse(f"<h1>HelloWorld, 你是员工吗:{user.is_staff} <hr> 消耗时间:{time.time() - s}</h1>")
如果我们开启了缓存会发现 所显示的消耗的时间是不会被改变的 因为他已经将固定的返回内容进行了缓存 导致界面无法进行更新
我们观察上面的代码 其实慢就满载对User模型的查询部分我们能不能对其进行单独的提取从而进行缓存呢? 答案是可以的
我们在Setting里面的配置项可以是多个 其中默认是default
那个表示默认的default. 我们使用from django.core.cache import cache
可以直接使用 default
配置项里面的数据,但是你也可以使用from django.core.cache import caches
然后 caches['default']
也可以
from django.core.cache import cache
...
cache.set("hot_data_blog", "【热门】", 20) # 设置热点数据到缓存服务器上
cache.get("hot_data_blog") # 获取
cache.add("hot_data", "Value")# 添加数据 只有key值不存在才生效
例如代码:
class StudentView(ModelViewSet): authentication_classes = [ Authen2 ] permission_classes = [ IsAuthenticated ] queryset = Student.objects.all() serializer_class = SerailizerStudent def retrieve(self, reqest, *args, **kwargs): if reqest.user.is_staff == 1: cache1 = cache.get("adminUser") print(cache1) if cache1 is None: print("缓存未命中") cache.set('adminUser', reqest.user.username, 30) cache1 = cache.get("adminUser") print("cache", cache1) return super().retrieve(reqest, *args, **kwargs)
浏览器请求某个路由的时候
中间件是一个请求/响应处理的钩子框架 轻量 低级的插件 用于全局改变输入和输出
当请求发送给服务器的时候 中间件可以在主路由前 视图前做一定的处理(或者是拦截) 或者是服务器发送响应给浏览器的时候也需要通过中间件 当然中间件的类型还有很多 不一一举例
中间件以类定义 需要继承类 django.utils.deprecation.MiddlewareMixin
类
from django.utils.deprecation import MiddlewareMixin class RouterMiddleWare(MiddlewareMixin): """ 一个合格的中间件 需要有以下的函数 可以是多个! 以下函数都需要返回数据: None: 继续王下周 HttpResponse: 结束了 不用继续了 我把数据返回给你就是 """ def process_request(self, request, callback=None, callback_args=None, callback_kwargs=None): """ 执行路由之前被调用 在每个请求上调用 """ print("-process_request-") def process_view(self, request,callback=None, callback_args=None, callback_kwargs=None): """ 调用视图之前被调用 """ print("-process_view-") def process_response(self, request, response,callback=None, callback_args=None, callback_kwargs=None): """ 所有响应返回给浏览器被调用 只能返回 HttpResponse """ print("-process_response-") return response
我们可以在setting里面进的MiddleWare进行注册
在视图返回数据之前 按照注册顺序执行 在视图函数之后 注册顺序的逆序执行
让浏览器每次指定访问次数
from django.http import HttpResponse from django.utils.deprecation import MiddlewareMixin from collections import defaultdict import schedule class LimitRequestRoute(MiddlewareMixin): visit_log = defaultdict(lambda :0) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) schedule.every(10).seconds.do(self.clear) print("[中间件/LimitRequestRoute]: 开始执行数据") def clear(self): self.visit_log.clear() print("[中间件/LimitRequestRoute]: 缓存被清空") def process_request(self, request, **kwargs): value = request.META.get("REMOTE_ADDR") schedule.run_pending() if self.visit_log[value] <= 4: self.visit_log[value] += 1 print(self.visit_log) else: return HttpResponse("你的访问次数超过正常值 请在10秒后重试")
有一个叫做Paginator
的类 很nice
page = Paginator(object_list, pre_pages)
- object_list 数据列表
- pre_pages 每页的数据量
它有这些
、数据
InvalidPage
的异常我们先做个前端:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> <script src="https://cdn.staticfile.org/jquery/3.6.1/jquery.min.js"></script> </head> <body> <style> #form { width: 60%; margin: auto; } input { display: block; width: 100%; margin-bottom: 20px; height: 50px; font-size: 2rem; } </style> <form id="form" method="post" enctype="multipart/form-data" action="/file/upload/"> <input type="file" id="ac"> <input type="submit" value="登录" id="dl"> </form> </body> </html>
后端:
def file_update(req: Request):
if req.method == 'POST':
file: InMemoryUploadedFile = req.FILES.get('user_file')
print(type(file))
if file is None: return HttpResponse("上传失败 - 文件为空")
filename = file.name
return HttpResponse(f"你的文件是{filename}, 大小 {file.size} KB")
else:
return HttpResponse("不支持的类型")
在Django中 我们将客户端上传的资源称之为: media
资源 区别于 static
资源
这是对 media
资源的配置
MEDIA_URL = "/media/" # 加载资源的时候 路由需要以MEDIA_URL开头
MEDIA_ROOT = os.path.join(BASE_DIR, "media")
我们还需要在主路由里面配置MEDIA_URL
设置的路由
from django.conf import settings
from django.conf.urls import static
urlpatterns += static(
settings.MEDIA_URL,
document_root=settings.MEDIA_ROOT
)
file_path = os.path.join(settings.MEDIA_ROOT,file.name)
with open(file_path, 'wb') as f:
f.write(file.file.read())
缺点:容易重复名称
1: 创建一个模型
from django.db import models
class Content(models.Model):
title = models.CharField("name", max_length=11)
path = models.FileField(upload_to="picture") # upload_to 表示的是子目录
def file_update(req: Request):
if req.method == 'POST':
file: InMemoryUploadedFile = req.FILES.get('user_file')
print(type(file))
if file is None: return HttpResponse("上传失败 - 文件为空")
filename = file.name
Content.objects.create(title=filename, path=file)
return HttpResponse(f"你的文件是{filename}, 大小 {file.size} KB")
else:
return HttpResponse("不支持的类型")
部署机器: Centos
runserver 的能力很差 我们一般使用 uwsgi 进行
是一个新的web网关接口 是将Python应用放在web服务器之间的一种接口 被广泛的使用 我们平时使用的runserver
是一种很拉的测试环境使用的接口(但是如果没有runserver django根本和http无关).
通俗的说 wsgi就是一种http和django之间的翻译而且实现了uwsgi的协议功能完善协议众多。
我们可以使用 pip install uwsgi
进行安装
报错解决:
yum install -y gcc* pcre-devel openssl-devel
yum install -y python-devel
# 或者是
yum install -y python3-devel
pip3 install uwsgi
我们要在于主应用目录中设置一个ini配置文件
[uwsgi]
#socket=0.0.0.0:9876
http=0.0.0.0:9876 ;让uwsgi 默认在9876端口开启http服务
chdir=/xxx/xxx/xxx/ ; 项目的路径 必须是绝对路径
wsgi-file=blog/wsgi.py ; 告诉uwsgi: wsgi的路径 相对路径
process=1
thread=2
pidfile=us.pid ; 服务的pid
daemonize=x.log ; 日志
master=true ; 主进程的管理
然后我们直接运行:
uwsgi --ini uwsgi.ini
重复启动uWsgi 导致pid文件中的进程号失准
解决方式: kill -f uwsgi -9
jwt 就是: json web token
,用于 前后端分离项目|app|微信小程序 中用户的认证,他可以将原始的数据json加密成我们看不懂的字符串,通过后台将加密的字符串交给前台
> pip install djangorestframework-jwt
1、 用户发送密码和账号进行登录 服务器收到之后发送一串随机的字符串我们称为token 交给用户
2、用户拿到之后存储, 在下次登录的时候拿着token交给服务器服务器进行验证
import uuid # Create your views here. from rest_framework.request import Request from rest_framework.response import Response from rest_framework.serializers import ModelSerializer from rest_framework.views import APIView from User.models import User class UserSerial(ModelSerializer): class Meta: model = User fiedles = ['username', "password"] class UserLogin(APIView): def put(self, request: Request): """ 修改密码 默认只有登录成功才可以修改 :param request: :return: """ req = request.data token = req.get('token') urn = req.get('username') newpwd = req.get('newpassword') if not token: return Response({"status": 201, 'error': "错误的登录状态-你没有登录!"}) else: ret = User.objects.get(username=urn) if ret.token != token: return Response({"status":201, 'error': "小黑子伪造了错误的令牌"}) ret.password = newpwd ret.save() return Response({"status":200, 'error':"修改成功"}) def get(self, request: Request): """ 登录 :param req: :return: """ req = request.data urn = req.get('username') pwd = req.get('password') print(urn, pwd) ret = User.objects.filter(username=urn, password=pwd) print(ret) if not ret: return Response({"status": 201, "code": "error_login"}) random_str = str(uuid.uuid4()) i = ret[0] i.token = random_str i.save() return Response({'status': 200, 'code': random_str}) def post(self, request: Request): """ 注册 :param request: :return: """ req = request.data urn = req.get('username') pwd = req.get('password') userSerial = UserSerial(data=request.data) if userSerial.is_valid(): User.objects.create(**userSerial.validated_data) return Response(userSerial.validated_data) else: return Response(userSerial.errors)
代码的主要思路就是:
- 用户在某个登录操作之后 系统记录下用户的token并交给客户端
- 下次客户端在基于登录之后对账户具体的操作的时候对比记录的token和传来的ajax的请求token是否一致 一致再进行操作
用户登录的时候 用户部分保存JWT 但是服务端不保存 以后用户再进行访问的时候携token 服务端进行校验,优势:相较于传统的token无需在服务端保存token
第一步: 用户提交表单信息给服务器(用户名 密码) 使用jwt创建一个token并交给用户进行返回
jwt的token是三段字符串 并且用.链接
第一段字符串: header 内部包含 alg:HS256, typ:“JWT”
- 就是使用的加密算法是HS256 使用的类型是JWT
第二段是自定义的用户的信息
{id:"123123",name:"sadasd", exp:123123}
exp 是过期时间 因为信息是可以被反解的 所以不能把敏感信息凡在里面第三段 第一部分的明文和第二部分的明文拼接起来并进行加密 加密算法就是第一段alg指向的值 当然加了盐 加完密之后再加上了base64url
第二步: 以后用户再来访问的时候需要携带token 后端需要对token进行校验
校验过程
- 获取token,对token进行切割
- 对第二段进行base64url解密
- 获取第二段的json信息 检测过期时间是否超时
第三步: 把第一二端进行一系列的解密对比两个密文如果一致就是没修改过
因为服务器对第三段的信息进行了加盐 用户无法知道盐的具体值 所以无法进行解密
class JWTUserLogin(APIView): def put(self, request: Request): """ 修改密码 默认只有登录成功才可以修改 :param request: :return: """ req = request.data token = req.get('token') newpwd = req.get('newpassword') salt = settings.JWT_SALT try: payload = jwt.decode(token, salt, True) # 对token进行校验 如果失效抛出异常 except Exception as e: return Response({ "status": 201, 'error': "错误的令牌" }) username :str= payload.get("urn") try: user = User.objects.get(username=username) except: return Response({ "status": 201, "data": f"用户不存在了" }) return Response({ "status": 200, "data": f"你好, {user.username}" }) def get(self, request: Request): """ 登录 :param req: :return: """ req = request.data urn = req.get('username') pwd = req.get('password') print(urn, pwd) ret = User.objects.filter(username=urn, password=pwd) print(ret) if not ret: return Response({"status": 201, "code": "error_login"}) header = { "alg": "HS256", "typ": "JWT" } # 不写 默认也是他 payload = { "urn": ret[0].username, "uid": ret[0].id, # "exp": ... } token = jwt.encode( payload=payload, key=settings.JWT_SALT, headers=header, algorithm="HS256" ).decode('utf-8') return Response({'status': 200, 'code': token})
我们可以将token的验证过程交给一个验证器来完成
class JwtAuth(BaseAuthentication): def authenticate(self, request): if request.method == "GET": return None req = request.data token = req.get('token') salt = settings.JWT_SALT try: payload = jwt.decode(token, salt, True) # 对token进行校验 如果失效抛出异常 except Exception as e: raise AuthenticationFailed({ "status": 201, 'error': "令牌错误 小黑子伪造了吧" }) username: str = payload.get("urn") try: user = User.objects.get(username=username) except: return AuthenticationFailed({ "status": 201, "data": f"用户不存在了" }) return (user, None) class prologin(APIView): authentication_classes = [JwtAuth, ] def put(self, req): return Response({"status":200, "data":"Okay"}) def get(self, req): JWTUserLogin().get(request=req)
rabbitmq
是一个消息队列 消息队列也称之为消息中间件
队列是一个先进先出的数据结构(FIFO)
我们知道的 线程和进程python已经实现了队列 为什么还有Rabbit MQ呢
不同的语言(Java和C#和python的通讯)或者是Python本身实现的不同主机的分布式通讯
传统的web服务是来一个请求我处理一个 如果服务器并发量是10 突然来了11人会导致服务器的崩溃! 为了解决这个问题我们可以使用一个队列 存储从客户端请求来的消息 服务器每次从消息队列中取出自己合适数量的消息。这就是高并发的流量削峰。
某个系统 有订单 骑士 商家 后台四个单元模块 用户下订单之后订单系统收到请求 并将请求发送一份给骑士系统 骑士系统处理完毕之后 请求交给商家来处理 商家处理完之后后台进行记录 系列流程执行完毕之后通过订单系统交给客户
这样的链式架构往往有一个问题 其中有一环如果出现了那就Bug了导致流程无法正常执行 有很大的问题 而且时间的消耗也是极其巨大的
如果换成了由A模块并起之后所有的模块 虽然性能表现较好与避免了之前环环相扣的问题,但是我们代码的架构就变成了高耦合低内聚的架构了 每次添加一个新的模块都要在订单系统进行该模块的并发代码编写 很是不好!
我们可以创建一个消息队列 然后订单系统将需要处理的数据交给队列 队列拿到之后 监听队列的模块们不约而同的过来拿去数据 一人一份然后进行处理 (像极了你在家喂一群猫猫狗狗的样子)
这就是一种生产者消费者模式
rabbitmq 有两个模式
如果是生产者消费者模式,我们需要一个往队列里存任务的实例和一个监听队列并获取任务的实例
职责: 链接RabbitMQ 向队列里面添加消息
流程
RabbitMQ
import pika auth_user = pika.PlainCredentials("admin", "20021003") # 创建一个用户认证器 conn = pika.BlockingConnection( pika.ConnectionParameters( "192.168.236.128", credentials=auth_user ) ) # 链接指定的 RabbitMQ 并指定用户认证器 channel = conn.channel() # 获取一个频道 他可以对RabbitMQ继续控制 channel.queue_declare(queue="new_task") # 创建队列 并指定一个队列的名称为 new_task channel.basic_publish( exchange="", # 这是交换机参数 因为是简单模式 所以为空 routing_key="new_task", # 指定的队列名称 body="你好我的上帝" # 插入的数据 ) # 向指定的队列插入数据
职责: 监听MQ 并获取数据
流程:
import pika auth_user = pika.PlainCredentials("admin", "20021003") # 创建一个用户认证器 conn = pika.BlockingConnection( pika.ConnectionParameters( "192.168.236.128", credentials=auth_user ) ) # 链接指定的 RabbitMQ 并指定用户认证器 channel = conn.channel() # 获取一个频道 他可以对RabbitMQ继续控制 channel.queue_declare(queue="new_task") # 创建队列 如果存在默认跳过 # 确认回调函数 def call_back(ch, method, properties, body): body:bytes print(f"[*] 检测到消息 {body.decode('utf-8')}") # 确定监听队列 channel.basic_consume( queue="new_task", # 监听队列 auto_ack= True, # 默认应答 on_message_callback=call_back # 回调 ) channel.start_consuming()
这是全部的代码总览
import time from threading import Thread import pika def _getter(): auth_user = pika.PlainCredentials("admin", "20021003") # 创建一个用户认证器 conn = pika.BlockingConnection( pika.ConnectionParameters( "192.168.236.128", credentials=auth_user ) ) channel = conn.channel() # 获取一个频道 他可以对RabbitMQ继续控制 channel.queue_declare(queue="new_task") # 创建队列 如果存在默认跳过 # 确认回调函数 def call_back(ch, method, properties, body): body:bytes time.sleep(.1) print(f"[*] 检测到消息 {body.decode('utf-8')}") # 确定监听队列 channel.basic_consume( queue="new_task", # 监听队列 auto_ack= True, # 默认应答 on_message_callback=call_back # 回调 ) channel.start_consuming() def _putter(): import random auth_user = pika.PlainCredentials("admin", "20021003") # 创建一个用户认证器 conn = pika.BlockingConnection( pika.ConnectionParameters( "192.168.236.128", credentials=auth_user ) ) channel =conn.channel() channel.queue_declare("new_task") while 1: channel.basic_publish( routing_key='new_task', exchange="", body=f"我是一个一个的{random.randint(1,100)}数据啊" ) if __name__ == '__main__': t1 = Thread(target=_getter) t2 = Thread(target=_putter) t1.start() t2.start()
basic_consume
可以设置我们的默认的应答参数: auto_ack
当数据被消费者取走的时候执行callback函数 如果callback有bug或者是需要重新做一个方案 我们重写了消费者的代码 但是当我们再次从数据中去数据的时候,因为数据早早的被取走了所以已经没有了
将默认应答修改为手动应答即可
... def call_back(ch, method, properties, body): body:bytes # time.sleep(.1) str_body = body.decode('utf-8') if str_body != "我是一个一个的52数据啊": print(f"[*] 检测到消息 {str_body}") ch.basic_ack(delivery_tag=method.delivery_tag) # 手动发送删除消息的代码 else: print("有错误啦!") raise KeyError("我处理不了52的数据 呜呜呜呜") # 确定监听队列 channel.basic_consume( queue="new_task", # 监听队列 auto_ack= False, # 默认应答 on_message_callback=call_back # 回调 ) ...
如果生产者将数据存在了MQ 但是消费者还没来得及去获取 MQ 就绷不住了 挂了,导致在队列里面的数据全部丢失, 怎么办?
在创建队列的方法queue_declare
添加参数 durable=True
,注意当申明一个队列之后就无法对其的持久化进行修改了, 单单一个队列具有持久化的能力还不够,我们需要对他publish的数据也进行持久化的申明:
channel.basic_publish(
exchange="",
...,
properties=pika.BasicProperties(
delivery_mode=2
)
)
如果应用场景里出现了多个消费者 (似乎现实也是) 生产者在某队列A中存放了一个数据 但是A B两个生产者都监听了该队列 这个数据究竟交给谁好呢?(人手一份? 那是发布订阅模式)默认的简单分发机制称之为轮询机制,一人一个谁也不许多谁也不兴少,如果生产者上传了一个数据给队列 队列有多个客户端在监听 那么这个数据会给最先监听队列的那个消费者。以此类推
这种轮询的方式固然不错 但是当数据交给某个消费者的时候,消费者本身处理数据的能力特别的慢,导致轮询无法继续往下执行 不好
我们指定两个消费者 一个消费者的callback我们设置sleep(2)表示该消费者的处理速度为2s 另外一个sleep(1)表示速度为1s.
我们是可以在消费者使用 channle.basic_qos(prefeth_count=1)
设置一个公平的分发机制。谁干活干完了就把数据交给谁。
发布订阅模式主要的功能是通过上传者开辟一个交换机 所有的客户端准备一个队列 并订阅某个交换机后 每当生产者生产一份数据给交换机 所有订阅交换机的队列们将会立刻人手一份生产者生产的新数据
在发布订阅模式下我们的生产者消费者在做什么呢?
RabbitMQ
import pika auth = pika.PlainCredentials('admin','20021003') conn = pika.BlockingConnection( pika.ConnectionParameters( "192.168.236.128", credentials=auth ) ) chl = conn.channel() chl.exchange_declare( exchange="tasks", # 一个交换寄的名称 exchange_type="fanout" # 生产交换机的模式 这里是发布订阅模式 ) # 生成一个交换机 chl.basic_publish( exchange="tasks", routing_key='', body="这是一个消息 你可以进行订阅" )
RabbitMQ
import time import pika auth_user = pika.PlainCredentials('admin', "20021003") connect = pika.BlockingConnection( pika.ConnectionParameters( '192.168.236.128', credentials=auth_user ) ) chl_tmp = connect.channel() exchange_name = "exchage_test" chl_tmp.exchange_declare(exchange_name, exchange_type="fanout") # 创建一个全局的交换机 chl_tmp.queue_declare('a') def putter(): """ 生产者 :return: """ auth_user = pika.PlainCredentials('admin', "20021003") connect = pika.BlockingConnection( pika.ConnectionParameters( '192.168.236.128', credentials=auth_user ) ) chl = connect.channel() exchange_name = "exchage_test" chl.exchange_declare(exchange_name, exchange_type="fanout") # 创建一个全局的交换机 chl.queue_declare('a') chl.basic_publish( exchange=exchange_name, routing_key='a', body="23" ) def __call_back(chl, method, properties, body): body:bytes print(body.decode("utf-8")) def getter(): chl = connect.channel() queue = chl.queue_declare("", exclusive=True) queue_name = queue.method.queue chl.queue_bind( exchange=exchange_name, queue=queue_name ) chl.basic_consume( queue=queue_name, auto_ack=True, on_message_callback=__call_back ) chl.start_consuming() print("【生产者】") from threading import Thread t1 = Thread(target=putter) t2 = Thread(target=getter) t1.start() t2.start()
踩坑:
如果你是顺序执行代码的 请先启动消费者 在启动生产者
关键字模式是发布订阅模式的一个增强,我们知道绑定交换机的队列一旦知道交换机被赋予了新的值就会立刻获取一份拷贝数据 以至于人手一份,但是有些数据可能对某个具体的消费者是无用的 有些是有用的,换句话来说就是 不想让交换机发送的所有数据订阅的队列都有响应,这是极其不好的事情。
现在RabbitMQ有了一个新的方案: 队列绑定某个交换机的时候,顺便会携带一个关键字的参数,生产者在向交换机发送数据的时候也会携带一个关键字 如果关键字匹配成功那么就发送反之不发送。
关键字参数就是我们 queue_bind
的 route_key
和 basic_publish
的 route_key
如果需要多个可以 调用多次 queue_bind
或者 basic_publish
import pika exchange = "direct_test" def connect(): auth = pika.PlainCredentials('admin', '20021003') connect = pika.BlockingConnection( pika.ConnectionParameters( '192.168.236.128', credentials=auth ) ) return connect def init(): chl = connect().channel() chl.exchange_declare(exchange=exchange, exchange_type="direct") def putter(key=None): c = connect() chl = c.channel() while True: chl.basic_publish( exchange=exchange, routing_key=key, body=f"我的关键字是 {key}" ) print(f"关键字{key}的交换机数据已发送") def __callback(chl, method, property, body): body:bytes print(f"指定关键字{chl.routing_key}消费者已收到数据: {body.decode('utf-8')}") def getter(key=None): c = connect() chl = c.channel() ret = chl.queue_declare("", exclusive=True) queuename: str = ret.method.queue chl.queue_bind( exchange=exchange, queue=queuename, routing_key=key # 绑定一个关键字 ) # 如果需要绑定一个关键字 可以再写一个 queue_bind chl.routing_key = key chl.basic_consume( queue=queuename, auto_ack=True, on_message_callback=__callback ) print(f"关键字{key}的选手开始监听") chl.start_consuming() if __name__ == '__main__': import threading init() threading.Thread(target=getter, args=("admin",)).start() threading.Thread(target=getter, args=("guest",)).start() threading.Thread(target=putter, args=('admin',)).start() threading.Thread(target=putter, args=('guest',)).start()
关键字模式需要交换机和队列绑定的关键字需要完全匹配 但是我们可以通过通配符进行模糊匹配 使用方法就是 原来使用 basic_publish
设置的 route_key
不变 改变的是我们的 queue_bind
的值。 # 表示任意多个字符 , * 表示一个字符(这个和window的文件搜索引擎的通配符似乎是不大一样的…)
在RabbitMQ的过程中我们依旧会发现问题的所在
目前的架构我们可以得知: 任务在分发的时候 先会将任务放在交换机中 订阅交换机的队列从中去数据
我们可以使用多路复用技术或者是多线程进程 但是太麻烦了 我们可以直接使用Celery来实现并发的技术
是一个分布式系统 处理异步任务支持任务调度
消息中间件
任务执行单元
任务结果存储f
本教程使用
Redis
消费者
生产者
将异步函数方引用给Celery
对异步函数进行方法的调用就可以执行所需要的实现的效果
底层实现的还是 类似交换机模式 的实现
import time import celery backend = "redis://127.0.0.1:6379/1" # 结果集 brokr = "redis://127.0.0.1:6379/2" # 消息中间件 cel = celery.Celery( # 创建一个异步任务 "test", backend=backend, broker=brokr # 消息中间件 ) @cel.task def send_email(name): print("正在准备短消息的发送") time.sleep(10) print("f") return "OJBK"
然后我们使用命令行启动:
如果是在windows启动还需要: --pool=solo
这个参数
其中__init__
是文件的名称(不需要.py!)
它能够做的事情:启动一个worker 监听任务
from getter import *
result = send_email.delay("yuan")
print(result.id)
result2 = send_msg.delay("alex")
print(result2.id)
当一个celery的任务监听到数据之后并执行完毕之后 会将返回的结果放在我们指定的redis数据库中 —— backend
我们可以通过 delay
返回的对象的 id
属性在 redis数据库中进行获取
taskid = result2.id
async_ret = AsyncResult(
id=taskid,
app=cel
)
... # 我可以做任何我想做的事情在这里
if async_ret.successful():
print("成功!")
ret = async_ret.get() # 获取任务的返回值
print(ret)
我们可以创建一个新的包 包里面存放我们的所有监听任务 也就是我们的消费者,其他功能不变
celery.py
- 一些对Celery的配置文件import celery
cel = celery.Celery(
"celery_demo",
backend="redis://127.0.0.1:6379/4", # 结果集
broker="redis://127.0.0.1:6379/5", # 消息中间件
include=[ # 包含任务所在的文件 对多个任务可以做模块化的分类
"tasks_group.task1",
"tasks_group.task2",
]
)
cel.conf.timezone = "Asia/Shanghai"
task01 之类的代码
- 任务 (消费者)import time
from tasks_group._celery import cel
@cel.task
def send_email(res):
print(f'我在发送邮件 内容是{res}')
time.sleep(10)
return "ok"
from tasks_group.task2 import send_msg
from tasks_group.task1 import send_email
ret = send_msg.delay("Hello")
print(ret.id)
ret = send_email.delay("Hello")
print(ret.id)
使用 国标时间 传入 apply_async
方法的 eta
参数。可以进行定时任务的设置
import datetime
from tasks import send_msg as tsend_msg
ctime = datetime.datetime(2022, 10, 16, 23, 14)
print(ctime)
utc = datetime.datetime.utcfromtimestamp(
ctime.timestamp()
)
print(utc)
ret = tsend_msg.apply_async(args=["Hello", ], eta=utc)
print(f'ret: {ret.id}')
在 2022-10-16 23:14:00
点发送一个 发送消息的任务
from datetime import timedelta from celery.schedules import crontab import celery cel = celery.Celery( "mode_time_tasks", backend="redis://127.0.0.1:6379/1", broker="redis://127.0.0.1:6379/3", include=[ "tasks_time.task1" ] ) cel.conf.timezone = "Asia/Shanghai" cel.conf.enable_utc = False cel.conf.beat_schedule = { "every10m": { # 该key可以自定义 "task": "tasks_time.task1.send_email", # 指定的任务 "schedule": timedelta(seconds=10), # 每十秒执行一次 # "schedule": 10, # 上一行的代码的简写 # "schedule": crontab(minute="*/1"), # 每一分执行一次 * 表示每 "args": ("就是给你发消息",) # 参数 } }
这样配置之后我们可以直接使用消费者进行 从而不再对生产者进行编码了 为了让这个定时任务跑起来 我们需一行命令 每个指定的时间就向 redis队列
放置任务: celery -A tasks_time._celery beat
我们需要在django
项目里生成一个包 celery
包的里面装着许多的任务包这些就是我们的消费者,每个消费者包里面需要一个 task.py
这里面存放着我们的消费函数,回到 celery
包下面 创建一个config 和一个 main
大致结构像这样:
接下来代码部分:
main.py
import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "celeryInDjango.settings") # 将celery绑定给django 需要放在最前面 避免使用django数据库无效的问题 from celery import Celery app = Celery('taskManager') app.config_from_object("celery_tasks.config") # 加载配置文件 app.autodiscover_tasks( ['celery_tasks.async_task', "celery_tasks.time_task"] ) # 会自动加载指定目录的 `task.py` 文件
config
broker_url = "redis://127.0.0.1:6379/5"
result_backend = "redis://127.0.0.1:6379/4"
某个task:
from celery_tasks.main import app
import time
@app.task
def send_msg(msg):
print(f"【celery - 异步任务】正在发送代码{msg}")
time.sleep(10)
print("END")
return "ok"
某个view:
# Create your views here. from celery.result import AsyncResult from rest_framework.request import Request from rest_framework.response import Response from rest_framework.views import APIView from celery_tasks.async_task import tasks from celery_tasks.main import app class test(APIView): def get(self, request: Request): id = request.data.get('id') async_ret = AsyncResult( id=id, app=app ) print(async_ret.status) if async_ret.successful(): return Response({"ret": async_ret.get()}) else: return Response({"ret": "数据还在处理 请稍后"}) def post(self, request): ret = tasks.send_msg.delay("hello") return Response({"status": "OJBK", "id": ret.id})
启动celery
的指令: celery -A celery_tasks.main worker -l info --pool=solo
启动django的指令: python manage.py runserver
.
对于Django-redis的官方文档:https://django-redis-chs.readthedocs.io/zh_CN/latest/
我们在安装完毕 django-redis 之后可以进行类似的操作:
CACHES = { "default": { "BACKEND": "django_redis.cache.RedisCache", "LOCATION": "redis://127.0.0.1:6379/15", "OPTIONS": { "CLIENTS_CLASS": "django_redis.client.DefaultClient", "REDIS_CLIENT_CLASS" : "fakeredis.FakeStrictRedis", "PICKLE_VERSION": -1, # 使用最新的一个 pickle序列化版本进行存储python的对象 "IGNORE_EXCEPTIONS": True, # 忽略异常 } } } SESSION_ENGINE = "django.contrib.sessions.backends.cache" # 让 session 利用 缓存存储 SESSION_CACHE_ALIAS = "default" # 指向的是 CACHES 某个指定的键 # 别名
from django.core.cache import cache
from django.http import HttpResponse
# Create your views here.
def PutReids(request):
cache.set("key", "asdasd")
return HttpResponse("已缓存")
def GetReids(request):
return HttpResponse(cache.get("key"))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。