如何运行Luigi管道的并行实例:Pid集已在运行

2024-10-03 04:38:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个简单的管道。在

我想用Id 2381启动它一次,然后当第一个作业运行时,我想用Id 231开始第二次运行。第一次运行按预期完成。在

第二次运行返回此响应

Pid(s) set([10362]) already running
Process finished with exit code 0

我就这样开始跑步

运行一:

^{pr2}$

运行二:

luigi.run(
    cmdline_args=["--id='newId1322'", "--TaskTwo-id=231"],
    main_task_cls=TaskTwo()
)

每个任务都有一个惟一的ID,由luigi的task_ID_str(…)方法生成。为什么luigi认为任务已经在运行,而路易吉.帕拉马特,TaskTwo id和MockTarget文件都不同吗?在

管道代码:

import time
import uuid
from luigi.mock import MockTarget
import luigi


class TaskOne(luigi.Task):
    run_id = luigi.Parameter()

    def output(self):
        return MockTarget("TaskOne{0}".format(self.run_id), mirror_on_stderr=True)

    def run(self):
        _out = self.output().open('w')
        time.sleep(10)
        _out.write(u"Hello World!\n")
        _out.close()


class TaskTwo(luigi.Task):
    id = luigi.Parameter(default=uuid.uuid4().__str__())

    def output(self):
        return MockTarget("TaskTwo{0}".format(self.id), mirror_on_stderr=True)

    def requires(self):
        return TaskOne(self.id)

    def run(self):
        _out = self.output().open('w')
        time.sleep(10)
        _out.write(u"Hello World!\n")
        _out.close()

Tags: runimportselfidoutputreturn管道time
1条回答
网友
1楼 · 发布于 2024-10-03 04:38:47

看起来这可能是因为您没有连接到调度程序服务器,所以它尝试启动调度程序进程两次。你在追路易吉德吗?在

我可以让您的代码在命令行中运行,如下所示。首先,我创建了一个dir并将您的代码放入一个名为路易吉斯特.py(减去路易吉。快跑()命令)。我把目录改成我创建的目录。然后我跑了:

luigid  background  pidfile ./luigid.pid  logdir .  state-path .

然后我在同一个目录中打开了第二个终端。在第一次跑步中:

^{pr2}$

在第二次跑步中(大约一秒钟后):

PYTHONPATH=. luigi  module luigitest TaskOne  run-id newId13823  TaskTwo-id 2382  local-scheduler

这两个都输出“Hello World!”在

相关问题 更多 >