测试Sanic,目前有一些路由,当命中时会触发对SQS的写入。尝试通过将信息添加到队列中,使写入异步,然后由Sanic服务器返回的响应“独立”(以非阻塞方式)使用。在
下面是我目前掌握的代码。它调用了SQS,但似乎我在使用错误的循环/创建多个循环时出错->;我收到一个错误,声明“循环参数必须与未来一致”,服务器只是挂起,根本不返回响应。在
另外,Sanic使用uvloop,我不确定应该如何/是否应该将队列集成到uvloop而不是单独的asyncio循环中。通过传递一个uvloop来实例化Sanic服务器(uvloop.new_event_循环()). 在
import asyncio
asyncio_loop = asyncio.get_event_loop
async_queue = asyncio.Queue()
async def consumer_async_queue(q):
while True:
item = await q.get()
if item is None:
q.task_done()
break
else:
# function call to SQS
await record_to_sqs(item['json'], item['log_to_meta'])
q.task_done()
async def producer_async_queue(q, item):
await q.put(item)
await q.put(None)
await q.join()
async def main(q, item):
producers = asyncio_loop.create_task(producer_async_queue(q, item))
consumers = asyncio_loop.create_task(consumer_async_queue(q))
await asyncio.wait([producers] + [consumers])
async def service():
* Other Stuff *
try:
print(dir(asyncio_loop))
asyncio_loop.create_task(main(async_queue, item))
asyncio_loop.run_forever()
except Exception as e:
print(e)
print("ERRORING")
finally:
pass
# asyncio_loop.close()
@app.route('/api/example', methods=['GET'])
async def route(request):
return await service(request)
目前没有回答
相关问题 更多 >
编程相关推荐