如何通过对已经存在的列应用函数将列添加到pyspark数据帧?

2024-06-26 00:05:03 发布

您现在位置:Python中文网/ 问答频道 /正文

我想对DataFrame的列中的数据应用binning函数,并将结果存储在添加到DataFrame的新列中。你知道吗

理想情况下,我希望确保可以使用任何带有递归的自定义python函数,因为列中的行可以是数组,并且我希望将每个数组中的每个元素都装箱。我还想做其他的操作,除了只是装箱的数据最终。你知道吗

我知道我可以使用withColumn(...)添加一个新列,但是我不知道如何正确地放入为该新列生成数据的函数中。你知道吗

编辑: 这个similar question解决了创建用户定义函数的部分问题。 但是,它似乎不接受列表作为参数:

def put_number_in_bin(number, bins):
    if is_number(number):
        number = float(number)
        for i, b in enumerate(bins):
            if number <= b:
                bin_selected = str(i)
                break
        return bin_selected
    else:
        return str("NULL")

binning_udf = udf(lambda (x, bins): put_number_in_bin(x, bins), StringType())

bins = [0.0, 182.0, 309.4000000000001, 540.0, 846.0, 2714.0, 5872.561999999998, 10655.993999999999, 20183.062, 46350.379999999976, 4852207.7]

df_augment = df_all.withColumn("newCol1", binning_udf(df_all.total_cost, bins))

结果是出现以下错误:

TypeError: Invalid argument, not a string or column: [0.0, 182.0, 309.4000000000001, 540.0, 846.0, 2714.0, 5872.561999999998, 10655.993999999999, 20183.062, 46350.379999999976, 4852207.7] of type <type 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Tags: 数据函数innumberdataframedfifbin