apachebeam pythonssdk是否可以进行状态处理?

2024-10-03 15:33:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直在关注Timely (and Stateful) Processing with Apache Beam这篇文章,虽然内容全面而且写得很好,但它没有指定如何使用python实现相同的功能。更具体地说 它声明:

State and timers are not yet supported in Beam's Python SDK.

但它没有说明原因。。。这是不是有一个先天的原因导致了这一切不可能呢?在

我希望实现一个回放缓冲/窗口系统的信号处理系统,我的目标是实现。其中,长度为W的特征的滑动窗口/历史帧缓冲区随着最新窗口不断更新。在

在Java中,它的实现如下所示:

静态类FeatureFrameBuffer扩展DoFn、FeatureFrame>{ 整数缓冲区大小

    public FeatureFrameBuffer(Integer bufferSize) {
        this.bufferSize = bufferSize;
    }

    @StateId("buffer")
    private final StateSpec<BagState<KV<String, Double>>> bufferedFeatures = StateSpecs.bag();

    @StateId("count")
    private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

    @ProcessElement
    public void process(
                        ProcessContext context,
                        @StateId("buffer") BagState<KV<String, Double>> bufferState,
                        @StateId("count") ValueState<Integer> countState
                        ) {

        int count = firstNonNull(countState.read(), 0);
        count = count + 1;
        countState.write(count);
        bufferState.add(context.element());

        // Only output buffer if count is greater than bufferSize
        // Remove last element from buffer if count
        // greater than or equals buferSize
        if (count >= bufferSize) {
            bufferState.read();
            createFeatureFrame();
            context.output(featureFrame);
            bufferState.clear();
            countState.clear();
        }
    }
}

在开始开发定制实现之前,我想知道pythonsdk是否也能实现同样的效果。在这件事上给点建议就好了。在


Tags: andif系统buffercountcontext原因integer
2条回答

到目前为止,pythonsdk对有状态处理的支持仍然是一个开放的问题。请参见https://issues.apache.org/jira/browse/BEAM-2687,它被this问题阻塞:“实现Beam Python用户状态和定时器API”,尽管它正在进行中。在

2.0用户状态和9.0可用。但是文档还没有更新。在

相关问题 更多 >