当前位置:   article > 正文

【DjangoDRF+缓存+JWT+RabbitMQ 七万字总结】

【DjangoDRF+缓存+JWT+RabbitMQ 七万字总结】

百年沉浮困低谷,莫以今朝度兴衰, 人生终有高飞日,傲振雄翅过沧海.

更新记录

2022-10-24
celery
django-redis
祝各位程序员节快乐

Redis

Redis 由 Vmware 公司开发, (卧槽 虚拟机)! 因为考虑有没有学习Linux的同学 基础篇使用Windows的Redis入门 后期会进入Linux的Redis学习阶段

NoSql 的简介

有些数据用Mysql这种数据库存储很不nice!譬如一些图片 秒杀活动之类的数据 这个时候就可用用到Redis类似的NoSql来存储了

高速缓存技术

用内存存储数据的技术 从而提高curd(增删改查)速度

扯句题外话kodachi是一个存储在内存上的系统 黑客们很喜欢使用它 毕竟一个关机键就可以完全清除他们的犯罪历史 遇见危险 直接关机处理 有哪个小可爱会不喜欢呢

运存(内存)空间很小 如何去保存数据呢? 关机就清除数据的话 不就都没了吗?

  • 我们先要明白 数据使用频率很高的时候我们称之为 热数据 ,举个栗子, 淘宝的商品 steam的热销 哔咔的推荐本 都算是一种热数据
  • 如果我们每次都想看这些数据,我们就不用把他们放在Mysql这样存储大量数据的地方了(虽然Mysql有索引这样的机制 但还是麻烦!) 毕竟放在硬盘的数据怎么都比不过存在内存上的数据
  • 反而呢 内存非常贵 (吃鸡火热的时候 一个内存买500了解一下) 为了节约成本 不经常使用叫做冷数据 存放在硬盘里这是很不错的选择

万一哪一天停电没有存储在硬盘的热数据岂不是直接GameOver, 所以我们需要了解一个点 热数据其实不但保存在内存 他也有机会在硬盘中被存储 这里存储在硬盘的热数据就是备份备份

所以加入高速缓存的存储机制是这样的:

  • 去应用程序里找数据(变量, 列表之类的简单数据)
  • 没在App找到,辨认这个数据是否为热数据,
  • 是的话就去缓存中查找(redis),冷数据就去 数据库查找
  • 实在找不到你在HDD里找找?

如果缓存数据发生变化记得及时 更新到数据库里哦!

使用案例

  • 买的火爆的商品数据在redis中,买的一般的数据在数据库中
  • 网红的言论在redis中,普通人的评论在数据库中
    • 这不是流量的限流,只是为了让更多人加载网红的言论速度更快,普通人的评论似乎没那么多人去看加载快不快与我无关~
    • 区别对待是吧 好家伙
  • 超大量(上亿万)的数据请求提交的时候 先交给集群们
  • 集群们先暂存数据拿到数据之后等到请求来到低谷期在上传到Mysql类数据库
  • 买秒杀物品的时候 抢到之后下单支付明明到了支付界面,但是还是可能会支付失败是因为其他进程线程虽然晚入一步订单界面但是他提前执行了订单操作, 从而自己被挤了下去. 订单失败的人就想 这玩意儿有黑幕!一定是商家自导自演! 引入Redis后 只要提交了数据就是 一个一个执行不会出现插队或者加塞的情况 你可以慢慢下单 甚至插科打诨一段时间再去下单 只要道德允许 Redis就能允行。

集群

如果大量数据只靠一台服务器做告诉缓存似乎没那么靠谱 我们就可以搭建集群 类似HDFS叫做 LAZY_PERSIST 的玩意儿. 一般我们在Docker这种容器中搭建

Redis的介绍

是一个NoSQL数据库产品之一 以键值对的形式进行存储 像JSON 像DICT 像Mapper 由C语言编写(源码开源) 理论可以达到10个w的QPS QPS指的是每秒查询的次数

X轴是链接数 Y轴是一秒查询的次数

X轴是链接数 Y轴是一秒查询的次数

安装与服务器的搭建

下载链接https://github.com/tporadowski/redis/releases,下载ZIP即可

现在之后不要打开任何程序 我们需要让redis-server.exe启动的时候redis.windows.conf, 但是 exe 默认不会加载任何文件 我们需要写一个Bat文件来实现这样的效果

新建记事本txt:

redis-server redis.windows.conf
  • 1

写完之后 存储为.bat文件即可

打开bat弹出的cmd类似于下图

基础使用

启动客户端的话可以直接点击redis-cli.exe运行 界面简陋 见谅 服务器端口默认 6379

默认, Redis有16个逻辑库(0~15) 都是空的 可以存储数据 使用 select <id> 切换到指定id的逻辑库 如

select 0
  • 1

切换到 0 号逻辑库

还有以下命令:

set <key> <value> 新增键值对
get <key> 获取指定key的value
del <key> 删除
clear 清空控制台
flushall 删除所有数据
  • 1
  • 2
  • 3
  • 4
  • 5

虽然好用但是简陋!高端程序员的美工都差劲(暴论)

安装 RedisDesktopManager 优化你的眼睛查看更高级的界面

官网收费从别人那里借来的免费网盘链接

百度网盘:https://pan.baidu.com/s/15xVRpCT8mkP2uT8PoBHT3g 提取码:v727

Redis的持久化

为了不让突然断电的事故导致Redis数据丢失 Redis提供了俩不错的方案

  • RDB方案
    • 定期存储备份 只有触发一定条件 才会进行同步 比如一分钟读写1000次我就进行同步
  • AOF方案
    • 日志记录 你写一次我就记录一次 宕机后 使用日志重启就会把操作的记录复刻一次

参数配置

对Redis本身的配置

  • port 端口 默认 6379
  • bind 默认被注释 表示可以连接数据库的IP地址 推荐是0.0.0.0 表示全部
    • 删除空格的时候记得删除空格!
  • timeout 超时时间 默认是0 表示没有超时时间
    • 长时间不用不关闭也不好 timeout最好设置个数字
  • logfile 日志输出的文件名称
    • 填写之后记得找到注释并删除注释: syslog-enabled
    • 值设置为no就行
  • databases 逻辑库的数量
    • 默认16
  • requirepass
    • 密码 默认被注释 建议打开

rdb是内存数据同步到硬盘的数据库文件 有如下操作

  • save 同步频率
  • 写入包括增删改
  • save 60 10000 指的是 在 60s 之内 写入了10000条数据就进行同步
  • save 900 1 指的是900内写入了一条数据都可以进行同步
  • rdbcompression
  • 同步数据采用压缩
  • rdbchecksum
  • 同步数据校验
  • rdbfilename
  • rdb文件的名称
  • dir
  • redis所在的目录、

RDB 同步方案似乎会导致数据的丢失? 我们不妨了解一下AOF的备份方案吧

  • maxclients 最大连接数
    • 默认无限制 实际环境推荐设置一个大数字
  • maxmemory 占用内存的大小
    • 默认无限制 实际环境推荐设置一个70%的内存
  • appendonly 开始AOF模式
    • 默认关闭
    • 开启AOF RDB就要关闭 我们只需要删除save命令存在的配置行即可
    • 需要指定appendfilename 指定aof
  • appendfsync 同步频率
    • no - 有系统决定何时存储
    • everysec - 每秒都会把数据写入到硬盘 不可靠就是了
    • always - 每次内存写入 都会写入硬盘

Redis的数据

前排提示:

这一章出现的指令很多! 当我们常用的数据指令了 太多了 记不住?

多用就是哈哈

刚开始用记不住,不妨下载一个 Utools 安装插件redis文档 在开发的时候使用Alt+空格输入redis可以快速查找你的指令

比如我要查询列表删除的数据 我就这样:

然后使用多了你就可以得心应手了!

不是广告!!!

redis的数据类型分为 5 种 - 字符串 哈希 列表 集合 有序集合

字符串String类型

可以保存文字数字 也可以保存图片等多媒体文件数据

如何保存多媒体文件?

将文件转化为二进制 序列化 即可存储

但是String并不是无穷大的 最大可以保存512MB数据

  • 基本指令
    • set <key> <value> 可以设置数据 也可以对已有key进行修改 不用给value引号 Redis不需要他
    • get <key> 获取字符串的数据
    • del <key> 删除数据
  • 高级指令
    • getrange <key> <index_min> <index_max>
      • 截取字符串从index_min到index_max的子串
    • strlen <key> 获取字符串的长度
    • setex <key> <sec> <value> 设置带有过期时间的kv对(单位秒)
    • psetex <key> <sec> <value> 设置带有过期时间的kv对(单位毫秒)
    • mset <k1> <v1>...<kn> <vn> 同时设置多个kv对
    • mget <k1> <k2> 获取多个k的value
    • append <k> <v> 向 k 对应的 value 追加内容 内容为v
  • 加法运算
    • incr <key> 数字+1
    • incrby <key> <number> 数字+number number∈int
    • incrbyfloat <key> <number> 数字+number number∈float
    • decr <key> 数字-1
    • decrby <key> <number> 数字-number number∈int
    • 没有decrbyfloat

浮点计算不大准确 还是少用最好

哈希Hash类型

hash说白了就是字典这种结构化的数据 在Redis使用hash 有种字典套字典的感觉

使用hash 可以一次性存储多个数据 比如对象的全部属性

hset <key> <hash_key> <hash_value>
	-- 在key里定义一个hash的value 并给这个hash的hash_key 设置一个hash_value 
hmset -- 类似于mset 同时给一个hash设置多个key
hget <key> <hash_key> -- 获取hash的hash_key的value
hmget -- 类似于hmget
hgetall <key> -- 获取hash所有的kv

hkeys <key> -- 获取hash所有的key
hlen <key> -- 获取hash的key数量
hexists <key> <hkey> -- 是否存在key字段 (0False 1True)

hvals <key> -- 获取字段值
hdel <key> <hkey>...<hkeyn> -- 删除多个hash的kv对 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

其实我们发现了一个规律只要在String的指令前面加上h 在添加一个hash_key的参数基本可以复刻string的操作

猜猜hincrby 和 hincrbyfloat 是什么? 如何用?

列表list类型

列表可以存储多个重复的值 他和Java的数组(但是你不用定义长度) python的列表有些类似

  • rpush key val1 [val2 …] 向某个列表的最右侧添加数据
  • lpush key val1 [val2 …] 向某个列表的最左侧添加数据
  • lset key index val 向某个列表的索引为index的地方设置数据(index从0开始)

猜猜 现在dname列表种存储的元素是哪些 顺序如何

你可以使用lrange查看列表数据

  • llen key 获取长度
  • lrange key start_index end_index 查看指定范围的list子串 end_index为负数的时候表示倒数第几个
  • lindex key index 获取第几个元素值
  • linsert key before|after value1 value2 将value2插入到列表元素value的前面|后面
  • lpop key 删除第0个元素(最左的)
  • rpop key 删除最后一个元素(最右的)
  • lrem key number value 删除列表的指定元素值的值 如果有多个可以使用number指定删除多少个

集合set类型

类似于Python的集合 和列表一致 不过不能存储重复字段而且无序

  • sadd key value [value2 vlaue3] 向集合添加元素 可以是多个元素
  • smembers key 获取集合所有元素
  • scard key 获取集合长度
  • sismember key value value是否属于集合
  • srem key value 删除元素值
  • spop 随机删除一个元素并返回
  • srandmember key number 从集合里随机返回number个元素

顺序集合Zset类型

顾名思义 可以排序的集合

  • zadd 添加元素并加上权重值
  • zincrby 添加元素的权重
  • zrevrange 降序排序key

5 10 表示 5<= x <= 10

