如何组合每个和如果_

2024-09-30 18:16:13 发布

您现在位置:Python中文网/ 问答频道 /正文

我开始学习liteflow,想知道是否有可能以某种方式将for_eachif_结合起来。我尝试了以下方法:

from liteflow.core import *


class Hello(StepBody):
    def run(self, context: StepExecutionContext) -> ExecutionResult:
        print("Hello")
        return ExecutionResult.next()


class DoStuff(StepBody):

    def run(self, context: StepExecutionContext) -> ExecutionResult:
        print(f"doing stuff...{context.execution_pointer.context_item}")
        return ExecutionResult.next()


class Goodbye(StepBody):
    def run(self, context: StepExecutionContext) -> ExecutionResult:
        print("Goodbye")
        return ExecutionResult.next()


class MyWorkflow(Workflow):

    def id(self):
        return "MyWorkflow"

    def version(self):
        return 1

    def build(self, builder: WorkflowBuilder):
        builder\
            .start_with(Hello)\
            .for_each(lambda data, context: ["abc", "def", "xyz"])\
                .if_(lambda d, c: not d.startswith("x"))\
                    .do(lambda x: x.start_with(DoStuff))\
            .then(Goodbye)


host = configure_workflow_host()
host.register_workflow(MyWorkflow())
host.start()

wid = host.start_workflow("MyWorkflow", 1, None)

input()
host.stop()

这是不起作用的('NoneType' object has no attribute 'startswith'),如果我用0 < 1之类的东西替换条件,DoStuff将执行一次而不是三次而没有if_,甚至打印“错误”的输出。你知道吗

我知道在这个简单的例子([x for x in ["abc", "def", "xyz"] if not x.startswith("x")])中,我可以通过在for_each中使用LC来消除if_,但是可以想象一个更复杂的工作流。你知道吗

那么,如何将for_eachif_结合起来,以便从for_each中的for_each访问循环变量,并将正确的上下文传递给do?或者这是不可能的,我必须检查步骤的run方法中的条件吗?你知道吗


Tags: runselfhosthelloforreturnifdef