I need help with PySpark. I am streaming JSON data from Kafka and need to convert it into a DataFrame in PySpark. For the streaming I used the code below.
from __future__ import print_function
import sys
import csv
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pandas as pd
global gspark
def convert_Json2DF(time, rdd):
    nf = gspark.read.json(rdd)
    nf.toDF().show()
    # Convert RDD[String] to RDD[Row] to DataFrame
    #rowRdd = rdd.map(lambda w: Row(word=w))
    #wordsDataFrame = gspark.createDataFrame(rowRdd)
    #pdf = wordsDataFrame.toDF()
    #pdf.show()
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    gspark = SparkSession \
        .builder \
        .appName("SparkSteaming Kafka Receiver") \
        .config("spark.some.config.option", "some-value") \
        .config("spark.ui.port", 22300) \
        .config("spark.executor.instances", 4) \
        .config("spark.executor.cores", 4) \
        .getOrCreate()
    sc = gspark.sparkContext
    SQLContext = SQLContext(sc)
    ssc = StreamingContext(sc, 15)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda (key, value): json.loads(value))
    lines.pprint()
    lines.foreachRDD(Json2DF)

    ssc.start()
    ssc.awaitTermination()
With the code above I cannot convert the JSON data into a DataFrame. Could someone point out where I need to make changes, in the Json2DF function or in the main function?
Thanks, Bala
First, make sure all of the JSON records share the same schema. Beyond that, two things in your code stand out. `lines.foreachRDD(Json2DF)` refers to a name that does not exist; the function is defined as `convert_Json2DF`, so the names must match. And by the time that function runs, the RDD already holds parsed Python dicts (because `json.loads` ran in the `map`), while `gspark.read.json(rdd)` expects an RDD of JSON strings; either drop the `json.loads` step or re-serialize with `rdd.map(json.dumps)` before reading. Also note that `read.json` already returns a DataFrame, so `nf.toDF()` is redundant: call `nf.show()` directly, and guard with `rdd.isEmpty()` to skip empty micro-batches.
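The "same schema" requirement can be checked before Spark is even involved. Below is a minimal sketch in plain Python (`rows_with_schema` is a hypothetical helper, not part of any Spark API) that parses JSON strings and rejects records whose key sets differ, which is roughly the condition `spark.read.json` needs in order to infer one clean schema instead of a schema full of nulls or `_corrupt_record` columns:

```python
import json

def rows_with_schema(json_lines):
    """Parse JSON strings and verify every record has the same keys.

    Records whose key set differs from the first record's are rejected.
    Rows come back as tuples in a fixed (sorted) column order, much like
    Row objects with a fixed schema.
    """
    records = [json.loads(line) for line in json_lines]
    if not records:
        return []
    schema = sorted(records[0].keys())
    for r in records:
        if sorted(r.keys()) != schema:
            raise ValueError("inconsistent schema: %s vs %s"
                             % (sorted(r.keys()), schema))
    return [tuple(r[k] for k in schema) for r in records]

rows = rows_with_schema(['{"word": "spark", "count": 1}',
                         '{"word": "kafka", "count": 2}'])
print(rows)  # [(1, 'spark'), (2, 'kafka')]  -- columns sorted: count, word
```

If your producers cannot guarantee a uniform schema, the more robust route in Spark itself is to declare an explicit `StructType` and pass it to `spark.read.json(..., schema=...)`, so missing fields become nulls instead of breaking inference per batch.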