(10 表示 小于10

+inf 表示无穷大

-inf 表示负无穷大

同理, ZrevrangebyScore表示降序 参数和升序一致

Redis 的Key命令

pyredis的普通操作

使用 pip install redis 下载redis库

import redis

r = redis.Redis(
    host='127.0.0.1', # IP
    port=6379, # 端口
    db=0, #  指定逻辑库
    # password="我没有哦" 密码没有的话可以咬打火机
)

print(r.keys("*")) # 获取逻辑库0的所有keys
print(r.type('number')) # 获取逻辑库0中number值类型
print(r.type('id'))

r.sadd('id',"24","4")
print(r.smembers('id'))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Redis 事务

还记得Mysql的事务吗,记不得了? 没事 redis的事务机制和Mysql不同

但是还是推荐你去复习一下Mysql的事务…

mysql这一类 的数据库的事务是为了防止数据在进行操作的过程中发生了意外的宕机而出现的数据错乱问题所以引入的事务机制 undo redo

redis本身不需要对数据进行持久化,所以redis也不需要向mysql这样的事务机制,在redis中失败就是失败成功就是成功 没有原子性的说法。

redis的事务可以这么理解

  • 有两个客户端对redis进行请求
  • A-cli有两条指令 B-cli也有两个
  • ac 向redis发送请求 执行的期间 bc可能也会来请求
  • 但是ac值执行到了半途 没有事务的时候 bc可以从中进行插队执行
  • 所以我们引入了事务 如果是a先来的 就需要等到a执行完毕之后 b再去执行
  • 保证了客户端即使有多个命令也不会被插队的问题不再发生
  • 而且redis的事务没有回滚 错误就是错误 正确就是正确

数据监视

Redis 不会给数据来个锁来保证数据被多个请求所修改的事情发生,而是使用一个数据监视来监视数据,如果事务在执行过程中,其他的客户端修改了监视的记录那么当前的事务会自动的关闭。只有监视的数据没有被其他客户端修改的时候,事务才会执行完成

watch <key> <key2> ... 监视的数据可以是一条可以是多条

事务

定义

还是那个案例:小红给小明转账

  • 小红-50
  • 小明+50

如果 命令在中途出现宕机的问题 小红的钱没了 小明的钱没到账

为了避免这样的问题 我们推荐了事务 要么全部成功执行 要么全部执行失败

常见的问题

我们操作数据的类型发生错误的时候 可能会出现中间状态的问题(事务执行到一半不执行的问题),但是我们执行错误的指令的时候会自动discard

启动

使用 multi 启动事务 使用 exec 提交事务

开启事务后操作不会立即执行 而是在提交事务时候一同执行(类似批处理)

redis的事务没有回滚机制 所以不存在原子性 当事务提交之后不能进行回滚的操作,但是在事务提交之前我们随时可以使用discard取消事务。

当一致性持久性 隔离性 原子性都实现的事务就是强事务 如果没有实现其中的某个功能就是弱事务

? 为什么不实现回滚呢

因为回滚的实现需要使用磁盘执行因为redis基于内存 所以作者舍去了回滚机制

流水线 pipeline

一般的, redis对事务的执行机制是在服务端对发来的数据指令进行积攒,直到exec在进行执行.

现在我们可以将指令的积攒到本地然后再发送给服务器,也就是将积攒命令的工作移交给客户端.

流水线就是实现指令积攒的客户端技术 从而将发送1000条指令的次数 降低为 发送指令集也就是一次的次数 但是在这个一组命令中 有一个命令需要一个命令的执行结果(就是返回值) 的时候就不能使用该技术了 而且当本地的指令集非常庞大的时候一次性传送会给服务器很大的压力

pipeline技术实现需要使用连接池:

import redis

pool = redis.ConnectionPool(
    host='127.0.0.1',
    port=6379,
    db=0
)
r = redis.Redis(connection_pool=pool)
r.set("money", 2)

pipe = r.pipeline()

pipe.incr("money")
# pipe.incr('number')

print(pipe.execute()) # 会返回所有指令执行的全部返回值
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

基于流水线实现事务

import redis

pool = redis.ConnectionPool(
    host='127.0.0.1',
    port=6379,
    db=0
)
r = redis.Redis(connection_pool=pool)
with r.pipeline(transaction=1) as pip:
    pip.multi()
    
    #TODO 这里执行事务的命令
    
    ret = pip.execute()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

乐观锁与悲观锁

Redis 事务机制基于乐观锁实现

事务执行中可以对key进行监听 命令提交的时候 被监听的key对应的值没有被修改 事务提交成功 否则失败

相反的有个技术叫做悲观锁, 这个机制会认为数据是及其不安全的 所以每时每刻都会给给有竞争的资源进行上锁(有点像是进程锁与线程锁,在Java中有个单独的实现叫做同步代码块 synchronized)

watch books; # 对 books 进行乐观锁判断
multi # 开启事务
incr books # 修改值
exec # 提交事务

-- 同时在另一边
incr books # 我们在watch之后 exec提交之前执行这个数据 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在乐观锁的思想下我们在wacth后记录了books的状态,我们在另个客户端对其进行了修改,这个时候事务进行提交会发现数据的值被修改了,这当然不行,所以事务执行失败。

如果是悲观锁的思想下我们在另一边的客户端(就叫B端), 修改值 因为books加了锁 那这个B就执行失败了。

redis的watch基于的是乐观锁 所以事务部分会执行失败 exec返回的是(nil)

简单的对 购买商品的实现 sleep期间会有其他客户端修改book_counter

import time

import redis

pool = redis.ConnectionPool(
    host='127.0.0.1',
    port=6379,
    db=0
)
r = redis.Redis(connection_pool=pool)
key = 'book_counter'


def buy_book(username):
    """
    购买书籍
    :return:
    """
    with r.pipeline(transaction=True) as pipe:
        while True:
            try:
                pipe.watch(key)
                # 对 book_counter 进行监听

                # 虽然watch在流水线里面
                # 理当在客户端整理完指令之后一并发出
                # 但是watch是直接发送的额
                time.sleep(10)

                pipe.multi()  # 开启事务
                pipe.decr(key)  # 在事务中对数据进行修改pipe.decr(key) # 在事务中对数据进行修改
                pipe.execute() # 在执行事务之前会看看监听的值是否被改变如果被改变就不执行了
            except redis.WatchError:
                print(f'{username} 购买失败 重新购买中')
                continue
            else:
                print(f"{username} 购买成功, 当前存货量{r.get(key)}")
                break


import multiprocessing

if __name__ == '__main__':
    r.set(key, 1)  # 设置一个 book_counter  现在还有一本了!
    print(f"当前存货量{r.get(key)}")
    buy_book("用户一")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

精简版:

with r.pipeline(transaction=True) as pip:
    try:
        pip.watch(key) # 监听
        time.sleep(10) # 等待
        pip.multi()# 开始
        pip.decr(key) # 我自己购买
        pip.execute() # 提交给服务器
    except:
        print("购买失败 有人买了")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

RDB AOF

RDB

在终端执行PDB

rdb的文件windows在redis的根目录下,linux在 /var/lib/redis/dump.rdb 或者是 /etc/dump.rdb 下,

我们可以使用指令 save 手动触发 reids的rdb 执行 save 的时候 服务器阻塞 无法处理客户端发送的命令请求。 这种技术我们也称之为 快照

为了避免阻塞导致服务器的redis无法工作 我们可以使用指令 bgsave 让我们redis在后台开启写磁盘的工作(Redis不是一个真正全部都在使用单进程单线程的程序 作者在某些重要的地方也被迫做了妥协)

bgsave执行流程:

  • cli发送指令
  • 服务器返回Backgroud saving starter 给cli
  • 服务器fork()一个新的进程做这件事情
  • 服务器继续服务不阻塞
  • RDB文件创建完毕后告诉服务器 并且记录日志

save不消耗其他资源,bgsave不进行阻塞

AOF

aof记录得是命令而不是数据 每当服务器重启得时候重新执行一下命令即可

AOF 并不是来一个命令记录一次,因为频繁的写入命令实质会变得像是mysql类似的磁盘数据库了 而是redis利用系统开辟的一个缓冲区 在缓冲区中进行命令的记录(缓冲区是内存级别的) 等到缓冲满了才会写入硬盘中。所以AOF也会失去部分文件

问题

aof 文件瘦身

因为是记录命令难免命令太多aof文件太大 为了避免命令的胡乱增长 redis提供了aof重写功能 比如多个命令发来的set我们可以变成一个mset 且过程不阻塞redis的正常运行

实现的方式有两种:

bgrewriteaof 和 配置 auto-aof-rewrite-percentageauto-aof-rewrite-min-size bgwrite和bgsave同理

RDB和AOF都开先找谁还原

AOF

练习

既然学习了这么多的命令 不妨来做个小demo 假设设计一个网络购物平台 只有下单前10名的用户才可以拿到数据 记录他们的名字与购买

Redis 高可用

主从复制

主从复制又名读写分离

高可用

架构设计考虑的地点 通常是设计如何去减少系统被阻塞的时间(不可服务的时间片要少)。

因为redis是单线程单进程的模式 如果redis挂掉 相关以来的服务就难以正常运行

类似于MasterSlave Master负责服务,而Salve是该服务的复制品 master会同步更新数据给slaves 保持主从同步 且master可以执行写读命令了 slave只能执行读命令

现在我们可以通过slave进行读 master进行写 将读写的命令尽量的分散开来 从而减去服务器的压力 即使master倒下了 slave 可以继任 master 成为 新的master.

创建从服务器

了解

在linux下使用目录下 redis-server --slaveof <主服务器的IP> <主服务的PORT> --port <从服务器的端口> --masterauth <master的密码>

接着可以使用命令 redis-cli -p <端口号> 链接从服务器了 在客户端中执行info查看自己的master

当然我们可以正常启动一个redis-cli 然后使用指令 slaveof IP PORT 让我成为从, 或者是 slaveof no one 让自己与master断绝关系

配置文件实现

我们手写一个redis的conf

slaveof 0.0.0.0 6379
port 1333
  • 1
  • 2

然后我们使用命令: redis-server <配置文件目录> 即可开启从服务器

哨兵

什么是哨兵与哨兵的搭建

如果主服务器在运行的过程中驾崩了, 为了快速的在slave中找到master的替代品,这个时候我们就可以使用哨兵(sentine)了 他会不断检查Master和Slave是否正常每一个哨兵可以监控多个Master和其Slaves. 当哨兵认为redis的主服务器死亡后 会挑选一个新的子服务器 并且推进为新主服务器, 自动切换完成。

使用 redis-server <SENTINEL_CONFIGER> --sentinel 使用哨兵

哨兵的配置文件如下:

port 2342341 端口

sentinel monitor <name> 127.0.0.1 4444 <count>
name: 监听的主服务器  name表示对主服务器所在的集体的名称可以随便取

count: 启动的哨兵投票多少数目才可以通过确认 哨兵可以是多个,在主服务器死亡之后投票选择新的从服务器进行继承 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

为什么引入投票机制:

​ 可能哨兵在检测的时候 对某个从服务器的生死判断有一定的错误(他可能活着 但是哨兵认为死了) 多些哨兵可以让结果准确一些

这样的话我们可以根据判断主服务器是否死亡来进行新的子服务器的上位,即使未来旧主复活新主依旧不会让位。

这是哨兵的其他重要配置

python操作哨兵

import redis
from redis.sentinel import Sentinel

sentinel = Sentinel([('192.168.236.128', 11451)])
# 链接哨兵 可以获取多个

master = sentinel.master_for('Group1',  db=1)
# 获取指定某个组里的主

slave = sentinel.slave_for('Group1',  db=1)
# 获取指定某个组的从

master.set("test_value", "TEST")
# master 读

print(slave.get("test_value"))
# slave 写

number = "counter"
master.set(number, 1)
with master.pipeline() as pipe:
    while True:
        try:
            pipe.watch(number)

            pipe.multi()
            pipe.decr(number)
            pipe.execute()
            break
        except redis.WatchError:
            print("购买失败")
            continue
print(slave.get(number))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

集群

如果请求量过大的时候,一个服务器很难抗住所有, 比如 春节订票 双十一 的时候,

集群是一组相互独立的 但是通过网络互连的计算机 他们构成了一个组 并以单一系统的模式加以管理

创建一个Django项目

pip install django 之后 使用命令 django-admin startproject 项目名 创建项目 然后会出现类似的结构

  • setting - 配置文件
  • urls - 路由
  • wsgi - web服务器 与 Django交互的入口
  • manage - 项目的管理文件

使用命令: python manage.py runserver 启动服务器

Django开发中, 我们会对网站的功能进行不同模块的划分,一个模块一个Django的应用 ,使用 python manger.py startapp 【应用名】 开启一个应用。

输入命令 python manager.py startapp studentapp可以得到

这些应用的文件有什么作用呢?

  • admin
  • apps
  • models 写数据库相关的内容
  • tests 测试类
  • views 接受请求进行处理与MT交互 返回应答

HelloWorld

创建好应用之后就可以在视图中实现代码了

from django.http import HttpResponse


def index(res):
    """
    index主页视图
    :param res:
    :return:
    """
    return HttpResponse("HelloWorld")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

默认的 views.py 有一个render的引用现阶段可以删除他

视图写完了 但是无法看见他真正的效果,我们需要将一个 URL 映射到它——这就是我们需要 URLconf 的原因了。

应用文件夹 里创建一个 url.py 并做好注册工作

from django.urls import path

from . import views

urlpatterns   = [
    path('', views.index , name='index')
]
# 这里面的列表名称必须是urlpatterns 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

这一步的操作是给我们的 APP 中的view(第二个参数) 指定路由(第一个参数 ) 并给他命名(name参数)

但是我们应用并没有注册到服务器上 所以我们需要在Django的项目urls中进行注册

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('student/',include('studentapp.urls')),
    # 给 studentapp.urls 里面注册的视图一个路由组 student/
    
    path("admin/", admin.site.urls),
]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

