从python到pyspark EDIT的复杂函数:连接问题(我想)

2024-06-25 06:46:28 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试将两个数据帧上的pandas函数转换为pyspark函数。你知道吗

特别是,我有一个键和函数作为字符串的数据帧,即:

> mv

| Keys |       Formula        | label |
---------------------------------------
| key1 | 'val1 + val2 - val3' | name1 |
| key2 | 'val3 + val4'        | name2 |
| key3 | 'val1 - val4'        | name3 |

以及数据帧df:

> df

| Keys | Datetime | names | values |
------------------------------------
| key1 | tmstmp1  | val1  |  0.3   |
| key1 | tmstmp1  | val2  |  0.4   |
| key1 | tmstmp1  | val3  |  0.2   |
| key1 | tmstmp1  | val4  |  1.2   |
| key1 | tmstmp2  | val1  |  0.5   |
| key2 | tmstmp1  | val1  |  1.1   |
| key2 | tmstmp2  | val3  |  1.0   |
| key2 | tmstmp1  | val3  |  1.3   |

等等。你知道吗

我已经创建了两个函数来读取代码,计算字符串ok和度量表达式,并返回1.数据帧最后我发现了。你知道吗

def evaluate_vm(x, regex):
    m = re.findall(regex, x)
    to_replace = ['#[' + i + ']' for i in m]
    replaces = [i.split(', ') for i in m]
    replacement = ["df.loc[(df.Keys == %s) & (df.names == %s), ['Datetime', 'values']].dropna().set_index('Datetime')"%tuple(i) for i in replaces]
    for i in range(len(to_replace)):
        x = x.replace(to_replace[i], replacement[i])
    return eval(x)


def _mv_(X):
    formula = evaluate_vm(X.Formula)
    formula['Keys'] = X.Keys
    formula.reset_index(inplace = True)
    formula.rename_axis({'Formula': 'Values'}, axis = 1, inplace = True)
    return formula[['Keys', 'Datetime', 'names', 'Values']]

在那之后我的代码是

res = pd.concat([_mv_(mv.loc[i]) for i in mv.index])

而res是我需要得到的。你知道吗

注意:为了使其易于理解,我稍微修改了函数和输入:无论如何,我认为问题不在这里。你知道吗

事情是这样的。我想用Pypark来改造这个东西。你知道吗

这就是我写的代码。你知道吗

from pyspark.sql.functions import pandas_udf, PandasUDFType, struct
from pyspark.sql.types import FloatType, StringType, IntegerType, TimestampType, StructType, StructField

EvaluateVM = pandas_udf(lambda x: _mv_(x),\
                        functionType = PandasUDFType.GROUPED_MAP, \
                        returnType = StructType([StructField("Keys", StringType(), False),
                                                 StructField("Datetime", TimestampType(), False),\
                                                 StructField("names", StringType(), False),\
                                                 StructField("Values", FloatType(), False)])
                       )

res = EvaluateVM(struct([mv[i] for i in mv.columns]))

这是“几乎”工作:当我打印res这里的结果。你知道吗

> res
Column<<lambda>(named_struct(Keys, Keys, Formula, Formula))>

我看不到res的内部:我认为它创建了类似python iterable的东西,但我希望得到与pandas相同的结果。你知道吗

我该怎么办?我全搞错了吗?你知道吗

编辑:我认为问题可能在于,在pandas中,我创建了一个数据帧列表,在对它们求值后将其连接起来;在pyspark中,我使用了一种apply(_mv_, axis = 1):这种语法甚至在pandas中也给了我错误(cannot concatenate dataframe of dimension 192 into one of size 5,类似的东西),我的解决方法是pandas.concat([…])。我不知道这在pyspark中是否也有效,或者是否有某种方法可以避免这种情况。你知道吗

编辑2:抱歉,我没有编写预期的输出:

| Keys | Datetime | label |     values      |
---------------------------------------------
| key1 | tmstmp1  | name1 | 0.3 + 0.4 - 0.2 |
| key1 | tmstmp1  | name2 |    0.2 + 1.2    |

等等。values列应该包含数值结果,为了让您理解,我在这里编写了操作数。你知道吗


Tags: 数据函数inpandasfordatetimereskeys