我试图在翻滚窗口中聚合一些数据,然后将process函数应用于窗口中的数据。 我使用expires参数来处理延迟事件(让我们想象一下,在n+1分钟的前10秒内,我们可以得到一个属于n分钟的事件)
def parse_millis(ms):
return datetime.fromtimestamp(int(ms) / 1000)
def process_window_function(window_info, values: list):
logger.info(f"Processing window with "
f"start = {datetime.fromtimestamp(window_info[1][0])}, "
f"end = {datetime.fromtimestamp(window_info[1][1])}")
logger.info(values)
class InputClass(faust.Record, coerce=True):
id: str
timestamp: datetime = DatetimeField(date_parser=parse_millis)
value: int
tumbling_window_table = (
app.Table(
'tumbling_window_table',
default=list,
on_window_close=process_window_function,
)
.tumbling(size=60, expires=timedelta(seconds=10))
.relative_to_field(InputClass.timestamp)
)
input_topic = app.topic("input.topic", value_type=InputClass)
@app.agent(input_topic)
async def process_input(stream):
event: InputClass
async for event in stream:
logger.info(f"Event with timestamp {event.timestamp} is stored in window state")
list_of_values = tumbling_window_table[event.id].value()
list_of_values.append(event.value)
tumbling_window_table[event.id] = list_of_values
我希望只有当n+1窗口的10秒被传递到处理延迟事件时,才调用n窗口的进程窗口函数
如果表的expires参数小于size参数,则在windown+1的第一个事件之后,将立即调用windown的process\u window\u函数。看起来浮士德只是忽略了过期。对于此类行为,将跳过可能稍晚到达的晚事件
如果expires参数等于或大于该大小,则将正确处理延迟事件,但我不希望延迟超过10秒
{"id":"sensor-1","timestamp":1614808641000,"value":1}
{"id":"sensor-1","timestamp":1614808677000,"value":2}
{"id":"sensor-1","timestamp":1614808681000,"value":3}
[2021-03-03 21:58:07,510] [1] [INFO] [^Worker]: Ready
[2021-03-03 21:58:41,955] [1] [INFO] Event with timestamp 2021-03-03 21:57:21 is stored in window state
[2021-03-03 21:59:00,574] [1] [INFO] Event with timestamp 2021-03-03 21:57:57 is stored in window state
[2021-03-03 21:59:16,963] [1] [INFO] Event with timestamp 2021-03-03 21:58:01 is stored in window state
[2021-03-03 21:59:16,987] [1] [INFO] Processing window with start = 2021-03-03 21:57:00, end = 2021-03-03 21:57:59.900000
[2021-03-03 21:59:16,988] [1] [INFO] [1, 2]
3.7.9
faust-streaming==0.6.1
python-rocksdb
我有可能在弗林克实施这种行为,但在浮士德遇到了这个问题。 我做错了什么
这可能是我遇到的同一个问题,如果是这样,这可能就是解决方案。 我必须手动设置
clean_up_interval
,因为它默认为30秒。 此属性是检查过期表数据的时间在以典型的
app = faust.App()
方式定义应用程序之后,您可以通过设置app.conf.table_clean_up_interval = <time as int or float>
来实现您可以在settings.py文件和一个工作示例(最近可能更改了?here)中找到此方法
唯一的问题似乎是,如果应用程序崩溃(或重新平衡),on_window_close(打开窗口关闭)似乎不会正确触发——就像没有工作人员观看而过期的窗口永远消失了,你永远也不知道它们。但我还没有使用RocksDB,只是在记忆中,所以也许还有更多的东西可以帮助你
我仍然在尝试了解后期事件,因为我使用相同的过程来执行很长的聚合(比如每隔1秒3个月),但我无法确定非常旧的数据是放在与其时间戳匹配的窗口中,还是放在当前窗口中。我认为它会根据与窗口的时间戳关系将其放在正确的窗口中,但无法确认
相关问题 更多 >
编程相关推荐