运行网站 访问IP:http://127.0.0.1:8000/student/ 即可查看视图

数据库的配置

在项目的setting中有一个DATABASES列表我们可以轻松的找到里面字典ENGINE的值 默认是 SQLite 我们可以换成MySQL 或者是其他的DBMS.

ENGINE – 可选值有 'django.db.backends.sqlite3''django.db.backends.postgresql''django.db.backends.mysql',或 'django.db.backends.oracle'。其它 可用后端

当我们使用的不是SQLite的时候我们可能会给连接的数据库获取他的密码或者是用户名称 可以这么做:

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'mydatabase',
        'USER': 'mydatabaseuser',
        'PASSWORD': 'mypassword',
        'HOST': '127.0.0.1',
        'PORT': '5432',
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

Django的数据库配置

使用mysql的时候 直接启动程序会有问题:Did you install mysqlclient? 你可以在项目的 __init__文件中输入下列代码

import pymysql
pymysql.install_as_MySQLdb()
  • 1
  • 2

中文设置

在setting中设置

LANGUAGE_CODE = 'zh-hans'

TIME_ZONE = 'Asia/Shanghai'
  • 1
  • 2
  • 3

可以设置中文

模型-基础

我们可以在应用下的 models 里编辑

from django.db import models

# Create your models here.
from django.db.models import CASCADE


class TeacherModel(models.Model):
    name : str = models.CharField(max_length=255)
    _teacher_id : int = models.IntegerField()

class StudentModel(models.Model):
    _id: int = models.AutoField(primary_key=True, null=False)
    name: str = models.CharField(max_length=255)
    _teacher_id: int = models.ForeignKey(TeacherModel, on_delete=models.CASCADE, db_column='_teacher_id')  # 外键
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

我们再把眼光跳到应用的 apps.py 这是我们应用的一个AppConfig 比如: studentappsAppConfig 就是StudentappConfig 这是应用的灵魂

Django 的应用是可以热插拔的 你可以在任何项目使用同一个应用 !

而我们的AppConfig相当于一个插头,可以使App插入Django项目 我们只需要在 项目的setting.py 中的 INSTALLED_APPS 进行注册即可

注册的方式如下:

在AppConfig 中 有两个属性 name 我们知道是 这个应用的名称 default_auto_field 是指如果模型没有指定主键 那么就会自创建一个主键 这个主键的类型是default_auto_field的值 一般是django.db.models.BigAutoField 指的是巨大数字

给一个Filed设置主键只需要设置参数:primary_key=True 主键默认自动递增

通过模型类 获取对应的建表SQL

迁移是对Django模型的定义变化的存储方式, 我们使用命令 python manage.py makemigrations 应用名称 执行之后我们可以在 应用/migrations/000x_initial.py 中找到迁移的数据,使用第二个命令: python manage.py sqlmigrate 应用名 000x 可以查看迁移的模型的建表SQL

使用命令:

python manager.py sqlmigrate

将模型在SQL语句中创建

模型的curd

使用 python manage.py shell 可以 快速操作模型哦

insert 部分

from StudentApp.models import StudentModel as sm
from TeacherApp.models import TeacherModel as tm

newdata = sm(
    name='新增字段', # 普通字段
    _teacher_id=tm(_teacher_id=1) # 外键
)
newdata.save()
# 插入成功
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • save可以实现插入的操作 也可以实现Update操作

select 部分

使用filter函数进行查找 他的参数是拼接而成的 比如 name__endwith 表示name字段 以参数值结尾的数据的查询

  • name__endswith 结尾
  • __name__contains 包含
  • name__isnull 为空
  • id__in 等于列表元组里的某个值
  • id__lt 小于 参数再加个e id__lte 小于等于
  • id__gt 大于 参数再加个e id__gte 大于等于
  • id__year 时间在某年
  • id__gt 时间在之后
  • 大小比较可以类比到时间中
  • id__iexact 大小写不敏感

使用函数 exclude 进行反操作

还有很多… 更多查询

delete 部分

如果有查询出的结果 调用结果对象的delete方法就是删除

update 部分

a = sm.objects.all()
for i in a:
	i.name= "23"
	i.save()
  • 1
  • 2
  • 3
  • 4

或者是

Example.objects.filter(id=481).update(total_calories = 10)
  • 1

总结

总而言之 使用Django的模型创建表格需要以下几步

在后期的模型更新的时候你可以不在两个命令后加上应用名

踩坑

有些时候指定了主键但是还是会出现 Django新建字段名称为:id列作为主键,这种情况在模型被其他模型做成了外键的时候会出现, 解决方式是在设置外键的模型字段添加参数db_column 指定列名。

from django.db import models

from teacherapp.models import TeacherModel


class StudentModel(models.Model):
    _id: int = models.AutoField(primary_key=True, null=False)
    name: str = models.CharField(max_length=255)
    _teacher_id: int = models.ForeignKey(TeacherModel, on_delete=models.CASCADE, db_column='_teacher_id')  # 外键
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

模型的其他参考文档:

管理页面

首先,我们得创建一个能登录管理页面的用户。请运行下面的命令:

python manage.py createsuperuser

边学记录

Ajax的请求处理

