当前位置:   article > 正文

Python 异步 redis_redis.asyncio

redis.asyncio

From:https://zhuanlan.zhihu.com/p/24720629

  1. aioredis 要求装上 hiredis , 而 aredis 可以不需要相关依赖地运行,速度上两者持平,且都可以使用 hiredis 来作为 parser ,用 uvloop 代替 asyncio 的 eventloop 来加速
  2. asyncio_redis 使用了 Python 提供的 protocol 来进行异步通信,而 aredis 则使用 StreamReader 和 StreamWriter 来进行异步通信,在运行速度上两倍于 asyncio_redis ,附上 benchmark
  3. aioredis 和 asyncio_redis 这两个客户端目前都还没有对于集群的支持,相对来说 aredis 的功能更为全面一些

1、aredis

github 地址:https://github.com/NoneGG/aredis
aredis 官方英文文档:https://aredis.readthedocs.io/en/latest/
aredis 一个高效和用户友好的异步Redis客户端:https://www.ctolib.com/aredis.html
更多使用示例:https://github.com/NoneGG/aredis/tree/master/examples

安装:pip install aredis

单节点

  1. import asyncio
  2. from aredis import StrictRedis
  3. async def example():
  4. client = StrictRedis(host='127.0.0.1', port=6379, db=0)
  5. await client.flushdb()
  6. await client.set('foo', 1)
  7. assert await client.exists('foo') is True
  8. await client.incr('foo', 100)
  9. assert int(await client.get('foo')) == 101
  10. await client.expire('foo', 1)
  11. await asyncio.sleep(0.1)
  12. await client.ttl('foo')
  13. await asyncio.sleep(1)
  14. assert not await client.exists('foo')
  15. loop = asyncio.get_event_loop()
  16. loop.run_until_complete(example())

集群版

  1. import asyncio
  2. from aredis import StrictRedisCluster
  3. async def example():
  4. client = StrictRedisCluster(host='172.17.0.2', port=7001)
  5. await client.flushdb()
  6. await client.set('foo', 1)
  7. await client.lpush('a', 1)
  8. print(await client.cluster_slots())
  9. await client.rpoplpush('a', 'b')
  10. assert await client.rpop('b') == b'1'
  11. loop = asyncio.get_event_loop()
  12. loop.run_until_complete(example())

2、aioredis

github 地址:https://github.com/aio-libs/aioredis
官方文档:https://aioredis.readthedocs.io/en/v1.3.0/

从 redis.py  4.2.0rc1+ 开始,Aioredis 已经集成到 redis.py 里面,

导入:from redis import asyncio as aioredis

安装:pip install aioredis

连接 redis

  1. import asyncio
  2. import aioredis
  3. async def main():
  4. redis = await aioredis.create_redis_pool('redis://localhost')
  5. await redis.set('my-key', 'value')
  6. value = await redis.get('my-key', encoding='utf-8')
  7. print(value)
  8. redis.close()
  9. await redis.wait_closed()
  10. asyncio.run(main())

连接到指定 db

  1. 指定 db 参数:redis = await aioredis.create_redis_pool('redis://localhost', db=1)
  2. 在 URL 中指定 db:redis = await aioredis.create_redis_pool('redis://localhost/2')
  3. 使用 select 方法:
    redis = await aioredis.create_redis_pool('redis://localhost/')
    await redis.select(3)

连接带密码的 redis

# 密码可以通过关键之指定,也可以通过 URL 指定:

redis = await aioredis.create_redis_pool('redis://localhost', password='sEcRet')

redis = await aioredis.create_redis_pool('redis://:sEcRet@localhost/')

结果编码:aioredis 默认返回字节类型,可以通过传递关键字 encoding="utf-8" 自动解码,也可以获取到字节类型后,通过 decode("utf-8") 进行解码。

示例代码:

  1. import asyncio
  2. import aioredis
  3. async def main():
  4. redis = await aioredis.create_redis_pool('redis://localhost')
  5. await redis.set('key', 'string-value')
  6. bin_value = await redis.get('key')
  7. assert bin_value == b'string-value'
  8. str_value = await redis.get('key', encoding='utf-8')
  9. assert str_value == 'string-value'
  10. await redis.hmset_dict(
  11. 'hash', key1='value1', key2='value2', key3=123
  12. )
  13. result = await redis.hgetall('hash', encoding='utf-8')
  14. assert result == {
  15. 'key1': 'value1',
  16. 'key2': 'value2',
  17. 'key3': '123', # 注意: redis 返回的int会作为str对待
  18. }
  19. redis.close()
  20. await redis.wait_closed()
  21. asyncio.run(main())

