我有Spark DataFrameDF1,有数百万行。每行最多有100列
col1 | col2 | col3 | ... | colN
--------------------------------
v11 | v12 | v13 | ... | v1N
v21 | v22 | v23 | ... | v2N
... | ... | ... | ... | ...
另外,我还有另一个DataFrameDF2,其中有数百行包含name和body列。Name包含函数名,body包含纯Python代码,返回true或false的布尔函数。这些函数在其逻辑中可以引用DF1中单行中的任何列
func_name | func_body
-----------------------------------------------
func1 | col2 < col45
func2 | col11.contains("London") and col32*col15 < col21
funcN | ....
我需要将这两个数据帧-DF1和DF2连接起来,并将DF2中的每个函数应用于DF1中的每一行。每个函数都必须能够接受来自DF1的参数,比如说带有键/值对的字典数组,这些键/值对表示来自DF1的对应行的所有列的名称/值
我知道如何连接DF1和DF2,而且我知道Python函数的执行不会以分布式方式工作。现在没问题。这是一个暂时的解决方案。我只需要将DF1中的所有行分布到workers节点上,并将每个Python函数应用到apachespark应用程序的不同任务中的DF1的每一行。对eval()
进行求值,并传递包含键/值对的字典数组,正如我前面提到的
通常,每个Python函数都是一个标记,如果某个函数返回true,我希望将其分配给DF1中的行。例如,这是生成的数据帧DF3:
col1 | col2 | col3 | ... | colN | tags
--------------------------------------
v11 | v12 | v13 | ... | v1N | [func1, func76, funcN]
v21 | v22 | v23 | ... | v2N | [func32]
... | ... | ... | ... | ... | [..., ..., ..., ..., ...]
PySpark是否可行?如果可以,请举例说明如何实现?UDF函数使用Map
from DF.columns
作为输入参数是正确的方法还是可以用更简单的方式实现?Spark对一个时间点可以注册多少UDF函数(数量)有任何限制吗
可以使用SQL表达式实现这一点,SQL表达式可以使用^{} 进行计算。但是,由于SQL表达式不能作为列值进行计算(请参见此post),您将无法连接这两个数据帧,因此您必须将函数收集到一个列表中(因为您只有数百行,它可以放在内存中)
以下是一个工作示例,您可以根据自己的需求进行调整:
在添加数组列} 函数删除对应于未满足表达式的空值。此功能仅从Spark 2.4+开始提供,对于旧版本,您必须使用和UDF
tags
之后,使用^{给出:
相关问题 更多 >
编程相关推荐