<input type="text" name="name" id="name">
$.ajax({
        type: 'post',
        url: '/withtag_ajax/',
        data: {name: $("#name").val()},  // data直接是js对象
        success: function (data) {
            $('#info').text(data.msg)
        }
    }
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
from django.http import HttpRequest, JsonResponse
from django.views import View

from api_blog.models import BlogModels

class api_get_page_blogs(View):

    def post(self, response: HttpRequest):
        """
        Ajax 请求处理
        DESCRIPT
            返回指定页数的所有博客
        CLient AJAX Format
            {
                page: xxx // 第几页
                pre_page_count: xxx // 每页有X个博客需要展示 默认 5
                order: READERCOUNTER | DEFAULT (READERCOUNTER可以缩写为1 表示阅读量 DEFAULT简写为0默认)
            }
        Server AJAX ForMat
            {
                data : [{ 'name' : ... , 'page': ... ,'author' : ...}...] // 博客数据
                status: 1 | error | 3; 1 表示成功 2表示失败(页数不符合规范或者没有数据) 3表示请求不规范
             }
        :param response: 浏览器的请求体
        :return:Server AJAX JSON
        :author LiuBoyuan@qq.com
        """
        json = {}
        ajax_data: dict = response.POST
        now_page = ajax_data.get('page')
        pre_page_count = ajax_data.get("pre_page_count")
        order = ajax_data.get("order")

        if now_page is None or now_page < 0:
            return JsonResponse({"status": 'error'})

        pre_page_count = pre_page_count if pre_page_count is not None else 5
        order_rule = order if order is not None else "DEFAULT"

        if order_rule == "READERCOUNTER" or order_rule == 1:
            BlogModels.objects.order("reader_count")

        slice_index = slice((now_page - 1) * pre_page_count, now_page * pre_page_count)
        all_das = BlogModels.objects.all()
        ret_das = all_das[slice_index]
        if ret_das == [] or not ret_das:
            return JsonResponse({"status": 2})
        json['data'] = ret_das
        json['status'] = 1
        return json


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

DEGUG不同模式下的 static 文件目录 的设置

见:https://zhuanlan.zhihu.com/p/151855280

STATIC_URL = '/static/'
if DEBUG == False: 
    STATIC_ROOT = os.path.join(BASE_DIR, 'static')
else:
    STATICFILES_DIRS = [
	    os.path.join(BASE_DIR,"static")
    ]
 # setting.py
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Session

一般情况下 Session是不会串联的(每个客户单的session单独存在 除非两个客户端的session被同时设值等问题)

def view(req):
    req.session['key'] = value
    req.seesion.get('key')
    ...
  • 1
  • 2
  • 3
  • 4

DRF 前后端分离模式

api 接口与 restful

api 就是一个接口 一个url

restful 中文叫做 资源状态转移(表征状态转义)

实质上 restful 是面对资源进行开发的

我们理解的 Book/add Book/delete 这些路由是不符合restful 规范的

面向资源开发指的是指 针对资源本身 不加入任何动作的规范 add delete属于资源的动作 所以不符合我们的要求。

restful规范是通过请求的类型来规定制定的动作不同

  • post 添加数据
  • get 获取数据
  • get 获取id为pk的数据
  • delete id为pk数据的删除
  • put 修改一个学生的全部数据
  • patch 修改一个学生部分的信息

FBV 和 CBV

实质上 FBV 就是函数视图 CBV 就是类视图

def student(req):
    if req.method == 'GET':
        return HttpResponse("Get")
    elif req.method == 'POST':
        return HttpResponse("Post")
  • 1
  • 2
  • 3
  • 4
  • 5

以上就是使用 FBV 实现的RESTFUL的函数视图

CBV的实现就是

def StudentView(django.views.View):
    def get(self, request):
        return HttpResponse("Get")
    
    def post(self, request):
        return HttpResponse("Post")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在urls中的注册

urlpatterns = [

 path("student/", StudentView.as_view(), name='...')

]
  • 1
  • 2
  • 3
  • 4
  • 5

问题:

我写的 CBV 写好了 student 路由的 post 方法 为什么postman的时候获取不到呢?

127.0.0.1:8000/api/student

因为 student后面没有 / 也就是不是 127.0.0.1:8000/api/student/这么结束的时候 Django默认会补全一个/ 从而重定向一个get请求。

源码解析

我们在Path中首先走了as_view() 方法:

@classonlymethod
def as_view(cls, **initkwargs):
    """Main entry point for a request-response process."""
    ....
    $ def view(request, *args, **kwargs):
        ...
	...
    $ return view
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

as_view的大致结构是这样的 前后做了些操作 关键的是: 定义了一个inner函数view 然后将其返回

所以我们在 StudentView.as_view()的部分就是as_view 返回的函数的调用

细看下as_view返回的函数

def view(request, *args, **kwargs):
    self = cls(**initkwargs) # 将类进行实例化 谁调用的as_view谁是cls
    ...
    return self.dispatch(request, *args, **kwargs)
  • 1
  • 2
  • 3
  • 4

view 函数将类进行实例化 然后使用实例化对象的dispatch方法 dispatch方法返回什么 view返回什么

我们细说下dispatch:

def dispatch(self, request, *args, **kwargs):
    if request.method.lower() in self.http_method_names: 
        $ 如果请求在cls的请求列表里(请求类型存在)
        
        handler = getattr(self, request.method.lower(), self.http_method_not_allowed)
        $ 设置对象的指定请求名称的方法
        
    else:
        handler = self.http_method_not_allowed
        $ 请求不存在 设置请求不存在方法
        
    return handler(request, *args, **kwargs)
	$ 调用设置好的方法 并将结果返回
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

DRF 与 APIView

pip install djangorestframework

我们将CBA视图的父类修改为APIView如:

from rest_framework.views import APIView

class BookView(APIView):
    def get(self, request):
        return HttpResponse("Get")

    def post(self, request):
        return HttpResponse("Post")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

我们使用postman测试的时候,发现她返回的数据与功能和View大差不差 是一致的.

源码剖析

前往ApiView类之后我们可以看到该类重写了View的 as_view

def as_view(cls, **initkwargs):
    if isinstance... $ 滤过
    
    view = super().as_view(**initkwargs) # 调用了View原生的as_view 并接受她的返回值
    $ ... 滤过
    return csrf_exempt(view) $ 相当与返回view 而且这个API不受csrf保护
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

我们已经知道了APIView 无非是将View的返回值进行一层csrf_exempt的包装 其余似乎什么也没有 但是我们知道在View中的as_view 他调用了self.dispatch() 这个方法有没有被重写呢 —— 重写了 这是dispatch

 def dispatch(self, request, *args, **kwargs):
 	...
    request = self.initialize_request(request, *args, **kwargs)
    $ 构建了一个新的request对象 
    ...
    return self.response
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

封装前的request 和 封装后的request

封装之前的request只支持处理UrlEncoded的数据 封装之后便开始支持 Json格式的数据了

当然在此后它还做了认证 权限 限流等组件的初始化

在BookView具体的请求处理函数中 post 可以使用 request.data 来获取请求体 get 就是query_params

class ClassBook(APIView):
    def get(req):
        data = req.query_params
    	...
	def post(req):
        data = req.data
        ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

序列化

在网络中的序列化就是把模型对象转化为字典 经过response之后 变成Json对象就是序列化 反序列化就是将JSON转化为字典 列表之类的模型的

from django.http import HttpRequest, HttpResponse
from rest_framework import serializers
from rest_framework.response import Response
# Create your views here.
from rest_framework.views import APIView

from sers.models import Book


class BookSerializer(serializers.Serializer):
    """
    书籍模型的序列化器
    """
    title: str = serializers.CharField(max_length=32)
    price: int = serializers.IntegerField()
    date: str = serializers.DateField(source='pub_date')


class SersView(APIView):
    def get(self, request: HttpRequest):
        """
        获取 全部的书籍 数据
        :param request:
        :return:
        """
        book_list = Book.objects.all()

        # 使用序列化对象进行序列化
        bookSerializer = BookSerializer(
            instance=book_list,  # 序列化的对象
            many=True  # 是否同时序列化多个对象
        )
        ret = bookSerializer.data  # 获取序列化的结果
        print(ret)
        return Response(ret)

    def post(self, request: HttpResponse):
        """
        添加数据
        :param request:
        :return:
        """
        datas = request.data
        bookSerializer = BookSerializer(data=datas) # 反序列化
        print(bookSerializer.is_valid()) # 传过来的字段合法吗
        """
        合法生成一个 validated_data 对象 里面存储着可以直接被数据库存储的对象数据
        不合法生成一个 errors 对象 里面生成了 键值对 键是字段 值是错误的原因
        """
        if bookSerializer.is_valid():
            print(bookSerializer.validated_data)
            Book.objects.create(**bookSerializer.validated_data)
            return Response(datas)
        else:
            return Response(bookSerializer.errors)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

我们在对模型数据返回到前台的时候 或者是 需要将前台传来的数据进行模块话的时候 我们可以使用 rest_framework 进行序列化和反序列化的操作:

  • 首先我们要将数据进行序列化和反序列化的时候我们需要一个序列模板 他继承了serializers.Serializer
  • serializers.Serializer 放着我们想要对模型进行序列化的字段 以及它的数据类型
  • 创建一个serializers.Serializer的派生类 传入不同的参数有不同的效果 从而进行序列数据的处理初始化
    • data - 反序列化
    • instance - 序列化
  • 序列化的数据可以直接交给Response进行转化为JSON 交给前台
  • 反序列化的数据首先要获取数据的合法性 也就是 调用is_valid() 函数 如果True生成一个validated_data对象 里面存储着可以直接被数据库存储的对象 如果Flase 生成一个error对象 里面生成了 键值对 键是字段 值是错误的原因 你可以直接使用Response返回

save和create

为了实现松耦合,我们可以不再添加数据的时候加入代码

 Book.objects.create(**bookSerializer.validated_data)
  • 1

而是使用序列器继承下来的save()函数 也就是

bookSerializer.save()
  • 1

但是这样会报错 因为在源码中save调用类Serializer的基类的create,但是这个create可以理解成 抽象 的 需要去实现 而且create需要有返回值 这个返回值将会作为这个序列器对象的instance的值 也就默认帮我们将结果的数据进行了序列化 所以我们在Response传给前端的时候 可以直接使用 序列器对象的.data 获得 其 instance 的值。

也就是这样的:

...

class BookSerializer(serializers.Serializer):
    ...

    def create(self, validated_data):
        """
        实现create方法从而默许调用 save
        :param validated_data:
        :return:
        """
        return Book.objects.create(**validated_data)# 返回之后默认序列化
      
class SersView(APIView):
    def post(self, request: HttpResponse):
        """
        添加数据
        :param request:
        :return:
        """
        ...
        if bookSerializer.is_valid():
            bookSerializer.save()
            return Response(datas)
        else:
            return Response(bookSerializer.errors)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

当然除了create save还隐式调用了update 我们可以实现update再序列器中 然后进行调用

序列化器实现增改查查

from django.http import HttpRequest, HttpResponse
from rest_framework import serializers
from rest_framework.response import Response
# Create your views here.
from rest_framework.views import APIView

from sers.models import Book


class BookSerializer(serializers.Serializer):
    """
    书籍模型的序列化器
    """
    title: str = serializers.CharField(max_length=32)
    price: int = serializers.IntegerField()
    date: str = serializers.DateField(source='pub_date')

    def create(self, validated_data):
        """
        实现create方法从而默许调用 save
        :param validated_data:
        :return:
        """
        return Book.objects.create(**validated_data)

    def update(self, instance, validated_data):
        """
        实现 update 方法从而默许调用save
        :param instance:
        :param validated_data:
        :return:
        """
        Book.objects.filter(id=instance.id).update(**validated_data)
        return Book.objects.get(id=instance.id)

class SersView(APIView):
    def get(self, request: HttpRequest, id:int = None):
        """
        获取 全部的书籍 数据
        :param request:
        :return:
        """
        if id is None:
            book_list = Book.objects.all()
        else:
            book_list = Book.objects.get(id=int(id))
        # 使用序列化对象进行序列化
        bookSerializer = BookSerializer(
            instance=book_list,  # 序列化的对象
            many=True if id is None else False # 是否同时序列化多个对象
        )
        ret = bookSerializer.data  # 获取序列化的结果
        print(ret)
        return Response(ret)

    def post(self, request: HttpResponse):
        """
        添加数据
        :param request:
        :return:
        """
        datas = request.data
        bookSerializer = BookSerializer(data=datas) # 反序列化
        print(bookSerializer.is_valid()) # 传过来的字段合法吗
    
        if bookSerializer.is_valid():
            print(bookSerializer.validated_data)
            bookSerializer.save()
            return Response(datas)
        else:
            return Response(bookSerializer.errors)

class BookDetailView(APIView):
    def get(self, request, _id:str):
        if _id.isnumeric():
            _id = int(_id)
            try:
                book = Book.objects.get(id=_id)
            except :
                return Response("NO Exist")

            bookSerializer = BookSerializer(instance=book, many=False)
            return Response(bookSerializer.data)

        else:
            return Response("error")

    def put(self, request, _id:str):
        """
		设置
        :param request:
        :param _id:
        :return:
        """
        request_json = request.data
        print(request_json)
        if _id.isnumeric():
            _id = int(_id)
            try:
                book = Book.objects.get(id=_id)
                bookSerializer = BookSerializer(data=request_json, instance=book)
                if bookSerializer.is_valid():
                    bookSerializer.save()
                else:
                    return Response(bookSerializer.errors)

                return Response(bookSerializer.validated_data)
            except Exception as e:

                return Response(f"error1{e}")
        return Response("error2")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111

ModelSerializer

顾名思义 ModelSerializer 是通过model自动实现序列器对象 不需要手动创建序列器的神器(甚至 create和update 都已经做好了)


class BookSerializer(serializers.Serializer):
    title: str = serializers.CharField(max_length=32)
    price: int = serializers.IntegerField()
    date: str = serializers.DateField(source='pub_date')

    def create(self, validated_data):
        return Book.objects.create(**validated_data)

    def update(self, instance, validated_data):
        Book.objects.filter(id=instance.id).update(**validated_data)
        return Book.objects.get(id=instance.id)

$ 可以直接这么写:
class BookSerializerAuto(serializers.ModelSerializer):
    date = serializers.DateField(source="pub_date")

    class Meta:
        model = Book
        # fields = ['title', "price"] # 对指定的字段进行序列化
        # fields = "__all__"  # 对所有字段进行序列化
        exclude = ['pub_date'] # 哪些字段不用序列化 因为我们自定义了一个date 所以pub_date可以不要
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

GenericApiView 视图

其实还是基础的CBV开发模式 只不过在APIView上提供了新的方法而已 依旧采用的dispatch分发的流程,一个简单的增删改查查代码太多了! 我们需要使用一种方式来简化我们的代码 这个时候GenericApiView的好处就来了

常用属性与方法

  • get_serializer_class(self)
    • 获取序列化器的类
  • get_serializer
    • 获取序列化器
  • get_queryset
    • 获取查询结果
  • get_object
    • 获取模型类数据对象

以上四个方法的作用为调度

  • queryset 属性
    • 确认数据的全部查询结果
  • serializer_class 属性
    • 需要使用的序列器

简单的使用

以我们的get请求为例

class SerailizerStudent(ModelSerializer):
    class Meta:
        model = Student
        fields = '__all__'

class StudntView(GenericAPIView):
    queryset = Student.objects.all() # 设置查询集
    serializer_class = SerailizerStudent # 设置序列器

    def get(self, req):
        mydatas = self.get_queryset() # 获取自身的查询集
        
        ret = self.get_serializer(instance=mydatas, many=True).data
        # 序列器的创建
        
        return response.Response(ret)  # 返回
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

针对主键的查询

我们有些时候并不会获取全部的数据而是获取指定id的数据 那么如何实现呢?

在GenericApiView中我们有一个叫做 lookup_field 的属性 通过 get_object 我们可以获取指定的

mixin

mixin类为 GenericApiView 进行了再一步的封装

  • ListModelMixin
    • 实现了get的功能在自己的list方法中
  • CreateModelMixin
    • 实现了post的功能在自己的create方法中
from rest_framework.generics import GenericAPIView
from rest_framework.mixins import *
from rest_framework.serializers import ModelSerializer

from Study.models import Student


class SerailizerStudent(ModelSerializer):
    class Meta:
        model = Student
        fields = '__all__'


class StudntView(GenericAPIView, ListModelMixin, CreateModelMixin):
    queryset = Student.objects.all()
    serializer_class = SerailizerStudent

    def get(self, req):
        return self.list(req)

    def post(self, req):
        return self.create(req)


class StudentDetailView(GenericAPIView, RetrieveModelMixin, DestroyModelMixin, UpdateModelMixin):
    queryset = Student.objects.all()
    serializer_class = SerailizerStudent

    def get(self, req, pk):
        return self.retrieve(req)

    def put(self, req, pk):
        return self.update(req)

    def delete(self, req, pk):
        return self.destroy(req)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

再封装


class StudntView(ListCreateAPIView):
    queryset = Student.objects.all()
    serializer_class = SerailizerStudent


class StudentDetailView(RetrieveUpdateDestroyAPIView):
    queryset = Student.objects.all()
    serializer_class = SerailizerStudent
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

高封装视图

ViewSet

viewset改变了原有的apiview分发机制的逻辑,将单一资源和多资源的查询结合在一起从而简化代码


class StudentView(ViewSet):
    def get_object(self, req, pk):
        return Response("单一查询")

    def get_all(self, req):
        return Response("多个查询")

    def add_object(self, req, pk):
        return Response("添加数据")

    def update_object(self, req, pk):
        return Response("单一添加")

    def delete_object(self, req, pk):
        return Response("单一删除")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

如上 我们可以自定义方法的名称来区分 单一资源的面向和多资源的面向

对于他的分发点 我们可以在as_view中创建映射


urlpatterns = [
    path("student/", StudentView.as_view(
        {
            "get": "get_all",
            "post": "add_object",
        }
    )),

    re_path(r"student/(?P<pk>\d+)", StudentView.as_view(
        {
            "get": "get_object",
            "delete": "delete_object",
            "put": "update_object"
        }
    ))
]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

ModelViewSet

因为原生的ViewSet继承的是APIView没有GenericApiView的功能自然也用不了Mixin这些好玩意儿, 功能可以说是非常的鸡肋,所以我们推荐使用 GenericViewSet 进行开发 这样我们可以直接映射到Mixin方法里面

path("student/", StudentView.as_view(
	{
        "get": "list",
    	"post": "create",
	}
))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

通过这种方式我们可以实现这样的玩意儿

class StudentView(
    ViewSet,
    GenericAPIView,
    ListModelMixin,
    UpdateModelMixin,
    CreateModelMixin,
    RetrieveModelMixin,
    DestroyModelMixin
):
    queryset = Student.objects.all()
    serializer_class = SerailizerStudent
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
urlpatterns = [
    path("student/", StudentView.as_view(
        {
            "get": "list",
            "post": "create",
        }
    ), name='...'),

    re_path(r"student/(?P<pk>\d+)", StudentView.as_view(
        {
            "delete": "destroy",
            "update": "update",
            "get": "retrieve"
        }
    ))
]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

继承的东西太多了! 直接

class StudentView(ModelViewSet):
    queryset = Student.objects.all()
    serializer_class = SerailizerStudent
  • 1
  • 2
  • 3

路由

针对Mixin和ViewSet django 有一套路由

from rest_framework.routers import DefaultRouter

from .views import StudentView

route = DefaultRouter()
route.register('student', StudentView)
urlpatterns = []
urlpatterns += route.urls
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

认证 权限 限流 过滤

在drf的view的源码中,认证权限和限流是在执行dispatch前执行的三件套(在方法initial中执行)

    def initial(self, request, *args, **kwargs):
        ...
        self.perform_authentication(request)# 认证
        self.check_permissions(request)# 权限武松和
        self.check_throttles(request)# 限流
  • 1
  • 2
  • 3
  • 4
  • 5

认证权限的开发场景

认证

完成用户信息的判断 确定是否为用户表中的注册用户

权限

判断用户的视图操作权限

用户

普通账号
from django.contrib.auth.models import User
user = User.objects.create_user('john', 'lennon@thebeatles.com', 'johnpassword')

# 可以进行修改
u = User.objects.get(username='john')
u.set_password('new password')
u.save()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
超级账号

我们需要用户来进行操作 所有的用户信息都在auth_user里面存放着

admin用户的创建:python manage.py createsuperuser 然后会让你输入Password Email 和Username信息

字段
username
  • 1

必须。150 个字符或更少。用户名可能包含字母数字,_@+,。和 - 字符。如果您使用的 MySQL,请指定 max_length=191,因为默认情况下,MySQL 只能创建 191 个字符的唯一索引。

first_name
  • 1

可选 (blank=True)。 30 个字符或更少。

last_name
  • 1

可选 (blank=True)。 150 个字符或更少

email
  • 1

可选 (blank=True)。邮箱地址。

password
  • 1

必须。密码的散列和元数据。(Django 不存储原始密码。)原始密码可以是任意长的,并且可以包含任何字符。

groups
  • 1

多对多关系 Group

user_permissions
  • 1

多对多关系 Permission

is_staff
  • 1

Bollean 类型。指定此用户是否可以访问 admin 站点。

is_active
  • 1

Bollean 类型。指定是否应将此用户帐户视为活动用户。我们建议您将此标志设置为 False 而不是删除帐户;这样,如果您的应用程序对用户有任何外键,外键不会中断。

is_superuser
  • 1

Bollean 类型。指定该用户具有所有权限而不明确分配它们。

last_login
  • 1

用户上次登录的日期时间。

date_joined
  • 1

指定帐户何时创建的日期时间。在创建帐户时默认设置为当前日期/时间

操作
class UserView(APIView):

    def get(self, request: Request):
        """
        登录
        :param request:
        :return:
        """
        user_info = request.data
        username, password = user_info.get("username"), \
                             user_info.get("password")
        print(username, password)
        ret = authenticate(
            username=username,
            password=password
        )
        print(ret)
        return Response({"status": False if ret is None else True})

    def post(self, request):
        """
        注册
        :param request:
        :return:
        """
        user_info = request.data
        username, password = user_info.get("username"), \
                             user_info.get("password")
        try:
            User.objects.get(username=username)
            return Response({"status": False})
        except User.DoesNotExist:
            ret = User.objects.create_user(
                username=username,
                password=password,
                email="??@qq.com"
            )
        return Response({"status": True})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

踩坑记录:

​ 创建用户使用的是create_user函数 而不是create

其他操作
from django.contrib.auth import login, logout
user = User.objects.get(username="xxx")
user.set_password("2323") # 设置密码
user.save()

def lv(req):
	user = authenticate(username=urn, password=pwd)# 首先校验
	a = login(req, user) if user else user is None # 保持登录状态 存放在session中
    
@login_required # 判断当前有用户登录 
def user_center(req):
    login_user = req.user # 获取用户
    
logout(req) # 登出
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
扩展字段

我们可以内建抽象user模型类

前提 我们需要在一个没有进行migrate的情况下使用该方法

步骤:

  • 新建一个应用 python manage.py startapp myNewUser
  • 定义模型类 继承 AbstractUser
  • 在Setting中进行设置 AUTH_USER_MODEL = "应用名.类名"

EG:

模型:
from django.contrib.auth.models import AbstractUser
from django.db import models
class MyNewUser(AbstractUser):
    phone = models.CharField(max_length=11, default="")
    # 在这里添加新的字段

设置:
AUTH_USER_MODEL = "myNewUser.MyNewUser"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

认证

def perform_authentication(self, request):
	request.user        
  • 1
  • 2

源码中就做了一件事调用了request的user属性方法

    @property
    def user(self): 
        if not hasattr(self, '_user'):
            with wrap_attributeerrors():
                self._authenticate()
        return self._user
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

没有_user的时候我们就去_authenticate里面一探究竟。

    def _authenticate(self):
        for authenticator in self.authenticators:
            try:
                user_auth_tuple = authenticator.authenticate(self)
            except exceptions.APIException:
                self._not_authenticated()
                raise

            if user_auth_tuple is not None:
                self._authenticator = authenticator
                self.user, self.auth = user_auth_tuple
                return

        self._not_authenticated()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

至于这个authenticators是什么我们还得回去看看我们dispatch创建request对象的地方:

file:view.py
    ...
	def initialize_request(self, request, *args, **kwargs):
        """
        Returns the initial request object.
        """
        parser_context = self.get_parser_context(request)

        return Request(
            request,
            parsers=self.get_parsers(),
            authenticators=self.get_authenticators(),
            negotiator=self.get_content_negotiator(),
            parser_context=parser_context
        )
    .....
	def get_authenticators(self):
		return [auth() for auth in self.authentication_classes]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

至于这个authentication_classes是什么我们可以前往rest_framework的Setting(不是项目的Setting!)里面的 DEFAULTS 里面进行查看。

'DEFAULT_AUTHENTICATION_CLASSES': [
    'rest_framework.authentication.SessionAuthentication',
	'rest_framework.authentication.BasicAuthentication'
],
  • 1
  • 2
  • 3
  • 4

我们可以看到这里面是一个默认存储了Session的验证机制(Session验证机制是基于Basic的所以需要加入)

因为在perform_authentication中我们调用了了Request的user 也就是获取属性方法 - user的返回值 其属性方法主要的内容就是self._authenticate, 而 self._authenticate 做的主要的事情就是做的主要的事情就拿到authenticators 里面的对象的authenticate方法的返回值 默认返回一个元组 格式: (user, token)

然后将返回值返回给 requestuserauth

所以我们可以在自己的StudentView(自定义的视图里面进行 重写authentication_classes)

class StudentView(ModelViewSet):
    authentication_classes = [
        
    ]
    queryset = Student.objects.all()
    serializer_class = SerailizerStudent
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

这是局部的配置 如果要做全局的配置那么就再Setting里面进行设置

REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES":[]
}
  • 1
  • 2
  • 3
