擅长:python、mysql、java
<p>要添加到venuktan的答案中,这里是如何使用Spark SQL创建一个基于时间的滑动窗口,并保留窗口的全部内容,而不是对其进行聚合。在我的用例中将时间序列数据预处理到滑动窗口以输入到Spark ML中时,这是必需的</p>
<p>这种方法的一个限制是,我们假设您希望随着时间推移使用滑动窗口。</p>
<p>首先,您可以创建Spark数据帧,例如通过读取CSV文件:</p>
<pre><code>df = spark.read.csv('foo.csv')
</code></pre>
<p>我们假设CSV文件有两列:一列是unix时间戳,另一列是要从中提取滑动窗口的列。</p>
<pre><code>from pyspark.sql import functions as f
window_duration = '1000 millisecond'
slide_duration = '500 millisecond'
df.withColumn("_c0", f.from_unixtime(f.col("_c0"))) \
.groupBy(f.window("_c0", window_duration, slide_duration)) \
.agg(f.collect_list(f.array('_c1'))) \
.withColumnRenamed('collect_list(array(_c1))', 'sliding_window')
</code></pre>
<p>附加:要将此数组列转换为Spark ML所需的DenseVector格式,<a href="https://stackoverflow.com/a/39026265/6028910">see the UDF approach here</a>。</p>
<p>额外的好处:取消嵌套结果列,这样滑动窗口的每个元素都有自己的列<a href="https://stackoverflow.com/a/47105784/6028910">try this approach here</a>。</p>
<p>我希望这有帮助,如果我能澄清什么,请告诉我。</p>