简单 低级别 接口

  1. import asyncio
  2. import aioredis
  3. loop = asyncio.get_event_loop()
  4. async def go():
  5. conn = await aioredis.create_connection(('localhost', 6379), loop=loop)
  6. await conn.execute('set', 'my-key', 'value')
  7. val = await conn.execute('get', 'my-key')
  8. print(val)
  9. conn.close()
  10. await conn.wait_closed()
  11. loop.run_until_complete(go())
  12. # will print 'value'

简单 高级别 接口

  1. import asyncio
  2. import aioredis
  3. loop = asyncio.get_event_loop()
  4. async def go():
  5. redis = await aioredis.create_redis(('localhost', 6379), loop=loop)
  6. await redis.set('my-key', 'value')
  7. val = await redis.get('my-key')
  8. print(val)
  9. redis.close()
  10. await redis.wait_closed()
  11. loop.run_until_complete(go())
  12. # will print 'value'

连接池

  1. import asyncio
  2. import aioredis
  3. loop = asyncio.get_event_loop()
  4. async def func_1():
  5. conn = await aioredis.create_connection(
  6. ('localhost', 6379), loop=loop)
  7. await conn.execute('set', 'my-key', 'value')
  8. val = await conn.execute('get', 'my-key')
  9. print(val)
  10. conn.close()
  11. await conn.wait_closed()
  12. async def func_2():
  13. redis = await aioredis.create_redis(('localhost', 6379), loop=loop)
  14. await redis.set('my-key', 'value')
  15. val = await redis.get('my-key')
  16. print(val)
  17. redis.close()
  18. await redis.wait_closed()
  19. async def func_3():
  20. pool = await aioredis.create_pool(
  21. ('localhost', 6379),
  22. minsize=5, maxsize=10,
  23. loop=loop
  24. )
  25. async with await pool as redis: # 高级别 redis API 实例
  26. await redis.set('my-key', 'value')
  27. print(await redis.get('my-key'))
  28. # 优雅的关闭
  29. pool.close()
  30. await pool.wait_closed()
  31. async def func_4():
  32. redis_pool = await aioredis.create_pool(
  33. ('localhost', 6379),
  34. minsize=5, maxsize=10,
  35. loop=loop
  36. )
  37. async with redis_pool.get() as conn: # 高级别 redis API 实例
  38. await conn.execute('set', 'my-key', 'value')
  39. print(await conn.execute('get', 'my-key'))
  40. # graceful shutdown
  41. redis_pool.close()
  42. await redis_pool.wait_closed()
  43. loop.run_until_complete(func_1)

示例:

  1. from sanic import Sanic, response
  2. import aioredis
  3. app = Sanic(__name__)
  4. @app.route("/")
  5. async def handle(request):
  6. async with request.app.redis_pool.get() as redis:
  7. await redis.execute('set', 'my-key', 'value')
  8. val = await redis.execute('get', 'my-key')
  9. return response.text(val.decode('utf-8'))
  10. @app.listener('before_server_start')
  11. async def before_server_start(app, loop):
  12. app.redis_pool = await aioredis.create_pool(
  13. ('localhost', 6379),
  14. minsize=5,
  15. maxsize=10,
  16. loop=loop
  17. )
  18. @app.listener('after_server_stop')
  19. async def after_server_stop(app, loop):
  20. app.redis_pool.close()
  21. await app.redis_pool.wait_closed()
  22. if __name__ == '__main__':
  23. app.run(host="0.0.0.0", port=80)

事务( Multi/Exec )

  1. import asyncio
  2. import aioredis
  3. async def main():
  4. redis = await aioredis.create_redis_pool('redis://localhost')
  5. tr = redis.multi_exec()
  6. tr.set('key1', 'value1')
  7. tr.set('key2', 'value2')
  8. ok1, ok2 = await tr.execute()
  9. assert ok1
  10. assert ok2
  11. asyncio.run(main())

