Multiple pyspark "window()" calls show an error when performing a "groupBy()"


This question is a follow-up to this answer. Spark shows an error when the following is executed:

from pyspark.sql.functions import col, window

# Group results into 12-second windows of "foo", then into integer buckets of 2 for "bar"
fooWindow = window(col("foo"), "12 seconds")

# A sub-bucket that contains values in [0,2), [2,4), [4,6), ...
barWindow = window(col("bar").cast("timestamp"), "2 seconds").cast("struct<start:bigint,end:bigint>")

results = df.groupBy(fooWindow, barWindow).count()

The error is:

"Multiple time window expressions would result in a cartesian product of rows, therefore they are currently not supported."

Is there any way to achieve the desired behavior?


1 Answer

I was able to come up with a solution adapted from this SO answer.

Note: this solution only works when there is at most one call to window, meaning multiple time windows are not allowed. A quick search of the Spark source on GitHub shows there is a hard limit of at most one window per aggregation.

By using withColumn to define a bucket for each row, we can group directly by the new column:

from pyspark.sql import functions as F
from datetime import datetime as dt, timedelta as td

start = dt.now()
second = td(seconds=1)
data = [(start, 0), (start+second, 1), (start+ (12*second), 2)]
df = spark.createDataFrame(data, ('foo', 'bar'))

# Create a new column defining the window for each bar
df = df.withColumn("barWindow", F.col("bar") - (F.col("bar") % 2))

# Keep the time window as is
fooWindow = F.window(F.col("foo"), "12 seconds").start.alias("foo")

# Use the new column created
results = df.groupBy(fooWindow, F.col("barWindow")).count()
results.show()

# +-------------------+---------+-----+
# |                foo|barWindow|count|
# +-------------------+---------+-----+
# |2019-01-24 14:12:48|        0|    2|
# |2019-01-24 14:13:00|        2|    1|
# +-------------------+---------+-----+
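
The same approach generalizes to other bucket widths: the manually computed bucket is an ordinary column expression rather than a time window, so it does not count toward Spark's one-window limit. Below is a minimal sketch under that assumption; the with_bucket helper and its parameter names are hypothetical, not part of the original answer:

from pyspark.sql import functions as F

def with_bucket(df, col_name, bucket_width, out_name="bucket"):
    # Map values in [0, w), [w, 2w), ... to the bucket labels 0, w, 2w, ...
    c = F.col(col_name)
    return df.withColumn(out_name, c - (c % bucket_width))

# Usage with the DataFrame built above: one 12-second time window on "foo"
# plus a width-2 integer bucket on "bar"
bucketed = with_bucket(df, "bar", 2, "barWindow")
bucketed.groupBy(
    F.window(F.col("foo"), "12 seconds").start.alias("foo"),
    F.col("barWindow"),
).count().show()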
