如何让浮士德发送原始回复

2024-05-13 09:45:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试使用Faust Stream获得类似字节的回复。但它总是将回复编码为sting(因为faust使用json编解码器进行回复)。是否可以以字节的形式获得回复

import asyncio
import faust


app = faust.App(
    'raw-example',
    broker=...,
    value_serializer='raw',
    reply_create_topic=True,
    topic_partitions=1,
    topic_replication_factor=3,
)

@app.timer(2.0, on_leader=True)
async def publish_greetings():
    print('PUBLISHING ON LEADER!')
    res = await say.ask(value=b'some greeting')
    print(f'Reply: {res} type {type(res)}')

@app.agent()
async def say(greetings):
    async for greeting in greetings:
        print(f'In listener: {greeting} type {type(greeting)}')
        yield greeting

app.conf.web_enabled = False        

async def start_worker(worker):
    await worker.start()

def manage_loop():
    loop = asyncio.get_event_loop()
    worker = faust.Worker(app, loop=loop, loglevel='WARNING')
    try:
        loop.run_until_complete(start_worker(worker))
    finally:
        worker.stop_and_shutdown()


manage_loop()

在这里,它在监听器say中接收bytes,但在ask返回的值中它将str

平台:Linux-4.15.0-1050-azure-x86_64-with-debian-stretch-sid

Python 3.7.3

浮士德:1.10.3


Tags: loopappasynctopic字节deftyperes