multi_exec() 创建和返回一个新的 MultiExec 对象用于缓冲命令,然后在 MULTI / EXEC 块中执行它们。

重要提示:不要在 类似 ( tr.set('foo', '123') ) 上 使用 await buffered 命令, 因为它将被永远阻塞。

下面的代码将会给永远阻塞:

tr = redis.multi_exec()
await tr.incr('foo')   # that's all. we've stuck!

发布订阅 模式

aioredis 提供了对 Redis 的 发布/订阅(Publish / Subscribe) 消息的支持。

To start listening for messages you must call either subscribe() or psubscribe() method. Both methods return list of Channel objects representing subscribed channels.

Right after that the channel will receive and store messages (the Channel object is basically a wrapper around asyncio.Queue). To read messages from channel you need to use get() or get_json() coroutines.

要开始监听消息,必须调用 subscribe() 或 psubscribe() 方法。这两个方法都返回一个列表,列表中的元素是 "订阅的 Channel(通道) 对象"。在此之后,Channel(通道) 将接收并存储消息 ( "Channel(通道) " 基本上是 asyncio.Queue 的包装器)。要从 channel 中读取消息,需要使用get()或get_json()协程。

订阅 和 阅读 频道 示例:

  1. import asyncio
  2. import aioredis
  3. async def main():
  4. redis = await aioredis.create_redis_pool('redis://localhost')
  5. ch1, ch2 = await redis.subscribe('channel:1', 'channel:2')
  6. assert isinstance(ch1, aioredis.Channel)
  7. assert isinstance(ch2, aioredis.Channel)
  8. async def reader(channel):
  9. async for message in channel.iter():
  10. print("Got message:", message)
  11. asyncio.get_running_loop().create_task(reader(ch1))
  12. asyncio.get_running_loop().create_task(reader(ch2))
  13. await redis.publish('channel:1', 'Hello')
  14. await redis.publish('channel:2', 'World')
  15. redis.close()
  16. await redis.wait_closed()
  17. asyncio.run(main())

订阅 和 阅读 模式:

  1. import asyncio
  2. import aioredis
  3. async def main():
  4. redis = await aioredis.create_redis_pool('redis://localhost')
  5. ch, = await redis.psubscribe('channel:*')
  6. assert isinstance(ch, aioredis.Channel)
  7. async def reader(channel):
  8. async for ch, message in channel.iter():
  9. print("Got message in channel:", ch, ":", message)
  10. asyncio.get_running_loop().create_task(reader(ch))
  11. await redis.publish('channel:1', 'Hello')
  12. await redis.publish('channel:2', 'World')
  13. redis.close()
  14. await redis.wait_closed()
  15. asyncio.run(main())

Sentinel ( 哨兵 )

Redis(主从复制、哨兵模式、集群):https://blog.csdn.net/Bilson99/article/details/118732296

哨兵的核心功能:在主从复制的基础上,哨兵引入了主节点的自动故障转移

哨兵模式的原理:哨兵(sentinel) 是一个分布式系统,用于对主从结构中的每台服务器进行监控,当出现故障时通过投票机制选择新的 Master 并将所有 Slave 连接到新的 Master。所以整个运行哨兵的集群的数量不得少于3个节点。

哨兵模式的作用

  • 监控:哨兵会不断地检查主节点和从节点是否运作正常。
  • 自动故障转移:当主节点不能正常工作时,哨兵会开始自动故障转移操作,它会将失效主节点的其中一个从节点升级为新的主节点,并让其他从节点改为复制新的主节点。
  • 通知(提醒):哨兵可以将故障转移的结果发送给客户端。

哨兵结构由两部分组成,哨兵节点和数据节点:

  • 哨兵节点:哨兵系统由一个或多个哨兵节点组成,哨兵节点是特殊的redis节点,不存储数据。
  • 数据节点:主节点和从节点都是数据节点。

哨兵的启动依赖于主从模式,所以须把主从模式安装好的情况下再去做哨兵模式,所有节点上都需要部署哨兵模式,哨兵模式会监控所有的 Redis 工作节点是否正常,当 Master 出现问题的时候,因为其他节点与主节点失去联系,因此会投票,投票过半就认为这个 Master 的确出现问题,然后会通知哨兵间,然后从 Slaves 中选取一个作为新的 Master。

