Sanic服务器python3.5创建异步队列以写入AWS SQ

2024-05-09 20:10:06 发布

您现在位置:Python中文网/ 问答频道 /正文

测试Sanic,目前有一些路由,当命中时会触发对SQS的写入。尝试通过将信息添加到队列中,使写入异步,然后由Sanic服务器返回的响应“独立”(以非阻塞方式)使用。在

下面是我目前掌握的代码。它调用了SQS,但似乎我在使用错误的循环/创建多个循环时出错->;我收到一个错误,声明“循环参数必须与未来一致”,服务器只是挂起,根本不返回响应。在

另外,Sanic使用uvloop,我不确定应该如何/是否应该将队列集成到uvloop而不是单独的asyncio循环中。通过传递一个uvloop来实例化Sanic服务器(uvloop.new_event_循环()). 在

import asyncio
asyncio_loop = asyncio.get_event_loop
async_queue = asyncio.Queue()

async def consumer_async_queue(q):
    while True:
        item = await q.get()
        if item is None:
            q.task_done()
            break
        else:
            # function call to SQS
            await record_to_sqs(item['json'], item['log_to_meta'])
            q.task_done()

async def producer_async_queue(q, item):
    await q.put(item)
    await q.put(None)
    await q.join()

async def main(q, item):
    producers = asyncio_loop.create_task(producer_async_queue(q, item))
    consumers = asyncio_loop.create_task(consumer_async_queue(q))
    await asyncio.wait([producers] + [consumers])

async def service():
    * Other Stuff *
    try:
        print(dir(asyncio_loop))
        asyncio_loop.create_task(main(async_queue, item))
        asyncio_loop.run_forever()
    except Exception as e:
        print(e)
        print("ERRORING")
    finally:
        pass
        # asyncio_loop.close()


@app.route('/api/example', methods=['GET'])
async def route(request):
    return await service(request)

Tags: toloopasynciotaskasyncuvloop队列queue