自定义认证器

我们可以先看看默认的用户是什么, 通过request.user获取

class StudentView(ModelViewSet):
    authentication_classes = [

    ]
    queryset = Student.objects.all()
    serializer_class = SerailizerStudent
    def retrieve(self, reqest, *args, **kwargs):
        print(reqest.user)
        return super().retrieve(reqest, *args, **kwargs)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

通过 http://127.0.0.1:8000/student/1 获得到了: AnonymousUser 也就是匿名用户 好的我们给这个Views一个局部的认证器罢:

class Authen(BaseAuthentication):
    def authenticate(self, request):
        return ("NoAdmin", None)

class StudentView(ModelViewSet):
    authentication_classes = [
        Authen
    ]
    ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

认证器可以返回3个值:

  • 元组 (1,2) -> 表示认证通过 在视图中request中 request.user 就是第一个值,request.auth就是元组的第二个值
  • 返回None 谁都不管 继续执行其他认证器
  • 异常 结束执行
简单DEMO使用

class Authen2(BaseAuthentication):
    def authenticate(self, request:Request):
        user = request.META.get('HTTP_X_USERNAME')
        # 获取请求头需要 在请求头前面加上HTTP_

        print(user)
        if user:
            try:
                user_obj = User.objects.get(username=user)
                return (user_obj, None)
            except User.DoesNotExist:
                return None
        return None


class Permissions(BasePermission):
    def has_permission(self, request, view):
        print(request.user.username)
        if request.user.is_superuser == 1:
            print("超级用户来啦")
            return True
        return False

class StudentView(ModelViewSet):
    authentication_classes = [
        Authen2
    ]
    permission_classes = [
        Permissions
    ]


    queryset = Student.objects.all()
    serializer_class = SerailizerStudent
    def retrieve(self, reqest, *args, **kwargs):
        print(reqest.user)
        print(reqest.auth)
        return super().retrieve(reqest, *args, **kwargs)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

权限

权限同理 到get_permissions 中获取由 self.permission_classes 构成的对象列表 默认是在DEFAULTS里的PERMISSION_CLASSES里面

'DEFAULT_PERMISSION_CLASSES': [
	'rest_framework.permissions.AllowAny',
],
  • 1
  • 2
  • 3

这个表示的是允许任何权限

class AllowAny(BasePermission):
    def has_permission(self, request, view):
        return True
  • 1
  • 2
  • 3

AllowAny 就是无脑返回True

k def check_permissions(self, request):
	for permission in self.get_permissions():
    	if not permission.has_permission(request, self):
            # 在权限列表中,只要有一个权限对象返回的是 False 那么就是无权限
        	self.permission_denied(
            	request,
                message=getattr(permission, 'message', None),
                code=getattr(permission, 'code', None)
			)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Django 缓存优化

Django缓存优化

目的

每次对于热数据的访问,都需要使用Mysql等数据库进行增删改查,为了减小数据库的压力我们通常将热数据放在缓存中进行操作

  • 减小过载
  • 避免重复计算
  • 提高技能

流程

  • 给定一个服务请求
  • 查询页面
  • 如果缓存命中: 返回缓存的页面
  • 如果缓存不命中: 生成页面,将页面进行缓存(可以采用lru等算法) 并返回页面

缓存类型

  • Memcached
    • 内存缓存 最快最有效
  • Database caching
    • 数据库存储
    • 比如有两个表 表A的数据很麻烦 非常麻烦每次查询都十分的消耗时间 这个时候我们可以将查询的结果放在表B中进行存储,下一次就可以直接在表B中进行简单的查询。
  • FileSystem caching 不推荐
    • 将缓存存成某个文件
  • Local-memory caching 不推荐
    • 本地缓存 存储在服务器的内存之中
    • LOCATION可以指定成unique-snowflake 雪花缓存算法进行内存地址寻址而不是指定地址
  • Dummy caching 不推荐
    • 仿缓存 假的
  • Using a custom cache backend
    • 自开发

缓存粒度

  • per-site cache
    • 网站全局缓存 整个站点进行缓存
  • per-view cache
    • 对某个指定的视图进行缓存
  • 模板缓存 Template fragment caching
  • Low-level cache API
    • 低级缓存对某一个部分进行缓存

Django的配置

先pip一个库: pip install python-memcached

