Python中有效的时间限制队列?

2024-09-27 17:52:58 发布

您现在位置:Python中文网/ 问答频道 /正文

在实时流数据上复制pandas滚动窗口功能的有效方法是什么?在

假设我们想保持最后一个n观察值的总和,collections.deque加上maxlen参数是可行的,但是如果不是固定的n我们需要最后m秒的值的和呢?在

使用pandas.Series将是低效的,因为底层的numpy数组是不可变的,因此不适合处理实时数据流-每次追加都复制整个数组。在

cachetools.TTLCache这样的东西很适合存储实时数据,但是对于计算来说效率很低-要获得总和,需要每次迭代每个元素。在

目前,我维护实时数据流总和的方法使用collections.deque生存时间参数和一个用于丢弃旧值的while循环:

import time
from collections import deque

class Accumulator:
    def __init__(self, ttl):
        self.ttl = ttl
        self.sum = 0
        self.q = deque()

    def append(self, value):
        self.q.append((time.time(), value))
        self.sum += value
        self.discard_old_values()

    def discard_old_values(self):
        cutoff_time = time.time() - self.ttl
        try:
            while self.q[0][0] < cutoff_time:
                self.sum -= self.q.popleft()[1]
        except IndexError:
            pass

    def get_sum(self):
        self.discard_old_values()
        return self.sum

所以问题是:

  1. 我们能做得更好吗?在
  2. 有没有办法摆脱这里难看的while循环?在
  3. 这种“TTL queue”数据结构的通用名称是什么?在
  4. 有没有一个流行的Python库已经实现了它?在
  5. 有没有办法在可变集合上利用pandas滚动窗口?在

Tags: 数据selfpandastimevaluedefcollectionsold
1条回答
网友
1楼 · 发布于 2024-09-27 17:52:58

以下是我对discard_old_values的改进建议:

def discard_old_values(self):
    die = time.time() - self.ttl
    while(len(self.q) > 0 and self.q[0][0] < die):
        self.sum -= self.q.popleft()[1]

我决定不为可能被丢弃的每个元素调用time.time(),而是调用它一次并执行算术,只找到一次“死亡时间”。在

这节省了抛出异常的成本,以及多次调用time的成本。在

相关问题 更多 >

    热门问题