将csv文件写入卡夫卡主题

2024-10-01 13:43:36 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个大的csv,我想写一个卡夫卡主题

def producer():
    producer = KafkaProducer(bootstrap_servers='mykafka-broker')
    with open('/home/antonis/repos/testfile.csv') as file:
        reader = csv.DictReader(file, delimiter=";")
        for row in reader:
            producer.send(topic='stable_topic', value=row)
            producer.flush()

if __name__ == '__main__':
    producer()

此代码生成一个错误:

AssertionError: value must be bytes

该文件看起来像:

"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22

有人能帮我吗


Tags: producercsvname主题topicvaluedefbroker
2条回答

您需要正确地序列化您的值


下面应该可以做到这一点:

import json  

producer = KafkaProducer(
    bootstrap_servers='mykafka-broker',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

与其重新发明轮子,不如使用已经存在的非常好的轮子:)它是Kafka Connect,它是ApacheKafka的一部分

有几个可以从CSV读取的连接器,包括Kafka Connect spooldir(请参见example)和Filepulse

this talk中了解有关卡夫卡连接的更多信息

相关问题 更多 >