我有一个大的csv,我想写一个卡夫卡主题
def producer():
producer = KafkaProducer(bootstrap_servers='mykafka-broker')
with open('/home/antonis/repos/testfile.csv') as file:
reader = csv.DictReader(file, delimiter=";")
for row in reader:
producer.send(topic='stable_topic', value=row)
producer.flush()
if __name__ == '__main__':
producer()
此代码生成一个错误:
AssertionError: value must be bytes
该文件看起来像:
"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22
有人能帮我吗
您需要正确地序列化您的值
下面应该可以做到这一点:
与其重新发明轮子,不如使用已经存在的非常好的轮子:)它是Kafka Connect,它是ApacheKafka的一部分
有几个可以从CSV读取的连接器,包括Kafka Connect spooldir(请参见example)和Filepulse
在this talk中了解有关卡夫卡连接的更多信息
相关问题 更多 >
编程相关推荐