在DataF上运行beam管道时,文件在关闭后被覆盖

2024-10-02 02:34:50 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经创建了一个beam管道p在dataflow上运行,并希望在运行管道之前向文件中写入一些内容。我的代码是:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import time

pipeline_options = PipelineOptions(runner='DirectRunner')
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)

myString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."

myFile3984573498534 = open('myfile2398457erity348t67349856734986739846.txt','w+')
myFile3984573498534.write(myString*100)
myFile3984573498534.close()

time.sleep(1)

r = p.run()

文件被正确写入,但一旦调用p.run()它就会被覆盖为空。有人能解释为什么会这样吗

注意事项:

  • 更改文件名和文件变量名不会影响结果
  • 我插入了time.sleep(1),以便在调用p.run()之前可以查看要写入的文件,并将文件覆盖为空。这是不必要的,可以更改/删除

Tags: 文件runinfromimport管道pipelinetime
1条回答
网友
1楼 · 发布于 2024-10-02 02:34:50

这个问题是由pipeline_options.view_as(SetupOptions).save_main_session = True行引起的

当管道运行时,beam将使用dill.dump_session序列化主会话并将其保存到文件中。然后它将使用dill.load_session加载同一个文件并反序列化它以重新创建主会话。它将使用dill.dump_session再次重新序列化主会话以发送给运行程序。序列化、反序列化然后重新序列化主会话的原因是修复序列化中的不一致,如https://github.com/uqfoundation/dill/issues/195中所述。这意味着所有跑步者都会有这个问题

本例中的主会话包含myFile3984573498534文件对象。反序列化后,它将使用w+模式以与最初打开文件相同的方式重新打开文件。这将立即覆盖文件。然后关闭此文件,管道以文件空白结束

最好的解决方法是以r+模式打开文件,以便在主会话反序列化期间以读取模式打开文件,从而不会对其进行修改

如果需要w+模式打开文件,则应在关闭文件后删除存储该文件的变量,即myFile3984573498534.close() 之后但在运行管道之前删除del(myFile3984573498534)。这将防止变量被序列化,因为它不再存在,从而导致文件不会被修改

相关问题 更多 >

    热门问题