需要特别注意的是,客观下线是主节点才有的概念;如果从节点和哨兵节点发生故障,被哨兵主观下线后,不会再有后续的客观下线和故障转移操作。

  1. import asyncio
  2. import aioredis
  3. async def main():
  4. sentinel = await aioredis.create_sentinel(
  5. ['redis://localhost:26379', 'redis://sentinel2:26379'])
  6. redis = sentinel.master_for('mymaster')
  7. ok = await redis.set('key', 'value')
  8. assert ok
  9. val = await redis.get('key', encoding='utf-8')
  10. assert val == 'value'
  11. asyncio.run(main())

Sentinel 客户端需要一个 Redis Sentinel 地址列表,来连接并开始发现服务。

调用 master_for() 或 slave_for() 方法 将返回连接到 Sentinel 监视的指定服务的 Redis 客户端。

Sentinel 客户端将自动检测故障转移并重新连接 Redis 客户端。

  1. import asyncio
  2. import aioredis
  3. loop = asyncio.get_event_loop()
  4. async def go():
  5. conn = await aioredis.create_connection(
  6. ('localhost', 6379), loop=loop)
  7. await conn.execute('set', 'my-key', 'value')
  8. val = await conn.execute('get', 'my-key')
  9. print(val)
  10. conn.close()
  11. await conn.wait_closed()
  12. loop.run_until_complete(go())
  13. # will print 'value'

示例

  1. import uuid
  2. import time
  3. import json
  4. import datetime
  5. import uvicorn
  6. from pathlib import Path
  7. from fastapi import FastAPI
  8. from typing import Optional
  9. import redis
  10. from redis import asyncio as aioredis
  11. from concurrent.futures import ThreadPoolExecutor, wait
  12. redis_config = {
  13. 'host': '172.16.30.180',
  14. 'port': 6379,
  15. 'db': 1,
  16. }
  17. redis_db_yibu = aioredis.StrictRedis(**redis_config)
  18. redis_db_tongbu = redis.StrictRedis(**redis_config)
  19. app = FastAPI()
  20. @app.get("/api_test")
  21. async def func_handle_request(q: Optional[str] = None):
  22. """和 Flask 不同,Flask 是使用 <>,而 FastAPI 使用 {}"""
  23. print(f'q ---> {q}')
  24. current_timestamp = datetime.datetime.now().timestamp()
  25. req_uuid = str(uuid.uuid5(uuid.NAMESPACE_URL, f'{current_timestamp}{q}'))
  26. await redis_db_yibu.hset('api_test_req', req_uuid, q)
  27. while True:
  28. result = await redis_db_yibu.hget('api_test_resp', req_uuid)
  29. if not result:
  30. time.sleep(0.1)
  31. print('睡眠 100ms 继续监听')
  32. continue
  33. break
  34. return {"result": result}
  35. def http_server():
  36. """
  37. :return:
  38. """
  39. print(f'{Path(__file__).stem}:app')
  40. uvicorn.run(f'{Path(__file__).stem}:app', host="0.0.0.0", port=9999)
  41. def func_consumer(task_string=None):
  42. task_dict = json.loads(task_string)
  43. data = {'resp': str(datetime.datetime.now())}
  44. for k, v in task_dict.items():
  45. redis_db_tongbu.hset('api_test_resp', k, json.dumps(data, ensure_ascii=False))
  46. redis_db_tongbu.hdel('api_test_req', k)
  47. print(f'请求 [{k}] ---> 处理成功')
  48. def func_producer():
  49. worker_count = 50
  50. with ThreadPoolExecutor(max_workers=worker_count) as pool:
  51. while True:
  52. task_list = redis_db_tongbu.hgetall('api_test_req')
  53. if task_list:
  54. for k, v in task_list.items():
  55. task_dict = {k.decode('utf-8'): v.decode('utf-8')}
  56. task_string = json.dumps(task_dict, ensure_ascii=False)
  57. pool.submit(func_consumer, task_string)
  58. pass
  59. else:
  60. # print('task 为空,睡100ms继续')
  61. time.sleep(0.1)
  62. pass
  63. def main():
  64. with ThreadPoolExecutor(max_workers=2) as pool:
  65. pool.submit(http_server)
  66. pool.submit(func_producer)
  67. if __name__ == '__main__':
  68. main()
  69. pass

