My topic name is mytopic. My consumer is able to generate JSON data from a CSV file. My CSV is below:
owner,Rank,Model,mileage,priceincrore,torque
maws,1,Audi,20,2,7000
drav,2,Benz,23,3,8000
ere,3,Ford,12,1,5400
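For reference, here is a minimal sketch of how rows like the ones above could be turned into JSON messages using only the standard library. The post does not show the actual JSON payload, so the shape below is an assumption, and the name rows_to_json is hypothetical. Note that csv.DictReader yields every value as a string, so Rank, mileage and the other numeric columns arrive as text unless they are converted explicitly.

```python
import csv
import io
import json

# Sample data copied from the post
CSV_TEXT = """owner,Rank,Model,mileage,priceincrore,torque
maws,1,Audi,20,2,7000
drav,2,Benz,23,3,8000
ere,3,Ford,12,1,5400
"""


def rows_to_json(csv_text):
    """Return one JSON string per CSV row, keyed by the header names."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [json.dumps(row) for row in reader]


# One JSON object per data row (hypothetical message format)
messages = rows_to_json(CSV_TEXT)
```

Each element of messages is a single JSON object, which is the shape Elasticsearch expects for a document body.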
Below is my Python code:
from kafka import KafkaConsumer
import json
from elasticsearch import Elasticsearch

# Connect to the local Elasticsearch node
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

# Subscribe to the topic; start from the latest offset when no committed offset exists
consumer = KafkaConsumer(
    'mytopic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    enable_auto_commit=True)

for val in consumer:
    # val.value is raw bytes; decode it as JSON
    current_values = json.loads(val.value)
    # id=1 means every message overwrites the same document
    es.index(index='esindex', doc_type='my_type', id=1, body=current_values)
This fails with the following error:
logger.warning('Undecodable raw error response from server: %s', err)
raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
RequestError: RequestError(400, 'mapper_parsing_exception', 'failed to parse')
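One possible cause of a mapper_parsing_exception like this is that the decoded value is not a JSON object, for example a JSON string (a payload that was encoded twice) or a list of rows. This is an assumption, since the post does not show the message payload. Below is a minimal sketch of a check that could run before es.index; the helper name to_document is hypothetical.

```python
import json


def to_document(raw):
    """Decode a Kafka message value and make sure it is a JSON object."""
    value = json.loads(raw)
    # A producer that calls json.dumps twice yields a JSON string that
    # itself contains JSON; unwrap one extra layer in that case.
    if isinstance(value, str):
        value = json.loads(value)
    # Elasticsearch document bodies must be objects, not lists or scalars.
    if not isinstance(value, dict):
        raise ValueError("expected a JSON object, got %s" % type(value).__name__)
    return value
```

In the consumer loop, the result of to_document(val.value) would then be passed as the body argument, and a ValueError would point to a problem on the producer side rather than in the Elasticsearch mapping.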