Pysp中如何利用滑动窗口对时间序列数据进行转换

2024-10-05 10:39:11 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试基于时间序列数据的滑动窗口提取特征。 在Scala中,似乎有一个基于this postthe documentationsliding函数

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()

我的问题是PySpark中有类似的函数吗?或者,如果还没有这样的函数,我们如何实现类似的滑动窗口转换?


Tags: the数据函数orgimportapachedocumentation时间
3条回答

要添加到venuktan的答案中,这里是如何使用Spark SQL创建一个基于时间的滑动窗口,并保留窗口的全部内容,而不是对其进行聚合。在我的用例中将时间序列数据预处理到滑动窗口以输入到Spark ML中时,这是必需的

这种方法的一个限制是,我们假设您希望随着时间推移使用滑动窗口。

首先,您可以创建Spark数据帧,例如通过读取CSV文件:

df = spark.read.csv('foo.csv')

我们假设CSV文件有两列:一列是unix时间戳,另一列是要从中提取滑动窗口的列。

from pyspark.sql import functions as f

window_duration = '1000 millisecond'
slide_duration = '500 millisecond'

df.withColumn("_c0", f.from_unixtime(f.col("_c0"))) \
    .groupBy(f.window("_c0", window_duration, slide_duration)) \
    .agg(f.collect_list(f.array('_c1'))) \
    .withColumnRenamed('collect_list(array(_c1))', 'sliding_window')

附加:要将此数组列转换为Spark ML所需的DenseVector格式,see the UDF approach here

额外的好处:取消嵌套结果列,这样滑动窗口的每个元素都有自己的列try this approach here

我希望这有帮助,如果我能澄清什么,请告诉我。

spark 1.4具有窗口功能,如下所述: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

希望有帮助,请告诉我。

据我所知,sliding函数在Python中不可用,SlidingRDD是一个私有类,不能在MLlib外部访问。

如果要在现有RDD上使用sliding,可以创建穷人的sliding,如下所示:

def sliding(rdd, n):
    assert n > 0
    def gen_window(xi, n):
        x, i = xi
        return [(i - offset, (i, x)) for offset in xrange(n)]

    return (
        rdd.
        zipWithIndex(). # Add index
        flatMap(lambda xi: gen_window(xi, n)). # Generate pairs with offset
        groupByKey(). # Group to create windows
        # Sort values to ensure order inside window and drop indices
        mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
        sortByKey(). # Sort to makes sure we keep original order
        values(). # Get values
        filter(lambda x: len(x) == n)) # Drop beginning and end

或者您可以尝试这样的方法(在^{}的帮助下)

from toolz.itertoolz import sliding_window, concat

def sliding2(rdd, n):
    assert n > 1

    def get_last_el(i, iter):
        """Return last n - 1 elements from the partition"""
        return  [(i, [x for x in iter][(-n + 1):])]

    def slide(i, iter):
        """Prepend previous items and return sliding window"""
        return sliding_window(n, concat([last_items.value[i - 1], iter]))

    def clean_last_items(last_items):
        """Adjust for empty or to small partitions"""
        clean = {-1: [None] * (n - 1)}
        for i in range(rdd.getNumPartitions()):
            clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
        return {k: tuple(v) for k, v in clean.items()}

    last_items = sc.broadcast(clean_last_items(
        rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))

    return rdd.mapPartitionsWithIndex(slide)

相关问题 更多 >

    热门问题