CACHES = {
    'default':{
        "BACKEND": # 引擎策略
        	"django.core.cache.backends.memcached.MemcachedCache",
        "LOCATION": "127.0.0.1:11211", # IP地址 可以是列表 代表一个集群
        "TIMEOUT": 300, # 保存的过期时间,
        "OPTIONS":{
            "MAX_ENTRIES": 300, # 缓存最大的数据量
            "CULL_FREQUENCY": 2 # 缓存条数达到最大值的时候 删除 1/2(2是字面量) 条数据
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

其中LOACTION可以是一个IP的列表 代表一个缓存的集群

DatabaseCache

配置:

CACHES = {
    'default':{
        "BACKEND":"django.core.cache.backends.db.DatabaseCache",
        "LOCATION": "my_cache_table", # 表名称
        "TIMEOUT": 300
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

使用命令 python manage.py createcachetable 在数据库中创建指定名称的缓存表 并且使用 mirate 进行数据表的迁移

创建好的数据表有三个字段: cache_key value expires 键 值 过期时间

per-view cache

将某个视图函数直接进行缓存

@cache_page(30)
def retrieve(self, reqest, *args, **kwargs):
    if reqest.user.is_staff == 1:
        cache1 = cache.get("adminUser")
        print(cache1)
        if cache1 is None:
            print("缓存未命中")
            cache.set('adminUser', reqest.user.username, 30)
            cache1 = cache.get("adminUser")
        print("cache", cache1)
    return super().retrieve(reqest, *args, **kwargs)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
@cache_page(30)
def get_html(req, username):
    s = time.time()
    user = User.objects.get(username=username)
    return HttpResponse(f"<h1>HelloWorld, 你是员工吗:{user.is_staff} <hr> 消耗时间:{time.time() - s}</h1>")
  • 1
  • 2
  • 3
  • 4
  • 5

如果我们开启了缓存会发现 所显示的消耗的时间是不会被改变的 因为他已经将固定的返回内容进行了缓存 导致界面无法进行更新

Low-level cache API

我们观察上面的代码 其实慢就满载对User模型的查询部分我们能不能对其进行单独的提取从而进行缓存呢? 答案是可以的

我们在Setting里面的配置项可以是多个 其中默认是default那个表示默认的default. 我们使用from django.core.cache import cache 可以直接使用 default配置项里面的数据,但是你也可以使用from django.core.cache import caches 然后 caches['default'] 也可以

from django.core.cache import cache
...
cache.set("hot_data_blog", "【热门】"20) # 设置热点数据到缓存服务器上
cache.get("hot_data_blog") # 获取
cache.add("hot_data", "Value")# 添加数据 只有key值不存在才生效
  • 1
  • 2
  • 3
  • 4
  • 5

例如代码:

class StudentView(ModelViewSet):
    authentication_classes = [
        Authen2
    ]
    permission_classes = [
        IsAuthenticated
    ]

    queryset = Student.objects.all()
    serializer_class = SerailizerStudent

    def retrieve(self, reqest, *args, **kwargs):
        if reqest.user.is_staff == 1:
            cache1 = cache.get("adminUser")
            print(cache1)
            if cache1 is None:
                print("缓存未命中")
                cache.set('adminUser', reqest.user.username, 30)
                cache1 = cache.get("adminUser")
            print("cache", cache1)

        return super().retrieve(reqest, *args, **kwargs)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

浏览器缓存策略

浏览器请求某个路由的时候

  • 发送请求到缓存中 如果缓存有该路由的缓存 直接渲染浏览器(缓存命中)
  • 没有的话才会想服务器请求
  • 请求好了 浏览器将服务器返回的数据交给缓存进行存储

中间件

中间件是一个请求/响应处理的钩子框架 轻量 低级的插件 用于全局改变输入和输出

当请求发送给服务器的时候 中间件可以在主路由前 视图前做一定的处理(或者是拦截) 或者是服务器发送响应给浏览器的时候也需要通过中间件 当然中间件的类型还有很多 不一一举例

中间件以类定义 需要继承类 django.utils.deprecation.MiddlewareMixin

定义

from django.utils.deprecation import MiddlewareMixin
class RouterMiddleWare(MiddlewareMixin):
    """
    一个合格的中间件 需要有以下的函数 可以是多个!
    以下函数都需要返回数据:
        None: 继续王下周
        HttpResponse: 结束了 不用继续了 我把数据返回给你就是
    """
    def process_request(self, request, callback=None, callback_args=None, callback_kwargs=None):
        """
        执行路由之前被调用 在每个请求上调用

        """
        print("-process_request-")

    def process_view(self, request,callback=None, callback_args=None, callback_kwargs=None):
        """
        调用视图之前被调用
        """
        print("-process_view-")

    def process_response(self, request, response,callback=None, callback_args=None, callback_kwargs=None):
        """
        所有响应返回给浏览器被调用 只能返回 HttpResponse
        """
        print("-process_response-")
        return response

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

注册

我们可以在setting里面进的MiddleWare进行注册

执行的顺序

在视图返回数据之前 按照注册顺序执行 在视图函数之后 注册顺序的逆序执行

例子

让浏览器每次指定访问次数

from django.http import HttpResponse
from django.utils.deprecation import MiddlewareMixin
from collections import defaultdict
import schedule
class LimitRequestRoute(MiddlewareMixin):
    visit_log = defaultdict(lambda :0)
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        schedule.every(10).seconds.do(self.clear)
        print("[中间件/LimitRequestRoute]: 开始执行数据")

    def clear(self):
        self.visit_log.clear()
        print("[中间件/LimitRequestRoute]: 缓存被清空")

    def process_request(self, request, **kwargs):
        value = request.META.get("REMOTE_ADDR")
        schedule.run_pending()
        if self.visit_log[value] <= 4:
            self.visit_log[value] += 1
            print(self.visit_log)
        else:
            return HttpResponse("你的访问次数超过正常值 请在10秒后重试")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

分页

有一个叫做Paginator的类 很nice

实例化

page = Paginator(object_list, pre_pages)
- object_list 数据列表
- pre_pages 每页的数据量
  • 1
  • 2
  • 3

使用

它有这些

、数据

  • count 数据的长度
  • num_pages 分页后的总数
  • page_range: 当前所有的页数
  • page(number)
    • 返回number页码的信息(从1开始而不是0)
    • 如果不存在 抛出InvalidPage的异常
    • 返回的信息数据有这些属性
      • object_list 当前页上所有数据的对象列表
      • number 当前页的序号
      • paginator 当前page的Paginator对象
      • has_next() 下一页有吗
      • has_previous() 上一页有吗
      • has_other_pages() 有上一页和下一页吗
      • next_page_number() 下一页的页码 如果不存在抛出InvalidPage异常
      • previous_page_number() 上一页的页码 如果不存在抛出InvalidPage异常

文件上传

我们先做个前端:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script src="https://cdn.staticfile.org/jquery/3.6.1/jquery.min.js"></script>
</head>
<body>
<style>
    #form {
        width: 60%;
        margin: auto;
    }

    input {
        display: block;
        width: 100%;
        margin-bottom: 20px;
        height: 50px;
        font-size: 2rem;
    }
</style>
<form id="form" method="post" enctype="multipart/form-data" action="/file/upload/">
    <input type="file"  id="ac">
    <input type="submit" value="登录" id="dl">
</form>

</body>
</html>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

后端:

def file_update(req: Request):
    if req.method == 'POST':
        file: InMemoryUploadedFile = req.FILES.get('user_file')
        print(type(file))
        if file is None: return HttpResponse("上传失败 - 文件为空")
        filename = file.name
        return HttpResponse(f"你的文件是{filename}, 大小 {file.size} KB")
    else:
        return HttpResponse("不支持的类型")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在Django中 我们将客户端上传的资源称之为: media 资源 区别于 static 资源

这是对 media 资源的配置

MEDIA_URL = "/media/" # 加载资源的时候 路由需要以MEDIA_URL开头
MEDIA_ROOT = os.path.join(BASE_DIR, "media")
  • 1
  • 2

我们还需要在主路由里面配置MEDIA_URL设置的路由

from django.conf import settings
from django.conf.urls import static

urlpatterns += static(
    settings.MEDIA_URL,
    document_root=settings.MEDIA_ROOT
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

文件存储

方案一: open() 传统法则
file_path = os.path.join(settings.MEDIA_ROOT,file.name)
with open(file_path, 'wb') as f:
            f.write(file.file.read())
  • 1
  • 2
  • 3

缺点:容易重复名称

方案二:文件ORM

1: 创建一个模型

from django.db import models

class Content(models.Model):
    title = models.CharField("name", max_length=11)
    path = models.FileField(upload_to="picture") # upload_to 表示的是子目录
  • 1
  • 2
  • 3
  • 4
  • 5
  1. 使用
def file_update(req: Request):
    if req.method == 'POST':
        file: InMemoryUploadedFile = req.FILES.get('user_file')
        print(type(file))
        if file is None: return HttpResponse("上传失败 - 文件为空")
        filename = file.name
        Content.objects.create(title=filename, path=file)

        return HttpResponse(f"你的文件是{filename}, 大小 {file.size} KB")
    else:
        return HttpResponse("不支持的类型")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

项目部署

部署机器: Centos

runserver 的能力很差 我们一般使用 uwsgi 进行

uWSGI

定义

是一个新的web网关接口 是将Python应用放在web服务器之间的一种接口 被广泛的使用 我们平时使用的runserver 是一种很拉的测试环境使用的接口(但是如果没有runserver django根本和http无关).

通俗的说 wsgi就是一种http和django之间的翻译而且实现了uwsgi的协议功能完善协议众多。

我们可以使用 pip install uwsgi进行安装

报错解决:

yum install -y gcc* pcre-devel openssl-devel

yum install -y python-devel

# 或者是

yum install -y python3-devel

pip3 install uwsgi

我们要在于主应用目录中设置一个ini配置文件

[uwsgi]
#socket=0.0.0.0:9876
http=0.0.0.0:9876 ;让uwsgi 默认在9876端口开启http服务
chdir=/xxx/xxx/xxx/ ; 项目的路径 必须是绝对路径
wsgi-file=blog/wsgi.py ; 告诉uwsgi: wsgi的路径 相对路径
process=1
thread=2
pidfile=us.pid ; 服务的pid
daemonize=x.log ; 日志
master=true ; 主进程的管理

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

然后我们直接运行:

uwsgi --ini uwsgi.ini
  • 1

常见的问题

重复启动uWsgi 导致pid文件中的进程号失准

解决方式: kill -f uwsgi -9

JWT

jwt 就是: json web token ,用于 前后端分离项目|app|微信小程序 中用户的认证,他可以将原始的数据json加密成我们看不懂的字符串,通过后台将加密的字符串交给前台

> pip install djangorestframework-jwt

基本流程

1、 用户发送密码和账号进行登录 服务器收到之后发送一串随机的字符串我们称为token 交给用户

2、用户拿到之后存储, 在下次登录的时候拿着token交给服务器服务器进行验证

基于传统的token实现

import uuid

# Create your views here.
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import ModelSerializer
from rest_framework.views import APIView

from User.models import User


class UserSerial(ModelSerializer):
    class Meta:
        model = User
        fiedles = ['username', "password"]


class UserLogin(APIView):
    def put(self, request: Request):
        """
        修改密码 默认只有登录成功才可以修改
        :param request:
        :return:
        """
        req = request.data
        token = req.get('token')
        urn = req.get('username')
        newpwd = req.get('newpassword')
        if not token:
            return Response({"status": 201, 'error': "错误的登录状态-你没有登录!"})
        else:
            ret = User.objects.get(username=urn)
            if ret.token != token:
                return Response({"status":201, 'error': "小黑子伪造了错误的令牌"})
            ret.password = newpwd
            ret.save()
            return Response({"status":200, 'error':"修改成功"})
    def get(self, request: Request):
        """
        登录
        :param req:
        :return:
        """
        req = request.data
        urn = req.get('username')
        pwd = req.get('password')

        print(urn, pwd)
        ret = User.objects.filter(username=urn, password=pwd)
        print(ret)
        if not ret:
            return Response({"status": 201, "code": "error_login"})
        random_str = str(uuid.uuid4())
        i = ret[0]
        i.token = random_str
        i.save()
        return Response({'status': 200, 'code': random_str})

    def post(self, request: Request):
        """
        注册
        :param request:
        :return:
        """
        req = request.data
        urn = req.get('username')
        pwd = req.get('password')

        userSerial = UserSerial(data=request.data)
        if userSerial.is_valid():
            User.objects.create(**userSerial.validated_data)

            return Response(userSerial.validated_data)
        else:
            return Response(userSerial.errors)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

代码的主要思路就是:

- 用户在某个登录操作之后 系统记录下用户的token并交给客户端
- 下次客户端在基于登录之后对账户具体的操作的时候对比记录的token和传来的ajax的请求token是否一致 一致再进行操作
  • 1
  • 2

JWT的实现

用户登录的时候 用户部分保存JWT 但是服务端不保存 以后用户再进行访问的时候携token 服务端进行校验,优势:相较于传统的token无需在服务端保存token

实现过程

第一步: 用户提交表单信息给服务器(用户名 密码) 使用jwt创建一个token并交给用户进行返回

jwt的token是三段字符串 并且用.链接

  • 第一段字符串: header 内部包含 alg:HS256, typ:“JWT”

    • 就是使用的加密算法是HS256 使用的类型是JWT
  • 第二段是自定义的用户的信息{id:"123123",name:"sadasd", exp:123123} exp 是过期时间 因为信息是可以被反解的 所以不能把敏感信息凡在里面

  • 第三段 第一部分的明文和第二部分的明文拼接起来并进行加密 加密算法就是第一段alg指向的值 当然加了盐 加完密之后再加上了base64url

第二步: 以后用户再来访问的时候需要携带token 后端需要对token进行校验

校验过程

  • 获取token,对token进行切割
  • 对第二段进行base64url解密
  • 获取第二段的json信息 检测过期时间是否超时

第三步: 把第一二端进行一系列的解密对比两个密文如果一致就是没修改过

因为服务器对第三段的信息进行了加盐 用户无法知道盐的具体值 所以无法进行解密

代码实现

class JWTUserLogin(APIView):
    def put(self, request: Request):
        """
        修改密码 默认只有登录成功才可以修改
        :param request:
        :return:
        """
        req = request.data
        token = req.get('token')
        newpwd = req.get('newpassword')
        salt = settings.JWT_SALT
        try:
            payload = jwt.decode(token, salt, True)
            # 对token进行校验 如果失效抛出异常
        except  Exception as e:
            return Response({
                "status": 201,
                'error': "错误的令牌"
            })
        username :str= payload.get("urn")
        try:
            user = User.objects.get(username=username)
        except:
            return Response({
                "status": 201,
                "data": f"用户不存在了"
            })
        return Response({
            "status": 200,
            "data": f"你好, {user.username}"
        })

    def get(self, request: Request):
        """
        登录
        :param req:
        :return:
        """
        req = request.data
        urn = req.get('username')
        pwd = req.get('password')

        print(urn, pwd)
        ret = User.objects.filter(username=urn, password=pwd)
        print(ret)
        if not ret:
            return Response({"status": 201, "code": "error_login"})
        header = {
            "alg": "HS256",
            "typ": "JWT"
        } # 不写 默认也是他

        payload = {
            "urn": ret[0].username,
            "uid": ret[0].id,
            # "exp": ...
        }

        token = jwt.encode(
            payload=payload,
            key=settings.JWT_SALT,
            headers=header,
            algorithm="HS256"
        ).decode('utf-8')

        return Response({'status': 200, 'code': token})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

企业开发

我们可以将token的验证过程交给一个验证器来完成

class JwtAuth(BaseAuthentication):
    def authenticate(self, request):
        if request.method == "GET":
            return None

        req = request.data
        token = req.get('token')
        salt = settings.JWT_SALT
        try:
            payload = jwt.decode(token, salt, True)
            # 对token进行校验 如果失效抛出异常
        except  Exception as e:
            raise AuthenticationFailed({
                "status": 201,
                'error': "令牌错误 小黑子伪造了吧"
            })
        username: str = payload.get("urn")
        try:
            user = User.objects.get(username=username)
        except:
            return AuthenticationFailed({
                "status": 201,
                "data": f"用户不存在了"
            })
        return (user, None)


class prologin(APIView):
    authentication_classes = [JwtAuth, ]

    def put(self, req):
        return  Response({"status":200, "data":"Okay"})

    def get(self, req):
        JWTUserLogin().get(request=req)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

RabbitMQ

定义

rabbitmq 是一个消息队列 消息队列也称之为消息中间件

队列是一个先进先出的数据结构(FIFO)

作用

我们知道的 线程和进程python已经实现了队列 为什么还有Rabbit MQ呢

不同的语言(Java和C#和python的通讯)或者是Python本身实现的不同主机的分布式通讯

流量削峰

传统的web服务是来一个请求我处理一个 如果服务器并发量是10 突然来了11人会导致服务器的崩溃! 为了解决这个问题我们可以使用一个队列 存储从客户端请求来的消息 服务器每次从消息队列中取出自己合适数量的消息。这就是高并发的流量削峰

异步 解耦

某个系统 有订单 骑士 商家 后台四个单元模块 用户下订单之后订单系统收到请求 并将请求发送一份给骑士系统 骑士系统处理完毕之后 请求交给商家来处理 商家处理完之后后台进行记录 系列流程执行完毕之后通过订单系统交给客户

这样的链式架构往往有一个问题 其中有一环如果出现了那就Bug了导致流程无法正常执行 有很大的问题 而且时间的消耗也是极其巨大的

如果换成了由A模块并起之后所有的模块 虽然性能表现较好与避免了之前环环相扣的问题,但是我们代码的架构就变成了高耦合低内聚的架构了 每次添加一个新的模块都要在订单系统进行该模块的并发代码编写 很是不好!

我们可以创建一个消息队列 然后订单系统将需要处理的数据交给队列 队列拿到之后 监听队列的模块们不约而同的过来拿去数据 一人一份然后进行处理 (像极了你在家喂一群猫猫狗狗的样子) 这就是一种生产者消费者模式

工作模型

rabbitmq 有两个模式

  • 简单模式
  • 交换机模式
    • 发布订阅模式 - fanout
    • 关键字模式 - direct
    • 模糊匹配模式(通配符模式) -

简单模式

如果是生产者消费者模式,我们需要一个往队列里存任务的实例和一个监听队列并获取任务的实例

生产者

职责: 链接RabbitMQ 向队列里面添加消息

流程

  • 链接 RabbitMQ
  • 创建一个队列
  • 向队列里面插入数据
import pika
auth_user = pika.PlainCredentials("admin", "20021003")
# 创建一个用户认证器

conn = pika.BlockingConnection(
    pika.ConnectionParameters(
        "192.168.236.128",
        credentials=auth_user
    )
)
# 链接指定的 RabbitMQ 并指定用户认证器

channel = conn.channel()
# 获取一个频道 他可以对RabbitMQ继续控制

channel.queue_declare(queue="new_task")
# 创建队列 并指定一个队列的名称为 new_task

channel.basic_publish(
    exchange="",
    # 这是交换机参数 因为是简单模式 所以为空

    routing_key="new_task",
    # 指定的队列名称

    body="你好我的上帝"
    # 插入的数据
)
# 向指定的队列插入数据
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

消费者

职责: 监听MQ 并获取数据

流程:

  • 链接
  • 监听某个队列
  • 确定回调函数
import pika
auth_user = pika.PlainCredentials("admin", "20021003")
# 创建一个用户认证器

conn = pika.BlockingConnection(
    pika.ConnectionParameters(
        "192.168.236.128",
        credentials=auth_user
    )
)
# 链接指定的 RabbitMQ 并指定用户认证器

channel = conn.channel()
# 获取一个频道 他可以对RabbitMQ继续控制

channel.queue_declare(queue="new_task")
# 创建队列 如果存在默认跳过

# 确认回调函数
def call_back(ch, method, properties, body):
    body:bytes
    print(f"[*] 检测到消息 {body.decode('utf-8')}")

# 确定监听队列
channel.basic_consume(
    queue="new_task", # 监听队列
    auto_ack= True, # 默认应答
    on_message_callback=call_back # 回调
)

channel.start_consuming()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

这是全部的代码总览

import time
from threading import Thread

import pika

def _getter():
    auth_user = pika.PlainCredentials("admin", "20021003")
    # 创建一个用户认证器

    conn = pika.BlockingConnection(
        pika.ConnectionParameters(
            "192.168.236.128",
            credentials=auth_user
        )
    )
    channel = conn.channel()
    # 获取一个频道 他可以对RabbitMQ继续控制

    channel.queue_declare(queue="new_task")
    # 创建队列 如果存在默认跳过

    # 确认回调函数
    def call_back(ch, method, properties, body):
        body:bytes
        time.sleep(.1)
        print(f"[*] 检测到消息 {body.decode('utf-8')}")

    # 确定监听队列
    channel.basic_consume(
        queue="new_task", # 监听队列
        auto_ack= True, # 默认应答
        on_message_callback=call_back # 回调
    )


    channel.start_consuming()
def _putter():
    import random
    auth_user = pika.PlainCredentials("admin", "20021003")
    # 创建一个用户认证器

    conn = pika.BlockingConnection(
        pika.ConnectionParameters(
            "192.168.236.128",
            credentials=auth_user
        )
    )
    channel =conn.channel()
    channel.queue_declare("new_task")
    while 1:
        channel.basic_publish(
            routing_key='new_task',
            exchange="",
            body=f"我是一个一个的{random.randint(1,100)}数据啊"
        )

if __name__ == '__main__':
    t1 = Thread(target=_getter)
    t2 = Thread(target=_putter)
    t1.start()
    t2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

默认应答

basic_consume 可以设置我们的默认的应答参数: auto_ack

当数据被消费者取走的时候执行callback函数 如果callback有bug或者是需要重新做一个方案 我们重写了消费者的代码 但是当我们再次从数据中去数据的时候,因为数据早早的被取走了所以已经没有了

将默认应答修改为手动应答即可

...
def call_back(ch, method, properties, body):
    body:bytes
    # time.sleep(.1)
    str_body = body.decode('utf-8')
    if str_body != "我是一个一个的52数据啊":
        print(f"[*] 检测到消息 {str_body}")

        ch.basic_ack(delivery_tag=method.delivery_tag)
        # 手动发送删除消息的代码
    else:
        print("有错误啦!")
        raise KeyError("我处理不了52的数据 呜呜呜呜")



# 确定监听队列
channel.basic_consume(
    queue="new_task", # 监听队列
    auto_ack= False, # 默认应答
    on_message_callback=call_back # 回调
)
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

持久化参数

如果生产者将数据存在了MQ 但是消费者还没来得及去获取 MQ 就绷不住了 挂了,导致在队列里面的数据全部丢失, 怎么办?

在创建队列的方法queue_declare添加参数 durable=True注意当申明一个队列之后就无法对其的持久化进行修改了, 单单一个队列具有持久化的能力还不够,我们需要对他publish的数据也进行持久化的申明:

channel.basic_publish(
	exchange="",
    ...,
	properties=pika.BasicProperties(
    	delivery_mode=2
    )    
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

分发参数

如果应用场景里出现了多个消费者 (似乎现实也是) 生产者在某队列A中存放了一个数据 但是A B两个生产者都监听了该队列 这个数据究竟交给谁好呢?(人手一份? 那是发布订阅模式)默认的简单分发机制称之为轮询机制,一人一个谁也不许多谁也不兴少,如果生产者上传了一个数据给队列 队列有多个客户端在监听 那么这个数据会给最先监听队列的那个消费者。以此类推

这种轮询的方式固然不错 但是当数据交给某个消费者的时候,消费者本身处理数据的能力特别的慢,导致轮询无法继续往下执行 不好

我们指定两个消费者 一个消费者的callback我们设置sleep(2)表示该消费者的处理速度为2s 另外一个sleep(1)表示速度为1s.

我们是可以在消费者使用 channle.basic_qos(prefeth_count=1) 设置一个公平的分发机制。谁干活干完了就把数据交给谁。

交换机模式

发布订阅模式

发布订阅模式主要的功能是通过上传者开辟一个交换机 所有的客户端准备一个队列 并订阅某个交换机后 每当生产者生产一份数据给交换机 所有订阅交换机的队列们将会立刻人手一份生产者生产的新数据

在发布订阅模式下我们的生产者消费者在做什么呢?

生产者
  • 链接 RabbitMQ
  • 添加一个 交换机
  • 向交换机中添加数据
import pika
auth = pika.PlainCredentials('admin','20021003')
conn = pika.BlockingConnection(
    pika.ConnectionParameters(
        "192.168.236.128",
        credentials=auth
    )
)
chl = conn.channel()

chl.exchange_declare(
    exchange="tasks", # 一个交换寄的名称
    exchange_type="fanout" # 生产交换机的模式 这里是发布订阅模式
)
# 生成一个交换机

chl.basic_publish(
    exchange="tasks",
    routing_key='',
    body="这是一个消息 你可以进行订阅"
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
消费者
  • 链接 RabbitMQ
  • 每个消费者都需要创建一个队列
  • 让队列绑定指定的交换机
总览
import time

import pika

auth_user = pika.PlainCredentials('admin', "20021003")
connect = pika.BlockingConnection(
    pika.ConnectionParameters(
        '192.168.236.128',
        credentials=auth_user
    )
)
chl_tmp = connect.channel()
exchange_name = "exchage_test"
chl_tmp.exchange_declare(exchange_name, exchange_type="fanout")  # 创建一个全局的交换机
chl_tmp.queue_declare('a')

def putter():
    """
    生产者
    :return:
    """
    auth_user = pika.PlainCredentials('admin', "20021003")
    connect = pika.BlockingConnection(
        pika.ConnectionParameters(
            '192.168.236.128',
            credentials=auth_user
        )
    )
    chl = connect.channel()
    exchange_name = "exchage_test"
    chl.exchange_declare(exchange_name, exchange_type="fanout")  # 创建一个全局的交换机
    chl.queue_declare('a')


    chl.basic_publish(
        exchange=exchange_name,
        routing_key='a',
        body="23"
    )



def __call_back(chl, method, properties, body):
    body:bytes
    print(body.decode("utf-8"))


def getter():
    chl = connect.channel()

    queue = chl.queue_declare("", exclusive=True)
    queue_name = queue.method.queue
    chl.queue_bind(
        exchange=exchange_name,
        queue=queue_name
    )
    chl.basic_consume(
        queue=queue_name,
        auto_ack=True,
        on_message_callback=__call_back
    )
    chl.start_consuming()

print("【生产者】")

from threading import Thread
t1 = Thread(target=putter)
t2 = Thread(target=getter)
t1.start()
t2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

踩坑:

如果你是顺序执行代码的 请先启动消费者 在启动生产者

关键字模式

关键字模式是发布订阅模式的一个增强,我们知道绑定交换机的队列一旦知道交换机被赋予了新的值就会立刻获取一份拷贝数据 以至于人手一份,但是有些数据可能对某个具体的消费者是无用的 有些是有用的,换句话来说就是 不想让交换机发送的所有数据订阅的队列都有响应,这是极其不好的事情。

现在RabbitMQ有了一个新的方案: 队列绑定某个交换机的时候,顺便会携带一个关键字的参数,生产者在向交换机发送数据的时候也会携带一个关键字 如果关键字匹配成功那么就发送反之不发送。

关键字参数就是我们 queue_bindroute_keybasic_publishroute_key 如果需要多个可以 调用多次 queue_bind 或者 basic_publish

import pika

exchange = "direct_test"


def connect():
    auth = pika.PlainCredentials('admin', '20021003')
    connect = pika.BlockingConnection(
        pika.ConnectionParameters(
            '192.168.236.128', credentials=auth
        )
    )
    return connect


def init():
    chl = connect().channel()
    chl.exchange_declare(exchange=exchange, exchange_type="direct")


def putter(key=None):
    c = connect()
    chl = c.channel()
    while True:
        chl.basic_publish(
            exchange=exchange,
            routing_key=key,
            body=f"我的关键字是 {key}"
        )
        print(f"关键字{key}的交换机数据已发送")

def __callback(chl, method, property, body):
    body:bytes
    print(f"指定关键字{chl.routing_key}消费者已收到数据: {body.decode('utf-8')}")

def getter(key=None):
    c = connect()
    chl = c.channel()
    ret = chl.queue_declare("", exclusive=True)
    queuename: str = ret.method.queue
    chl.queue_bind(
        exchange=exchange,
        queue=queuename,
        routing_key=key # 绑定一个关键字
    )
    # 如果需要绑定一个关键字 可以再写一个 queue_bind
    chl.routing_key = key
    chl.basic_consume(
        queue=queuename,
        auto_ack=True,
        on_message_callback=__callback
    )
    print(f"关键字{key}的选手开始监听")
    chl.start_consuming()

if __name__ == '__main__':
    import threading

    init()
    threading.Thread(target=getter, args=("admin",)).start()
    threading.Thread(target=getter, args=("guest",)).start()
    threading.Thread(target=putter, args=('admin',)).start()
    threading.Thread(target=putter, args=('guest',)).start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

通配符模式

关键字模式需要交换机和队列绑定的关键字需要完全匹配 但是我们可以通过通配符进行模糊匹配 使用方法就是 原来使用 basic_publish 设置的 route_key 不变 改变的是我们的 queue_bind 的值。 # 表示任意多个字符 , * 表示一个字符(这个和window的文件搜索引擎的通配符似乎是不大一样的…)

Celery

在RabbitMQ的过程中我们依旧会发现问题的所在

目前的架构我们可以得知: 任务在分发的时候 先会将任务放在交换机中 订阅交换机的队列从中去数据

我们可以使用多路复用技术或者是多线程进程 但是太麻烦了 我们可以直接使用Celery来实现并发的技术

是一个分布式系统 处理异步任务支持任务调度

  • 分布式
    • Celery有三部分 消息中间件 任务执行单元 任务执行结果存储
    • 我们将这三个单元存储在不同的服务器架构上
    • 通过消息通讯进行通信的系统
    • 负载均衡 避免单点故障
  • 异步任务请求
    • IO型阻塞 asyncio式编程
    • 现在问题俺来了 如果MQ的数据有很多呢? 换句话就是如果分发任务的生产者如果在同一时间中生产了多个任务是不是说队列最末尾的数据需要等到前面所有的数据都处理完了再去处理呢? 答案是NO!
  • 定时任务
    • 热数据更新
    • 零点发送消息

架构

  • 消息中间件

    • 推荐使用 RabbitMQ 但是Redis简单些 本课程使用redis做笔记
  • 任务执行单元

  • 任务结果存储f

特点

  • 简单
  • 快速
  • 高可用
  • 灵活

本教程使用 Redis

流程

消费者

  • 我们一般使用Celery执行那些任务呢: 一般是涉及IO操作的代码 比如发短消息(手机)
  • 通过一条命令是队列开始监听

生产者

  • 将异步函数方引用给Celery

  • 对异步函数进行方法的调用就可以执行所需要的实现的效果

底层实现的还是 类似交换机模式 的实现

HelloWorld

消费者

import time

import celery
backend = "redis://127.0.0.1:6379/1" # 结果集
brokr = "redis://127.0.0.1:6379/2" # 消息中间件

cel = celery.Celery( # 创建一个异步任务
    "test",
    backend=backend, 
    broker=brokr # 消息中间件
)
@cel.task
def send_email(name):
    print("正在准备短消息的发送")
    time.sleep(10)
    print("f")
    return "OJBK"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

然后我们使用命令行启动:

如果是在windows启动还需要: --pool=solo 这个参数

其中__init__是文件的名称(不需要.py!)

它能够做的事情:启动一个worker 监听任务

生产者

from getter import *

result = send_email.delay("yuan")
print(result.id)
result2 = send_msg.delay("alex")
print(result2.id)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

当一个celery的任务监听到数据之后并执行完毕之后 会将返回的结果放在我们指定的redis数据库中 —— backend 我们可以通过 delay 返回的对象的 id 属性在 redis数据库中进行获取

对ID的处理

taskid = result2.id
async_ret = AsyncResult(
    id=taskid,
    app=cel
)

... # 我可以做任何我想做的事情在这里

if async_ret.successful():
    print("成功!")
    ret = async_ret.get() # 获取任务的返回值
    print(ret)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

多任务结构

我们可以创建一个新的包 包里面存放我们的所有监听任务 也就是我们的消费者,其他功能不变

  • celery.py - 一些对Celery的配置文件
import celery

cel = celery.Celery(
    "celery_demo",
    backend="redis://127.0.0.1:6379/4", # 结果集
    broker="redis://127.0.0.1:6379/5", # 消息中间件
    include=[ # 包含任务所在的文件 对多个任务可以做模块化的分类
        "tasks_group.task1",
        "tasks_group.task2",
    ]
)
cel.conf.timezone = "Asia/Shanghai"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • task01 之类的代码 - 任务 (消费者)
import time

from tasks_group._celery import cel

@cel.task
def send_email(res):
    print(f'我在发送邮件 内容是{res}')
    time.sleep(10)
    return "ok"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 生产者
from tasks_group.task2 import send_msg
from tasks_group.task1 import send_email

ret = send_msg.delay("Hello")
print(ret.id)
ret = send_email.delay("Hello")
print(ret.id)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

celery 实现定时任务

指定时间的定时任务

使用 国标时间 传入 apply_async 方法的 eta 参数。可以进行定时任务的设置

import datetime

from tasks import send_msg as tsend_msg

ctime = datetime.datetime(2022, 10, 16, 23, 14)
print(ctime)
utc = datetime.datetime.utcfromtimestamp(
    ctime.timestamp()
)
print(utc)
ret = tsend_msg.apply_async(args=["Hello", ], eta=utc)
print(f'ret: {ret.id}')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2022-10-16 23:14:00 点发送一个 发送消息的任务

模块化的定时任务

from datetime import timedelta
from celery.schedules import crontab
import celery

cel = celery.Celery(
    "mode_time_tasks",
    backend="redis://127.0.0.1:6379/1",
    broker="redis://127.0.0.1:6379/3",
    include=[
        "tasks_time.task1"
    ]
)
cel.conf.timezone = "Asia/Shanghai"
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    "every10m": {  # 该key可以自定义
        "task": "tasks_time.task1.send_email", # 指定的任务
        "schedule": timedelta(seconds=10),  # 每十秒执行一次
        # "schedule": 10,  # 上一行的代码的简写
        # "schedule": crontab(minute="*/1"), # 每一分执行一次  * 表示每
        "args": ("就是给你发消息",) # 参数
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

这样配置之后我们可以直接使用消费者进行 从而不再对生产者进行编码了 为了让这个定时任务跑起来 我们需一行命令 每个指定的时间就向 redis队列 放置任务: celery -A tasks_time._celery beat

Django和Celery

我们需要在django 项目里生成一个包 celery 包的里面装着许多的任务包这些就是我们的消费者,每个消费者包里面需要一个 task.py 这里面存放着我们的消费函数,回到 celery 包下面 创建一个config 和一个 main

大致结构像这样:

接下来代码部分:

main.py

import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "celeryInDjango.settings")
# 将celery绑定给django 需要放在最前面 避免使用django数据库无效的问题

from celery import Celery

app = Celery('taskManager')


app.config_from_object("celery_tasks.config")
# 加载配置文件

app.autodiscover_tasks(
    ['celery_tasks.async_task', "celery_tasks.time_task"]
)
# 会自动加载指定目录的 `task.py` 文件
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

config

broker_url = "redis://127.0.0.1:6379/5"
result_backend = "redis://127.0.0.1:6379/4"
  • 1
  • 2

某个task:

from celery_tasks.main import app
import time

@app.task
def send_msg(msg):
    print(f"【celery - 异步任务】正在发送代码{msg}")
    time.sleep(10)
    print("END")
    return "ok"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

某个view:

# Create your views here.
from celery.result import AsyncResult
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView

from celery_tasks.async_task import tasks
from celery_tasks.main import app


class test(APIView):
    def get(self, request: Request):
        id = request.data.get('id')
        async_ret = AsyncResult(
            id=id,
            app=app
        )
        print(async_ret.status)
        if async_ret.successful():
            return  Response({"ret": async_ret.get()})
        else:
            return Response({"ret": "数据还在处理 请稍后"})


    def post(self, request):
        ret = tasks.send_msg.delay("hello")
        return Response({"status": "OJBK", "id": ret.id})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

启动celery的指令: celery -A celery_tasks.main worker -l info --pool=solo 启动django的指令: python manage.py runserver.

django-redis

对于Django-redis的官方文档:https://django-redis-chs.readthedocs.io/zh_CN/latest/

我们在安装完毕 django-redis 之后可以进行类似的操作:

CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/15",
        "OPTIONS": {
            "CLIENTS_CLASS": "django_redis.client.DefaultClient",
            "REDIS_CLIENT_CLASS" : "fakeredis.FakeStrictRedis",
            "PICKLE_VERSION": -1,
            # 使用最新的一个 pickle序列化版本进行存储python的对象
            "IGNORE_EXCEPTIONS": True,
            # 忽略异常
        }
    }
}

SESSION_ENGINE = "django.contrib.sessions.backends.cache"
# 让 session 利用 缓存存储
SESSION_CACHE_ALIAS = "default" # 指向的是 CACHES 某个指定的键
# 别名

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
from django.core.cache import cache
from django.http import HttpResponse


# Create your views here.


def PutReids(request):
    cache.set("key", "asdasd")
    return HttpResponse("已缓存")

def GetReids(request):
    return HttpResponse(cache.get("key"))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/258104
推荐阅读
  

闽ICP备14008679号