3、asynio_redis

GitHub 地址:https://github.com/jonathanslenders/asyncio-redis

官方英文文档:https://asyncio-redis.readthedocs.io/en/latest/

安装:pip install asyncio_redis

asynio_redis(下划线) 和 asyncio-redis(中划线)都已不再更新,推荐 aioredis

示例:Connection 类

asyncio_redis.Connection instance will take care of the connection and will automatically reconnect, using a new transport when the connection drops. This connection class also acts as a proxy to a asyncio_redis.RedisProtocol instance; any Redis command of the protocol can be called directly at the connection.

  1. import asyncio
  2. import asyncio_redis
  3. async def example():
  4. # Create Redis connection
  5. connection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
  6. # Set a key
  7. await connection.set('my_key', 'my_value')
  8. # When finished, close the connection.
  9. connection.close()
  10. if __name__ == '__main__':
  11. loop = asyncio.get_event_loop()
  12. loop.run_until_complete(example())

示例:连接池

Requests will automatically be distributed among all connections in a pool. If a connection is blocking because of --for instance-- a blocking rpop, another connection will be used for new commands.

  1. import asyncio
  2. import asyncio_redis
  3. async def example():
  4. # Create Redis connection
  5. connection = await asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)
  6. # Set a key
  7. await connection.set('my_key', 'my_value')
  8. # When finished, close the connection pool.
  9. connection.close()
  10. if __name__ == '__main__':
  11. loop = asyncio.get_event_loop()
  12. loop.run_until_complete(example())

示例

  1. import asyncio
  2. import asyncio_redis
  3. async def example():
  4. # Create Redis connection
  5. connection = await asyncio_redis.Pool.create(
  6. host='127.0.0.1', port=6379, poolsize=10
  7. )
  8. # Create transaction
  9. transaction = await connection.multi()
  10. # Run commands in transaction (they return future objects)
  11. f1 = await transaction.set('key', 'value')
  12. f2 = await transaction.set('another_key', 'another_value')
  13. # Commit transaction
  14. await transaction.exec()
  15. # Retrieve results
  16. result1 = await f1
  17. result2 = await f2
  18. # When finished, close the connection pool.
  19. connection.close()

只要有事务在其中运行,连接就会被占用。建议使用足够大的池大小。

示例:发布 / 订阅

Pub / sub 

  1. import asyncio
  2. import asyncio_redis
  3. async def example():
  4. # Create connection
  5. connection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
  6. # Create subscriber.
  7. subscriber = await connection.start_subscribe()
  8. # Subscribe to channel.
  9. await subscriber.subscribe([ 'our-channel' ])
  10. # Inside a while loop, wait for incoming events.
  11. while True:
  12. reply = await subscriber.next_published()
  13. print('Received: ', repr(reply.value), 'on channel', reply.channel)
  14. # When finished, close the connection.
  15. connection.close()

示例:LUA 脚本

  1. import asyncio
  2. import asyncio_redis
  3. code = """
  4. local value = redis.call('GET', KEYS[1])
  5. value = tonumber(value)
  6. return value * ARGV[1]
  7. """
  8. async def example():
  9. connection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
  10. # Set a key
  11. await connection.set('my_key', '2')
  12. # Register script
  13. multiply = await connection.register_script(code)
  14. # Run script
  15. script_reply = await multiply.run(keys=['my_key'], args=['5'])
  16. result = await script_reply.return_value()
  17. print(result) # prints 2 * 5
  18. # When finished, close the connection.
  19. connection.close()

示例:使用 Protocol类

  1. import asyncio
  2. import asyncio_redis
  3. async def example():
  4. loop = asyncio.get_event_loop()
  5. # Create Redis connection
  6. transport, protocol = await loop.create_connection(
  7. asyncio_redis.RedisProtocol, '127.0.0.1', 6379
  8. )
  9. # Set a key
  10. await protocol.set('my_key', 'my_value')
  11. # Get a key
  12. result = await protocol.get('my_key')
  13. print(result)
  14. # Close transport when finished.
  15. transport.close()
  16. if __name__ == '__main__':
  17. asyncio.get_event_loop().run_until_complete(example())
  18. pass

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/670187
推荐阅读
相关标签
  

闽ICP备14008679号