滚动窗口“expires”参数小于“size”不起作用

2024-10-02 22:26:04 发布

您现在位置:Python中文网/ 问答频道 /正文

复制步骤

我试图在翻滚窗口中聚合一些数据,然后将process函数应用于窗口中的数据。 我使用expires参数来处理延迟事件(让我们想象一下,在n+1分钟的前10秒内,我们可以得到一个属于n分钟的事件)

def parse_millis(ms):
    return datetime.fromtimestamp(int(ms) / 1000)


def process_window_function(window_info, values: list):
    logger.info(f"Processing window with "
                f"start = {datetime.fromtimestamp(window_info[1][0])}, "
                f"end = {datetime.fromtimestamp(window_info[1][1])}")
    logger.info(values)


class InputClass(faust.Record, coerce=True):
    id: str
    timestamp: datetime = DatetimeField(date_parser=parse_millis)
    value: int


tumbling_window_table = (
    app.Table(
        'tumbling_window_table',
        default=list,
        on_window_close=process_window_function,
    )
        .tumbling(size=60, expires=timedelta(seconds=10))
        .relative_to_field(InputClass.timestamp)
)

input_topic = app.topic("input.topic", value_type=InputClass)


@app.agent(input_topic)
async def process_input(stream):
    event: InputClass
    async for event in stream:
        logger.info(f"Event with timestamp {event.timestamp} is stored in window state")
        list_of_values = tumbling_window_table[event.id].value()
        list_of_values.append(event.value)
        tumbling_window_table[event.id] = list_of_values

预期行为

我希望只有当n+1窗口的10秒被传递到处理延迟事件时,才调用n窗口的进程窗口函数

实际行为

如果表的expires参数小于size参数,则在windown+1的第一个事件之后,将立即调用windown的process\u window\u函数。看起来浮士德只是忽略了过期。对于此类行为,将跳过可能稍晚到达的晚事件

如果expires参数等于或大于该大小,则将正确处理延迟事件,但我不希望延迟超过10秒

卡夫卡输入法

{"id":"sensor-1","timestamp":1614808641000,"value":1}
{"id":"sensor-1","timestamp":1614808677000,"value":2}
{"id":"sensor-1","timestamp":1614808681000,"value":3}

日志

[2021-03-03 21:58:07,510] [1] [INFO] [^Worker]: Ready 
[2021-03-03 21:58:41,955] [1] [INFO] Event with timestamp 2021-03-03 21:57:21 is stored in window state 
[2021-03-03 21:59:00,574] [1] [INFO] Event with timestamp 2021-03-03 21:57:57 is stored in window state 
[2021-03-03 21:59:16,963] [1] [INFO] Event with timestamp 2021-03-03 21:58:01 is stored in window state 
[2021-03-03 21:59:16,987] [1] [INFO] Processing window with start = 2021-03-03 21:57:00, end = 2021-03-03 21:57:59.900000 
[2021-03-03 21:59:16,988] [1] [INFO] [1, 2] 

版本

  • Python版本3.7.9
  • 浮士德版本faust-streaming==0.6.1
  • RocksDB版本python-rocksdb

我有可能在弗林克实施这种行为,但在浮士德遇到了这个问题。 我做错了什么


Tags: ininfoeventidvaluewith事件window
1条回答
网友
1楼 · 发布于 2024-10-02 22:26:04

这可能是我遇到的同一个问题,如果是这样,这可能就是解决方案。 我必须手动设置clean_up_interval,因为它默认为30秒。 此属性是检查过期表数据的时间

在以典型的app = faust.App()方式定义应用程序之后,您可以通过设置app.conf.table_clean_up_interval = <time as int or float>来实现

您可以在settings.py文件和一个工作示例(最近可能更改了?here)中找到此方法

唯一的问题似乎是,如果应用程序崩溃(或重新平衡),on_window_close(打开窗口关闭)似乎不会正确触发——就像没有工作人员观看而过期的窗口永远消失了,你永远也不知道它们。但我还没有使用RocksDB,只是在记忆中,所以也许还有更多的东西可以帮助你

我仍然在尝试了解后期事件,因为我使用相同的过程来执行很长的聚合(比如每隔1秒3个月),但我无法确定非常旧的数据是放在与其时间戳匹配的窗口中,还是放在当前窗口中。我认为它会根据与窗口的时间戳关系将其放在正确的窗口中,但无法确认

相关问题 更多 >