Kafka中使用python pyspark触发流式数据管道的问题

2024-05-20 02:31:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用下面的程序并在Anaconda(Spyder)中运行它,以创建从Kafka到Spark streaming&;的数据管道;用python

import sys
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from uuid import uuid1
import os


##Step 1: Initialize sparkcontext
spark_context = SparkContext(appName="Transformation Application")

###Step 2: Initialize streaming context
ssc = StreamingContext(spark_context, 5)

def utf8_decoder(s):
    """ Decode the unicode as UTF-8 """
    if s is None:
        return None
    return s.decode('utf-8')

message = KafkaUtils.createDirectStream(ssc,topics=['testtopic'],kafkaParams={"metadata.broker.list":"localhost:9092","key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer","value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"},fromOffsets=None,messageHandler=None,keyDecoder=utf8_decoder,valueDecoder=utf8_decoder)
message
words = message.map(lambda x: x[1]).flatMap(lambda x: x.split(" "))
wordcount=words.map(lambda x: (x,1)).reduceByKey(lambda a,b:a+b)
wordcount.pprint()

当我打印消息、单词、单词时,如果没有得到正确的结果,则会得到十六进制值

message
Out[16]: <pyspark.streaming.kafka.KafkaDStream at 0x23f8b1f8248>

wordcount
Out[18]: <pyspark.streaming.dstream.TransformedDStream at 0x23f8b2324c8>

在我的主题(testtopic)中,我生成了一个字符串——“您好,您好”,然后wordcount应该给出每个单词的计数,但它给出了一些编码的十六进制值


Tags: kafkalambdafromimportnonemessagecontextutf8