<p>First, convert <strong><code>start_dt</code></strong> to a <strong><code>timestamp</code></strong>; then, after collecting the values into a list, use the <strong><code>transform</code></strong> function (with index <code>i</code>) together with <strong><code>unix_timestamp</code></strong> to get the desired output. (<code>transform</code> is available as of <strong><code>Spark 2.4</code></strong>.)</p>
<pre><code>from pyspark.sql import functions as F
df.show() #sample dataframe
#+-----------+-------------------------+
#|customer_id|start_dt                 |
#+-----------+-------------------------+
#|1          |2020-04-02T08:15:50+01:00|
#|1          |2020-04-02T08:15:53+01:00|
#|1          |2020-04-02T08:15:56+01:00|
#|1          |2020-04-02T08:16:01+01:00|
#|1          |2020-04-02T08:16:07+01:00|
#|1          |2020-04-02T08:21:05+01:00|
#|1          |2020-04-02T08:21:17+01:00|
#|1          |2020-04-02T08:21:30+01:00|
#|1          |2020-04-02T08:21:43+01:00|
#|1          |2020-04-02T08:21:49+01:00|
#+-----------+-------------------------+
#only showing top 10 rows
df.withColumn("start_dt", F.to_timestamp('start_dt',"yyyy-MM-dd'T'HH:mm:ssXXX"))\
.groupBy("customer_id").agg(F.collect_list('start_dt').alias('start_times'))\
.withColumn("start_times", F.expr("""transform(start_times,(x,i)-> IF(i>0 and (unix_timestamp(x)-\
unix_timestamp(start_times[i-1])>=600),1,0))"""))\
.show(truncate=False)
#+-----------+--------------------------------------------------------------------------------------------+
#|customer_id|start_times                                                                                 |
#+-----------+--------------------------------------------------------------------------------------------+
#|1          |[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]|
#+-----------+--------------------------------------------------------------------------------------------+
</code></pre>
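<p>The rule the <code>transform(...,(x,i)-&gt; ...)</code> expression applies per element can be illustrated in plain Python (a sketch with made-up sample timestamps, not part of the original data; <code>flag_gaps</code> is a hypothetical helper name):</p>

```python
from datetime import datetime

def flag_gaps(times, threshold=600):
    """Return a 1 for each timestamp that starts >= `threshold` seconds
    after the previous one, else 0; the first element is always 0.
    Mirrors: IF(i>0 and unix_timestamp(x)-unix_timestamp(prev)>=600, 1, 0)."""
    flags = []
    for i, t in enumerate(times):
        if i > 0 and (t - times[i - 1]).total_seconds() >= threshold:
            flags.append(1)
        else:
            flags.append(0)
    return flags

# datetime.fromisoformat (Python 3.7+) parses the +01:00 offset directly.
times = [datetime.fromisoformat(s) for s in [
    "2020-04-02T08:15:50+01:00",
    "2020-04-02T08:16:07+01:00",   # 17 s after the previous row
    "2020-04-02T08:21:05+01:00",   # ~5 min gap: below the 600 s threshold
    "2020-04-02T08:31:10+01:00",   # ~10 min gap: flagged
]]
print(flag_gaps(times))  # [0, 0, 0, 1]
```

<p>In the Spark version the same comparison runs inside <code>transform</code> over the collected <code>start_times</code> array, using <code>start_times[i-1]</code> to reach the previous element.</p>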