PySpark在每个数据帧行上执行普通Python函数

2024-09-29 17:44:29 发布

您现在位置:Python中文网/ 问答频道 /正文

我有Spark DataFrameDF1,有数百万行。每行最多有100列

col1 | col2 | col3 | ... | colN
--------------------------------
v11  | v12  | v13  | ... | v1N
v21  | v22  | v23  | ... | v2N
...  | ...  | ...  | ... | ...

另外,我还有另一个DataFrameDF2,其中有数百行包含name和body列。Name包含函数名,body包含纯Python代码,返回true或false的布尔函数。这些函数在其逻辑中可以引用DF1中单行中的任何列

func_name | func_body
-----------------------------------------------
func1     |   col2 < col45
func2     |   col11.contains("London") and col32*col15 < col21
funcN     |   .... 

我需要将这两个数据帧-DF1和DF2连接起来,并将DF2中的每个函数应用于DF1中的每一行。每个函数都必须能够接受来自DF1的参数,比如说带有键/值对的字典数组,这些键/值对表示来自DF1的对应行的所有列的名称/值

我知道如何连接DF1和DF2,而且我知道Python函数的执行不会以分布式方式工作。现在没问题。这是一个暂时的解决方案。我只需要将DF1中的所有行分布到workers节点上,并将每个Python函数应用到apachespark应用程序的不同任务中的DF1的每一行。对eval()进行求值,并传递包含键/值对的字典数组,正如我前面提到的

通常,每个Python函数都是一个标记,如果某个函数返回true,我希望将其分配给DF1中的行。例如,这是生成的数据帧DF3:

col1 | col2 | col3 | ... | colN | tags
--------------------------------------
v11  | v12  | v13  | ... | v1N  | [func1, func76, funcN]
v21  | v22  | v23  | ... | v2N  | [func32]
...  | ...  | ...  | ... | ...  | [..., ..., ..., ..., ...]

PySpark是否可行?如果可以,请举例说明如何实现?UDF函数使用Mapfrom DF.columns作为输入参数是正确的方法还是可以用更简单的方式实现?Spark对一个时间点可以注册多少UDF函数(数量)有任何限制吗


Tags: 函数bodysparkcol2col3col1df1df2
1条回答
网友
1楼 · 发布于 2024-09-29 17:44:29

可以使用SQL表达式实现这一点,SQL表达式可以使用^{}进行计算。但是,由于SQL表达式不能作为列值进行计算(请参见此post),您将无法连接这两个数据帧,因此您必须将函数收集到一个列表中(因为您只有数百行,它可以放在内存中)

以下是一个工作示例,您可以根据自己的需求进行调整:

data1 = [(1, "val1", 4, 5, "A", 10), (0, "val2", 7, 8, "B", 20),
         (9, "val3", 8, 1, "C", 30), (10, "val4", 2, 9, "D", 30),
         (20, "val5", 6, 5, "E", 50), (3, "val6", 100, 2, "X", 45)]

df1 = spark.createDataFrame(data1, ["col1", "col2", "col3", "col4", "col5", "col6"])

data2 = [("func1", "col1 + col3 = 5 and col2 like '%al1'"),
         ("func2", "col6 = 30 or col1 * col4 > 20"),
         ("func3", "col5 in ('A', 'B', 'C') and col6 - col1 < 30"),
         ("func4", "col2 like 'val%' and col1 > 0")]

df2 = spark.createDataFrame(data2, ["func_name", "func_body"])

# get functions into a list
functions = df2.collect()

# case/when expression to evaluate the functions
satisfied_expr = [when(expr(f.func_body), lit(f.func_name)) for f in functions]

# add new column tags
df1.withColumn("tags", array(*satisfied_expr)) \
    .withColumn("tags", expr("filter(tags, x -> x is not null)")) \
    .show(truncate=False)

在添加数组列tags之后,使用^{}函数删除对应于未满足表达式的空值。此功能仅从Spark 2.4+开始提供,对于旧版本,您必须使用和UDF

给出:

+  +  +  +  +  +  +          -+
|col1|col2|col3|col4|col5|col6|tags                 |
+  +  +  +  +  +  +          -+
|1   |val1|4   |5   |A   |10  |[func1, func3, func4]|
|0   |val2|7   |8   |B   |20  |[func3]              |
|9   |val3|8   |1   |C   |30  |[func2, func3, func4]|
|10  |val4|2   |9   |D   |30  |[func2, func4]       |
|20  |val5|6   |5   |E   |50  |[func2, func4]       |
|3   |val6|100 |2   |X   |45  |[func4]              |
+  +  +  +  +  +  +          -+

相关问题 更多 >

    热门问题