Spark streaming JSON to a DataFrame in PySpark

Posted 2024-09-30 18:17:00


I need some help with PySpark. I am streaming JSON data from Kafka and need to convert it into a DataFrame in PySpark. For the streaming I used the code below.

from __future__ import print_function
import sys
import csv
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pandas as pd
global gspark
def convert_Json2DF(time,rdd):
    nf = gspark.read.json(rdd)
    nf.toDF().show()
    # Convert RDD[String] to RDD[Row] to DataFrame
    #rowRdd = rdd.map(lambda w: Row(word=w))
    #wordsDataFrame = gspark.createDataFrame(rowRdd)
    #pdf = wordsDataFrame.toDF()
    #pdf.show()
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    gspark = SparkSession \
        .builder \
        .appName("SparkSteaming Kafka Receiver") \
        .config("spark.some.config.option", "some-value") \
        .config("spark.ui.port", 22300) \
        .config("spark.executor.instances", 4) \
        .config("spark.executor.cores", 4) \
        .getOrCreate()
    sc = gspark.sparkContext
    SQLContext= SQLContext(sc)
    ssc = StreamingContext(sc, 15)
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda (key,value): json.loads(value))
    lines.pprint()
    lines.foreachRDD(Json2DF)
    ssc.start()
    ssc.awaitTermination()

With the code above I am unable to convert the JSON data into a DataFrame. Can anyone point out what I need to change, whether in the Json2DF function or in the main function?

Thanks, Bala


Tags: kafka, from, import, config, json, sql, sys, spark
1 Answer
User
#1 · Posted 2024-09-30 18:17:00

First, make sure that all of the JSON records share the same schema.
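For instance, if every record carries the same keys, say {"idx": 1, "name": "foo"} (hypothetical sample data), the expected shape can be written down as an explicit schema; a minimal sketch using pyspark.sql.types:

from pyspark.sql.types import StructType, StructField, LongType, StringType

# Hypothetical explicit schema for records such as {"idx": 1, "name": "foo"}.
schema = StructType([
    StructField("idx", LongType(), True),
    StructField("name", StringType(), True),
])

Records that do not fit the expected shape should be dropped before the DataFrame is created, which is what the helper below does.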

import json

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext


def check_json(js, col):
    # Parse one JSON line and pull out the expected columns; return an
    # empty list for anything that is not valid JSON so it can be
    # filtered out downstream.
    try:
        data = json.loads(js)
        return [data.get(i) for i in col]
    except ValueError:
        return []


def convert_json2df(rdd, col):
    # foreachRDD callbacks run on the driver, so a SparkSession can be
    # created here from the RDD's SparkContext.
    if rdd.isEmpty():
        return
    ss = SparkSession(rdd.context)
    # A list of column names is a valid schema; Spark infers the field
    # types from each batch (an explicit StructType, as sketched above,
    # also works if the declared types match the data).
    df = ss.createDataFrame(rdd, schema=col)
    df.show()


cols = ['idx', 'name']

sc = SparkContext()
ssc = StreamingContext(sc, 5)

# Parse each line, drop records that failed to parse, then convert each
# micro-batch to a DataFrame (foreachRDD returns None, so the result is
# not assigned).
ssc.socketTextStream('localhost', 9999) \
    .map(lambda x: check_json(x, cols)) \
    .filter(lambda x: x) \
    .foreachRDD(lambda rdd: convert_json2df(rdd, cols))

ssc.start()
ssc.awaitTermination()
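To try this locally, feed JSON lines into the socket with nc -lk 9999 and watch the batches print every 5 seconds. The same parse/filter/foreachRDD pattern carries over to the Kafka stream from the question; a minimal sketch, assuming the old spark-streaming-kafka receiver API that the question already uses (zkQuorum and topic are placeholders):

from pyspark.streaming.kafka import KafkaUtils

# Kafka messages arrive as (key, value) pairs; parse the value and reuse
# the same helpers as above.
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
kvs.map(lambda kv: check_json(kv[1], cols)) \
    .filter(lambda x: x) \
    .foreachRDD(lambda rdd: convert_json2df(rdd, cols))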
