Using non-serializable objects in a Python Beam function

Published 2024-05-18 11:40:57

According to the Beam documentation:

Transient fields in your function object are not transmitted to worker instances, because they are not automatically serialized.
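The quoted sentence refers to Java's transient field modifier; Python has no such keyword. In plain Python pickling, one common way to get a similar effect is to drop the attribute in __getstate__ and rebuild it in __setstate__. The sketch below uses only the standard library, with threading.Lock standing in for the non-serializable object (an assumption for illustration); TransientHolder is a hypothetical name, and whether Beam's own pickler treats such a class identically may depend on the SDK version.

```python
import pickle
import threading


class TransientHolder:
    """Hypothetical class that keeps one attribute out of its pickled state."""

    def __init__(self):
        self.counter = 0
        # threading.Lock stands in for any object that cannot be pickled
        self.resource = threading.Lock()

    def __getstate__(self):
        # Copy the instance dict and drop the transient attribute
        state = self.__dict__.copy()
        del state["resource"]
        return state

    def __setstate__(self, state):
        # Restore the picklable fields, then rebuild the transient one
        self.__dict__.update(state)
        self.resource = threading.Lock()


holder = TransientHolder()
holder.counter = 5
restored = pickle.loads(pickle.dumps(holder))
print(restored.counter)  # 5
```

Pickling a bare threading.Lock raises TypeError, which is why the attribute has to be excluded and recreated rather than serialized.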

My code:

import apache_beam as beam

class myBeamFunction(beam.DoFn):

    def __setstate__(self, state):
        self.__dict__ = state
        # Nonserialisable_object is a placeholder for the real class
        self.my_nonserialisable_object = Nonserialisable_object()

    def process(self, element):
        return self.my_nonserialisable_object.do(element)

My application fails with:

RuntimeError: maximum recursion depth exceeded

Judging from the stack trace, the error comes from:

Python/2.7/lib/python/site-packages/apache_beam/internal/pickler.py

Is there a way to initialize all the non-serializable variables the function needs on the worker instances?

Thank you.


1 answer
User
#1 · Posted 2024-05-18 11:40:57

This can be done with the StartBundle method. From the DoFn Javadoc:

Initialize the state in each DoFn instance, in a DoFn.StartBundle method. This is good if the initialization doesn't depend on any information known only by the main program or computed by earlier pipeline operations, but is the same for all instances of this DoFn for all program executions, say setting up empty caches or initializing constant data.

It works the same way in Python, where the method is called start_bundle. With a small modification:

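import apache_beam as beam

class myBeamFunction(beam.DoFn):

    def __init__(self):
        super().__init__()
        # Only a picklable placeholder is serialized with the DoFn
        self.my_nonserialisable_object = None

    def start_bundle(self):
        # Runs on the worker, after the DoFn has been deserialized
        self.my_nonserialisable_object = Nonserialisable_object()

    def process(self, element):
        # process must produce an iterable; this assumes do() returns one output
        yield self.my_nonserialisable_object.do(element)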
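To illustrate why this works without installing Beam, here is a plain-Python simulation of the lifecycle: the object is pickled on the driver while the attribute is still None, then unpickled on a "worker", where start_bundle creates the non-serializable object before process runs. NonSerialisable, MyDoFnLike and simulate_worker are hypothetical names, and a real runner's lifecycle involves more steps than this sketch.

```python
import pickle
import threading


class NonSerialisable:
    """Hypothetical stand-in for a client that cannot be pickled."""

    def __init__(self):
        self._lock = threading.Lock()  # makes instances unpicklable

    def do(self, element):
        with self._lock:
            return element * 2


class MyDoFnLike:
    """Mimics the DoFn shape from the answer, without Apache Beam."""

    def __init__(self):
        self.my_nonserialisable_object = None  # picklable placeholder

    def start_bundle(self):
        # Created on the "worker", after deserialization
        self.my_nonserialisable_object = NonSerialisable()

    def process(self, element):
        yield self.my_nonserialisable_object.do(element)


def simulate_worker(payload, bundle):
    """Hypothetical runner: unpickle, start the bundle, process elements."""
    fn = pickle.loads(payload)
    fn.start_bundle()
    out = []
    for element in bundle:
        out.extend(fn.process(element))
    return out


payload = pickle.dumps(MyDoFnLike())  # succeeds: the attribute is still None
print(simulate_worker(payload, [1, 2, 3]))  # [2, 4, 6]
```

Newer releases of the Beam Python SDK also give DoFn a setup() method, which is called once per DoFn instance rather than once per bundle; for expensive objects such as network clients it is usually a better fit than start_bundle.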
