<p>@werner's solution is perfectly valid.</p>
<p>In pure Spark SQL, there is also a way to achieve this without a UDF.</p>
<p>Prepare the DataFrames:</p>
<pre><code>from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

spark.createDataFrame([
    Row(user_id=100, seg1=90, seg2=20, seg3=76, seg4=100, seg5=30),
    Row(user_id=200, seg1=56, seg2=15, seg3=67, seg4=99, seg5=25),
    Row(user_id=300, seg1=87, seg2=38, seg3=45, seg4=97, seg5=40)]).createOrReplaceTempView("data")

spark.createDataFrame([
    Row(seg_name='seg1', seg_threshold=83),
    Row(seg_name='seg2', seg_threshold=25),
    Row(seg_name='seg3', seg_threshold=60),
    Row(seg_name='seg4', seg_threshold=98),
    Row(seg_name='seg5', seg_threshold=35)
]).createOrReplaceTempView("thr")
</code></pre>
<p>Now you can perform the "unpivot" with a little-known but very useful function called <a href="https://spark.apache.org/docs/latest/api/sql/index.html#stack" rel="nofollow noreferrer">stack</a>. By default, stack emits the generated pairs as columns named <code>col0</code> (here, the segment name) and <code>col1</code> (the segment value):</p>
<pre><code>spark.sql("""
    WITH data_eva
         AS (SELECT user_id,
                    Stack(5, 'seg1', seg1, 'seg2', seg2, 'seg3', seg3, 'seg4', seg4, 'seg5', seg5)
             FROM   data)
    SELECT user_id,
           Collect_list(col0)
    FROM   data_eva
           JOIN thr
             ON data_eva.col0 = thr.seg_name
    WHERE  col1 > seg_threshold
    GROUP  BY user_id
""").show()
</code></pre>
<p>This is the output:</p>
<pre><code>+-------+------------------+
|user_id|collect_list(col0)|
+-------+------------------+
|    100|[seg4, seg1, seg3]|
|    200|      [seg4, seg3]|
|    300|[seg2, seg1, seg5]|
+-------+------------------+
</code></pre>
<p>You mentioned that you have hundreds of segments. The expression inside the stack function can easily be generated with a loop, as sketched below.</p>
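<p>For illustration, here is a minimal sketch of how that generation could look. It assumes the segment columns can be identified by a <code>seg</code> prefix in the <code>data</code> view; adjust the selection logic to however your columns are actually named:</p>
<pre><code># Minimal sketch: build the stack(...) expression from the column names.
# Assumes every segment column in the "data" view starts with "seg".
seg_cols = [c for c in spark.table("data").columns if c.startswith("seg")]

# Each column contributes a "'name', value" pair to the stack arguments.
stack_args = ", ".join(f"'{c}', {c}" for c in seg_cols)
stack_expr = f"Stack({len(seg_cols)}, {stack_args})"

spark.sql(f"""
    WITH data_eva
         AS (SELECT user_id, {stack_expr}
             FROM   data)
    SELECT user_id,
           Collect_list(col0)
    FROM   data_eva
           JOIN thr
             ON data_eva.col0 = thr.seg_name
    WHERE  col1 > seg_threshold
    GROUP  BY user_id
""").show()
</code></pre>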
<p>This technique is a very useful one to have in your Spark toolbox.</p>