我有一个可变的列数,比如说在这个例子中,我们有4列要与一个具有不同值的列(textX
)进行比较(id
):
d = [
{'id': 500, 'text1': 1000 ,'text2': 2000 ,'text3': 3000, 'text4': 5000},
{'id': 1500, 'text1': 1000 ,'text2': 2000 ,'text3': 3000, 'text4': 5000},
{'id': 2500, 'text1': 1000 ,'text2': 2000 ,'text3': 3000, 'text4': 5000},
{'id': 3500, 'text1': 1000 ,'text2': 2000 ,'text3': 3000, 'text4': 5000},
{'id': 4500, 'text1': 1000 ,'text2': 2000 ,'text3': 3000, 'text4': 5000},
{'id': 5500, 'text1': 1000 ,'text2': 2000 ,'text3': 3000, 'text4': 5000}
]
data = spark.createDataFrame(d)
我希望根据'id'的值对textX
列中的最小值和较大值进行操作。
例如,对于id
value=2500,我希望对值2000和3000进行操作。如果'id'的值为500,则它将是null和1000。
我尝试将这些列作为附加列,例如获取较低的列值
df_cols = data.columns
thresh_list = [x for x in df_cols if x.startswith('text')]
data.withColumn('inic_th', (col(x) for x in thresh_list if col('id') > col(x)))
但是得到一个错误:
col should be Column
我猜这是因为有多个列与条件匹配,但无法在此处插入
有没有人有办法根据第三列将一个操作转换成2个值,或者如何正确地获得这些边界?实际上,textX
列的数量会有所不同。由于性能问题,我将尽可能远离熊猫和UDF
下面是使用^{} 和^{} 函数以及^{} 表达式的另一种方法:
lowerBound
=maxthresh_cols
该状态为thresh_col < id
upperBound
=minthresh_cols
该状态满足条件{下面是一种使用spark高阶函数的方法>=2.4:
您可以使用
least
和greatest
获取相关列:然后你可以对
col1
和col2
进行操作相关问题 更多 >
编程相关推荐