I have the following DataFrame, with the schema and data shown below. I just want to write this data to a Kafka topic named vehicleData.
Data:
+---------------+------------+------+------+-------------+------+
|Registration_No|Plate_Number| Maker| Model|Vehicle_Color|Status|
+---------------+------------+------+------+-------------+------+
|   LER-15A-9681|     LER9681|Suzuki|Mehran|          RED|     0|
|    LEV-15-7044|     LEV7044| Honda| Civic|        GREEN|     0|
|    LEC-15-1946|     LEC1946| Honda| Civic|        WHITE|     0|
+---------------+------------+------+------+-------------+------+
Schema is:
root
|-- Registration_No: string (nullable = true)
|-- Plate_Number: string (nullable = true)
|-- Maker: string (nullable = true)
|-- Model: string (nullable = true)
|-- Vehicle_Color: string (nullable = true)
|-- Status: string (nullable = true)
Code is:
raw_data.selectExpr("CAST(Registration_No AS STRING)", "CAST(Plate_Number AS STRING)",
                    "CAST(Maker AS STRING)", "CAST(Model AS STRING)",
                    "CAST(Vehicle_Color AS STRING)", "CAST(Status AS STRING)") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("topic", "vehicleData") \
    .save()
The error I get is:
Traceback (most recent call last):
File "/Users/saeed.butt/PycharmProjects/untitled4/Main.py", line 52, in <module>
.option("topic","vehicleData") \
File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/pyspark/sql/readwriter.py", line 737, in save
self._jwrite.save()
File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;'
The Kafka server is running on localhost:9092.
Two things need fixing. First, a DataFrame written to the Kafka sink must have a string (or binary) column named "value"; each row's value becomes one Kafka message. Add a column named "value" and write that out. Second, the AnalysisException itself ("Failed to find data source: kafka") means the Kafka connector is not on the classpath, so the job must be submitted with the spark-sql-kafka package (via --packages) as described in the Structured Streaming + Kafka Integration Guide.
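A minimal sketch of the value-column fix, assuming the DataFrame is the raw_data variable from the question. All columns are packed into one JSON string aliased as "value"; Registration_No is used as the optional message key (the key column is an assumption, not required by the sink):

```python
from pyspark.sql.functions import to_json, struct

# Kafka sink requires a "value" column; "key" is optional.
# Pack every column of the row into a single JSON payload.
raw_data.select(
    raw_data["Registration_No"].alias("key"),                       # optional message key
    to_json(struct([raw_data[c] for c in raw_data.columns])).alias("value"),
).write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("topic", "vehicleData") \
    .save()
```

Since the traceback also shows the kafka data source cannot be found, submit the script with the connector package, matching the artifact version to your Spark/Scala install, for example:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 Main.py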