pyspark:在pyspark中创建新列时出错

2024-10-03 15:27:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个Pypark数据框

a = [
    (0.31, .3, .4, .6, 0.4),
    (.01, .2, .92, .4, .47),
    (.3, .1, .05, .2, .82),
    (.4, .4, .3, .6, .15),
]

b = ["column1", "column2", "column3", "column4", "column5"]

df = spark.createDataFrame(a, b)

现在我想根据以下条件创建一个新列

df.withColumn('new_column' ,(norm.ppf(F.col('column1')) - norm.ppf(F.col('column1') * F.col('column1'))) / (1 - F.col('column2')) ** 0.5)

但这是一个错误。 请帮忙

更新:我已替换并更正了列名

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-38-8dfe7d50be84> in <module>
----> 1 df.withColumn('new_column' ,(norm.ppf(F.col('PD')) - norm.ppf(F.col('PD') * F.col('PD'))) / (1 - F.col('rho_start')) ** 0.5)

~/anaconda3/envs/python3/lib/python3.6/site-packages/scipy/stats/_distn_infrastructure.py in ppf(self, q, *args, **kwds)
   1995         args = tuple(map(asarray, args))
   1996         cond0 = self._argcheck(*args) & (scale > 0) & (loc == loc)
-> 1997         cond1 = (0 < q) & (q < 1)
   1998         cond2 = cond0 & (q == 0)
   1999         cond3 = cond0 & (q == 1)

~/anaconda3/envs/python3/lib/python3.6/site-packages/pyspark/sql/column.py in __nonzero__(self)
    633 
    634     def __nonzero__(self):
--> 635         raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
    636                          "'~' for 'not' when building DataFrame boolean expressions.")
    637     __bool__ = __nonzero__

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

Tags: inselfnormdfforargscolumncol
1条回答
网友
1楼 · 发布于 2024-10-03 15:27:00

现在还不清楚您的PDrho_start列可能是什么。但是我可以给你一个如何用pyspark调用scipy函数的例子

设置数据帧

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
a = [
    (0.31, .3, .4, .6, 0.4),
    (.01, .2, .92, .4, .47),
    (.3, .1, .05, .2, .82),
    (.4, .4, .3, .6, .15),
]

b = ["column1", "column2", "column3", "column4", "column5"]

df = spark.createDataFrame(a, b)
df.show()

输出:

+   -+   -+   -+   -+   -+
|column1|column2|column3|column4|column5|
+   -+   -+   -+   -+   -+
|   0.31|    0.3|    0.4|    0.6|    0.4|
|   0.01|    0.2|   0.92|    0.4|   0.47|
|    0.3|    0.1|   0.05|    0.2|   0.82|
|    0.4|    0.4|    0.3|    0.6|   0.15|
+   -+   -+   -+   -+   -+

您可以使用pandas_udf对计算进行矢量化

import pandas as pd
from scipy.stats import *    
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def vectorized_ppf(x):
    return pd.Series(norm.ppf(x))

df.withColumn('ppf', vectorized_ppf('column1')).show()

输出:

+   -+   -+   -+   -+   -+         -+
|column1|column2|column3|column4|column5|                ppf|
+   -+   -+   -+   -+   -+         -+
|   0.31|    0.3|    0.4|    0.6|    0.4|-0.4958503473474533|
|   0.01|    0.2|   0.92|    0.4|   0.47|-2.3263478740408408|
|    0.3|    0.1|   0.05|    0.2|   0.82|-0.5244005127080409|
|    0.4|    0.4|    0.3|    0.6|   0.15|-0.2533471031357997|
+   -+   -+   -+   -+   -+         -+

pandas_udf不可用时,使用udf

有时很难让pandas_udf正常工作。您可以使用udf作为替代。
将scipy函数定义为udf

from scipy.stats import *
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

@F.udf(DoubleType())
def ppf(x):
    return float(norm.ppf(x))

调用udf ppf创建值为column1new_column

df1 = df.withColumn('new_column' , ppf('column1'))
df1.show()

输出:

+   -+   -+   -+   -+   -+         -+
|column1|column2|column3|column4|column5|         new_column|
+   -+   -+   -+   -+   -+         -+
|   0.31|    0.3|    0.4|    0.6|    0.4|-0.4958503473474533|
|   0.01|    0.2|   0.92|    0.4|   0.47|-2.3263478740408408|
|    0.3|    0.1|   0.05|    0.2|   0.82|-0.5244005127080409|
|    0.4|    0.4|    0.3|    0.6|   0.15|-0.2533471031357997|
+   -+   -+   -+   -+   -+         -+

微观基准

我使用不同的输入大小运行pandas_udf(矢量化)和udf

  • 测试在一个带有Spark 3.0的双核databricks群集上运行
  • 函数返回df.select(ppf('column1')).collect()

udf vs pandas_udf

相关问题 更多 >