无法从Redis订阅中获取数据?

2024-06-16 03:10:04 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图通过使用我的客户端应用程序上的订阅从redis通道获取数据。为此,我将python与asyncio和aioredis结合使用。你知道吗

我想使用我的订阅来更新我的主应用程序的变量,当这个变量在服务器上更改时,但是我无法将从订阅接收到的数据传递到我的主线程。你知道吗

根据aiorediswebsite,我实现了我的订阅:

sub = await aioredis.create_redis(
     'redis://localhost')

ch1 = await sub.subscribe('channel:1')
assert isinstance(ch1, aioredis.Channel)

async def async_reader(channel, globarVar):
    while await channel.wait_message():
        msg = await channel.get(encoding='utf-8')
        # ... process message ...
        globarVar = float(msg)
        print("message in {}: {}".format(channel.name, msg))

tsk1 = asyncio.ensure_future(async_reader(ch1, upToDateValue))

但是我无法更新全局变量,我猜python只是将当前值作为参数传递(这是我所期望的,但我想确定的)。你知道吗

有没有可行的方法从订阅中获取数据?或者将引用传递给我可以使用的共享变量或队列?你知道吗


Tags: 数据服务器redisasyncio应用程序客户端messageasync
1条回答
网友
1楼 · 发布于 2024-06-16 03:10:04

应该重新设计代码,这样就不需要全局变量。所有的处理都应该在收到消息时进行。但是,要修改全局变量,需要使用global关键字在函数中声明它。你不需要传递全局变量-你只需要使用它们。你知道吗

分包商:

import aioredis
import asyncio
import json

gvar = 2

# Do everything you need here or call another function
# based on the message.  Don't use a global variable.
async def process_message(msg):
  global gvar
  gvar = msg

async def async_reader(channel):
  while await channel.wait_message():
    j = await channel.get(encoding='utf-8')
    msg = json.loads(j)
    if msg == "stop":
      break
    print(gvar)
    await process_message(msg)
    print(gvar)

async def run(loop):
  sub = await aioredis.create_redis('redis://localhost')
  res = await sub.subscribe('channel:1')
  ch1 = res[0]
  assert isinstance(ch1, aioredis.Channel)

  await async_reader(ch1)

  await sub.unsubscribe('channel:1')
  sub.close()

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()

发布者:

import asyncio
import aioredis

async def main():
    pub = await aioredis.create_redis('redis://localhost')

    res = await pub.publish_json('channel:1', ["Hello", "world"])
    await asyncio.sleep(1)
    res = await pub.publish_json('channel:1', "stop")

    pub.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

相关问题 更多 >