redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
[channel] = await redis.psubscribe('bigfoot:broadcast:channel:*')
while True:
message = await channel.get()
pp(message)
asyncio.run(main())
用于婚配形式的 redis.psubscribe 函数和非形式婚配的 redis.subscribe 函数都前往 Python 列表, 以便包含不定数量的元素。 顺序将解构这个列表(Python 的术语是解包)以取得我想要的通道, 并在之后运用 .get 停止阻塞调用以等候下一条音讯。由于我想要接纳一切跟大脚兽有关的音讯, 所以我在这段代码的第 10 行运用 redis.psubscribe 订阅了一个 Glob 作风的形式, 经过运用 bigfoot:broadcast:channel:* 作为形式, 客户端将接纳到一切以 bigfoot:broadcast:channel: 扫尾的事情。
发布事情十分复杂, 下面是代码:
import asyncio
import aioredis
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await asyncio.gather(
publish(redis, 1, 'Possible vocalizations east of Makanda'),
publish(redis, 2, 'Sighting near the Columbia River'),
publish(redis, 2, 'Chased by a tall hairy creature')
)
redis.close()
await redis.wait_closed()
def publish(redis, channel, message):
return redis.publish(f'bigfoot:broadcast:channel:{channel}', message)
asyncio.run(main())
值得留意的是, 发布与订阅是一个发送即遗忘机制(fire-and-forget)。 假设代码发布了一个事情但是却没有人监听, 那么该事情就会消逝。 假设你想让本人的事情继续存在, 那么可以思索运用前面提到的队列, 又或许接上去将要引见的 Redis 流。这段代码的重点是第 18 行, 它运用了名字十分直接的 redis.publish 来讲音讯发布至所需的通道。
运用 Redis 贮存数据流
除了发布与订阅之外, Redis 还可以运用流来发布和订阅事情。 Redis 流 是一个十分大的话题, 但运用它只需求 掌握大批命令 。 从 Python 来看, 这些命令的用法都是十分复杂的, 我将逐一向你阐明。
下面的代码将把三次大脚兽的目击事情添加到流外面。
import asyncio
import aioredis
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await asyncio.gather(
add_to_stream(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
add_to_stream(redis, 2, 'Sighting near the Columbia River', 'Class A'),
add_to_stream(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
redis.close()
await redis.wait_closed()
def add_to_stream(redis, id, title, classification):
return redis.xadd('bigfoot:sightings:stream', {
(责任编辑:admin)