如何编写一个方法来检查代理是否完成了Osbrain中的任务?

2024-09-30 06:11:25 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个问题,关于如何编写一个适当的函数来检查代理是否完成了Osbrain中的任务。我有三个代理,传输代理、节点代理和协调代理。协调代理的主要任务是同步其他代理的操作。协调器代理绑定到SYNC_PUB,节点和传输代理子绑定到协调器代理。我的初始实现在第一次timestep/迭代后挂起。我是否错误地实施了状态检查方法

from osbrain import run_nameserver, run_agent, Agent

import time

SYNCHRONIZER_CHANNEL_1 = 'coordinator1'


class TransportAgent(Agent):
    def transportAgent_first_handler(self, message):
        # time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1


class NodeAgent(Agent):
    def NodeAgent_first_handler(self, message):
        time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1


class SynchronizerCoordinatorAgent(Agent):
    def on_init(self):
        self.network_agent_addr = self.bind('SYNC_PUB', alias=SYNCHRONIZER_CHANNEL_1, handler='status_handler')
        self.status_list = []

    def first_synchronization(self, time_step, iteration):
        self.send(SYNCHRONIZER_CHANNEL_1, message={'time_step': time_step, 'iteration': iteration},
                  topic='first_synchronization')

    def status_handler(self, message):
        yield 'I have added you to the status_list'
        self.status_list.append(message)

    def status_checker(self):
        count = 0
        while len(self.status_list) < 2:
            count += 1
            time.sleep(1)
            return
        self.status_list.clear()


    def init_environment(self):
        self.TransportAgent = run_agent('TransportAgent', base=TransportAgent)

        self.NodeAgent = run_agent('NodeAgent', base=NodeAgent)

        self.TransportAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                                    handler={'first_synchronization': TransportAgent.transportAgent_first_handler})
        self.NodeAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                               handler={'first_synchronization': NodeAgent.NodeAgent_first_handler})


if __name__ == '__main__':

    ns = run_nameserver()
    synchronizer_coordinator_agent = run_agent('Synchronizer_CoordinatorAgent',
                                               base=SynchronizerCoordinatorAgent)
    synchronizer_coordinator_agent.init_environment()

    for iteration in range(1, 2):
        for time_step in range(0, 90, 30):
            synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)
            synchronizer_coordinator_agent.status_checker()
    time.sleep(1)

它打印这个然后挂起

(NetworkAgent):{'time_step':0,“迭代”:1}

(RMOAgent):{'time_step':0',iteration':1}


Tags: runself代理messagetimedefstepstatus
1条回答
网友
1楼 · 发布于 2024-09-30 06:11:25

是的,看来你的status_checker()方法可能坏了。我猜您希望该方法一直阻止,直到status_list中有两条消息(一条来自节点代理,另一条来自传输代理)

因此,您可能正在寻找以下内容:

def status_checker(self):
    while True:
        if len(self.status_list) == 2:
            break
        time.sleep(1)
    self.status_list.clear()

但是,当您从代理调用该方法时:

synchronizer_coordinator_agent.status_checker()

协调器正在阻止执行该调用,因此它将不会处理其他传入消息。一个快速且肮脏的解决方法是使用如下^{} call

synchronizer_coordinator_agent.unsafe.status_checker()

我在这里看到的主要问题是如何处理来自__main__的状态检查器。您应该将同步/步骤移动到协调器中。这意味着:

  • __main__调用协调器上的.start_iterations()方法,以便协调器执行第一步
  • 然后使您的协调器对传入消息作出反应(一旦收到2条消息,它将执行下一步)
  • 协调器一直执行步骤,直到完成
  • 在main中,只需定期监视协调器代理,以查看它何时完成
  • 使用ns.shutdown()关闭系统

您的主屏幕可能如下所示:

if __name__ == "__main__":

    ns = run_nameserver()

    coordinator = run_agent(...)
    coordinator.init_environment()
    coordinator.start_iterations()

    while not coordinator.finished():
        time.sleep(0.5)

    ns.shutdown()

与此无关,但请注意,您当前的range(1, 2)只会导致一次迭代(这可能是有意的)。如果需要2次迭代,可以使用range(2)

相关问题 更多 >

    热门问题