如何更新最初通过register worker回调加载的dask worker容器上的设置?

2024-05-20 08:45:53 发布

您现在位置:Python中文网/ 问答频道 /正文

因此,我有两个容器与dask工人设置。我把它们称为

  • main其中有4个工人和完整的应用程序
  • remote它还有4个工作程序,是完整的应用程序代码,但没有运行完整的应用程序,只有工作程序代码

我正在通过使用register_worker_callbacks()的设置函数回调,从main容器向每个worker传递一个带有设置的字典

我是这样做的:

await client.register_worker_callbacks(lambda: setup_worker(log_config, settings))

以及设置功能

def setup_worker(log_config, settings_object):
    setup_logging(log_config)
    settings.__dict__.update(settings_object.__dict__)

这两个容器都有这段代码,因此工作正常,没有问题

但是我还有一个每天运行的cronjob,它从外部源检索一个新的设置文件(JSON格式),并从中重新加载/更新main容器上的settings对象

这个新更新的settings对象也需要传播和更新worker上的设置,基本上我需要一种方法来再次调用setup_worker函数的最后一行,做完全相同的事情。但是,由于此时工人已经注册并连接,我不能重复使用相同的回调,可以吗?我将如何实现同样的目标


Tags: 函数registerlogconfig应用程序settingsobjectmain
1条回答
网友
1楼 · 发布于 2024-05-20 08:45:53

它只需使用

await client.run(func, args)

如文件所述here

默认情况下,它接受一个可调用/函数并在所有工作进程上执行它,而不使用调度程序。因此,要更新通常由回调加载的相同设置,您可以在此处使用相同的参数调用相同的函数

因此,我最终得到了两个函数,如下所示:

def setup_worker(log_config, settings_object):
    setup_logging(log_config)
    update_settings(settings_object)

def update_settings(settings_object):
    settings.__dict__.update(settings_object.__dict__)

设置回调的设置方式与以前相同

await client.register_worker_callbacks(lambda: setup_worker(log_config, settings))

但是,当在main容器上更新设置时,我会调用一个附加函数,如下所示:

def update_settings():
    ...
    await client.run(update_settings, settings)

相关问题 更多 >