如何合并两个数据流(pyspark)?

2024-09-24 20:28:07 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个kafka流,里面有一些输入主题。 这是我为接受kafka流而编写的代码。在

conf = SparkConf().setAppName(appname) 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc) 
kvs = KafkaUtils.createDirectStream(ssc, topics,\ 
            {"metadata.broker.list": brokers})

然后我创建两个原始流的键和值的数据流。在

^{pr2}$

然后我在值DStream中执行一些计算。 例如

val = values.flatMap(lambda x: x*2)

现在,我需要组合keys和valdstream并以Kafka流的形式返回结果。在

如何将val与相应的密钥组合?在


Tags: kafka代码主题confvalscsscappname
1条回答
网友
1楼 · 发布于 2024-09-24 20:28:07

您只需在2个数据流上使用join运算符来合并它们。 映射时,实际上是在创建另一个流。所以,join将帮助您将它们合并在一起。在

例如:

Joined_Stream = keys.join(values).(any operation like map, flatmap...)

相关问题 更多 >