使用python进行Spark流式处理:如何添加UUID列?

2024-10-01 02:39:27 发布

您现在位置:Python中文网/ 问答频道 /正文

我想在我的数据帧中添加一个具有生成id的列。我试过:

uuidUdf = udf(lambda x: str(uuid.uuid4()), StringType())
df = df.withColumn("id", uuidUdf())

但是,当我这样做时,我的输出目录中不会写入任何内容。当我删除这些行时,一切正常,所以肯定有一些错误,但我在控制台中看不到任何东西。在

我尝试过使用单调的\u递增的\u id()来代替生成UUID,但是在我的测试中,这会产生很多重复的结果。我需要一个唯一的标识符(不一定是UUID)。在

我该怎么做?在


Tags: 数据lambda目录id内容dfuuid行时
2条回答

请使用lit函数,以便为所有记录生成相同的id。 lit只执行一次函数并获取列值并将其添加到每个记录中。在

>>> df.show(truncate=False)
+ -+
|x  |
+ -+
|0  |
|1  |
|2  |
|3  |
|4  |
|5  |
|6  |
|7  |
|8  |
|9  |
+ -+
>>> import uuid
>>> id = str(uuid.uuid4())
>>> df = df.withColumn("id", lit(id))
>>> df.show(truncate=False)
+ -+                  +
|x  |id                                  |
+ -+                  +
|0  |923b69d6-4bee-423d-a892-79162df5684d|
|1  |923b69d6-4bee-423d-a892-79162df5684d|
|2  |923b69d6-4bee-423d-a892-79162df5684d|
|3  |923b69d6-4bee-423d-a892-79162df5684d|
|4  |923b69d6-4bee-423d-a892-79162df5684d|
|5  |923b69d6-4bee-423d-a892-79162df5684d|
|6  |923b69d6-4bee-423d-a892-79162df5684d|
|7  |923b69d6-4bee-423d-a892-79162df5684d|
|8  |923b69d6-4bee-423d-a892-79162df5684d|
|9  |923b69d6-4bee-423d-a892-79162df5684d|
+ -+                  +

使用udf并不能解决函数的问题,因为它对每一行都会被调用,我们最终会为每个调用获得新的uuid。在

^{pr2}$

请试试这个:

import uuid
from pyspark.sql.functions import udf

uuidUdf= udf(lambda : str(uuid.uuid4()),StringType())
Df1 = Df.withColumn("id",uuidUdf())

注意:您应该在添加新列之后分配给新的DF。(Df1=Df.WITH列(…)

相关问题 更多 >