我正在尝试将两个数据帧上的pandas函数转换为pyspark函数。你知道吗
特别是,我有一个键和函数作为字符串的数据帧,即:
> mv
| Keys | Formula | label |
---------------------------------------
| key1 | 'val1 + val2 - val3' | name1 |
| key2 | 'val3 + val4' | name2 |
| key3 | 'val1 - val4' | name3 |
以及数据帧df:
> df
| Keys | Datetime | names | values |
------------------------------------
| key1 | tmstmp1 | val1 | 0.3 |
| key1 | tmstmp1 | val2 | 0.4 |
| key1 | tmstmp1 | val3 | 0.2 |
| key1 | tmstmp1 | val4 | 1.2 |
| key1 | tmstmp2 | val1 | 0.5 |
| key2 | tmstmp1 | val1 | 1.1 |
| key2 | tmstmp2 | val3 | 1.0 |
| key2 | tmstmp1 | val3 | 1.3 |
等等。你知道吗
我已经创建了两个函数来读取代码,计算字符串ok和度量表达式,并返回1.数据帧最后我发现了。你知道吗
def evaluate_vm(x, regex):
m = re.findall(regex, x)
to_replace = ['#[' + i + ']' for i in m]
replaces = [i.split(', ') for i in m]
replacement = ["df.loc[(df.Keys == %s) & (df.names == %s), ['Datetime', 'values']].dropna().set_index('Datetime')"%tuple(i) for i in replaces]
for i in range(len(to_replace)):
x = x.replace(to_replace[i], replacement[i])
return eval(x)
def _mv_(X):
formula = evaluate_vm(X.Formula)
formula['Keys'] = X.Keys
formula.reset_index(inplace = True)
formula.rename_axis({'Formula': 'Values'}, axis = 1, inplace = True)
return formula[['Keys', 'Datetime', 'names', 'Values']]
在那之后我的代码是
res = pd.concat([_mv_(mv.loc[i]) for i in mv.index])
而res是我需要得到的。你知道吗
注意:为了使其易于理解,我稍微修改了函数和输入:无论如何,我认为问题不在这里。你知道吗
事情是这样的。我想用Pypark来改造这个东西。你知道吗
这就是我写的代码。你知道吗
from pyspark.sql.functions import pandas_udf, PandasUDFType, struct
from pyspark.sql.types import FloatType, StringType, IntegerType, TimestampType, StructType, StructField
EvaluateVM = pandas_udf(lambda x: _mv_(x),\
functionType = PandasUDFType.GROUPED_MAP, \
returnType = StructType([StructField("Keys", StringType(), False),
StructField("Datetime", TimestampType(), False),\
StructField("names", StringType(), False),\
StructField("Values", FloatType(), False)])
)
res = EvaluateVM(struct([mv[i] for i in mv.columns]))
这是“几乎”工作:当我打印res这里的结果。你知道吗
> res
Column<<lambda>(named_struct(Keys, Keys, Formula, Formula))>
我看不到res的内部:我认为它创建了类似python iterable的东西,但我希望得到与pandas相同的结果。你知道吗
我该怎么办?我全搞错了吗?你知道吗
编辑:我认为问题可能在于,在pandas中,我创建了一个数据帧列表,在对它们求值后将其连接起来;在pyspark中,我使用了一种apply(_mv_, axis = 1)
:这种语法甚至在pandas中也给了我错误(cannot concatenate dataframe of dimension 192 into one of size 5
,类似的东西),我的解决方法是pandas.concat([…])
。我不知道这在pyspark中是否也有效,或者是否有某种方法可以避免这种情况。你知道吗
编辑2:抱歉,我没有编写预期的输出:
| Keys | Datetime | label | values |
---------------------------------------------
| key1 | tmstmp1 | name1 | 0.3 + 0.4 - 0.2 |
| key1 | tmstmp1 | name2 | 0.2 + 1.2 |
等等。values列应该包含数值结果,为了让您理解,我在这里编写了操作数。你知道吗
目前没有回答
相关问题 更多 >
编程相关推荐