RequestError raised when sending data to Elasticsearch with Kafka and Python

Posted 2024-06-01 11:04:06


My topic name is mytopic. My consumer is able to produce JSON data from a CSV file. The CSV is shown below:

owner,Rank,Model,mileage,priceincrore,torque
maws,1,Audi,20,2,7000
drav,2,Benz,23,3,8000
ere,3,Ford,12,1,5400
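
For reference, one way rows like these might be turned into JSON payloads before they reach the topic is sketched below. The helper name `csv_to_json_messages` and the decision to convert the numeric columns to integers are assumptions for illustration, not the code actually used here.

```python
import csv
import io
import json

CSV_TEXT = """owner,Rank,Model,mileage,priceincrore,torque
maws,1,Audi,20,2,7000
drav,2,Benz,23,3,8000
ere,3,Ford,12,1,5400
"""

# Columns assumed to hold integers (an assumption based on the sample rows).
INT_COLUMNS = ("Rank", "mileage", "priceincrore", "torque")


def csv_to_json_messages(text):
    """Return one UTF-8 encoded JSON object per CSV row."""
    messages = []
    for row in csv.DictReader(io.StringIO(text)):
        for col in INT_COLUMNS:
            row[col] = int(row[col])
        messages.append(json.dumps(row).encode("utf-8"))
    return messages


messages = csv_to_json_messages(CSV_TEXT)
```

Each element of `messages` is a bytes value holding a single JSON object, which is the shape Elasticsearch expects for a document body once it has been decoded again.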

Below is my Python code:

from kafka import KafkaConsumer
import json
from elasticsearch import Elasticsearch

# Connect to the local Elasticsearch node
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

# Subscribe to the topic, reading only new messages
consumer = KafkaConsumer(
    'mytopic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    enable_auto_commit=True)

for val in consumer:
    # Each message value arrives as bytes; decode it into a Python object
    current_values = json.loads(val.value)
    # Note: a fixed id=1 means every message overwrites the same document
    es.index(index='esindex', doc_type='my_type', id=1, body=current_values)

    logger.warning('Undecodable raw error response from server: %s', err)
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
RequestError: RequestError(400, 'mapper_parsing_exception', 'failed to parse')
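
Elasticsearch expects a document body to be a JSON object, so a `mapper_parsing_exception` with 'failed to parse' can appear when the decoded Kafka value is something else (for example a plain string or a list), or when a field's value conflicts with the index's existing mapping. A minimal sketch for ruling out the first case before indexing follows; `decode_document` is a hypothetical helper, and it assumes the messages are UTF-8 encoded JSON.

```python
import json


def decode_document(raw):
    """Decode a Kafka message value into a dict, or raise ValueError."""
    # json.JSONDecodeError is a subclass of ValueError, so raw CSV text
    # that is not JSON at all is reported the same way.
    doc = json.loads(raw.decode("utf-8"))
    if not isinstance(doc, dict):
        raise ValueError("expected a JSON object, got %s" % type(doc).__name__)
    return doc


# A JSON object is accepted.
good = decode_document(b'{"owner": "maws", "Rank": 1}')

# A JSON string wrapping a CSV line is valid JSON but not an object.
try:
    decode_document(b'"maws,1,Audi,20,2,7000"')
    rejected = False
except ValueError:
    rejected = True
```

In the consumer loop, passing `decode_document(val.value)` as the `body` would surface a malformed message as a Python error on the client side rather than as a 400 response from the server.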

