SimPy进程据说是文档中的一个生成器,我希望它具有PEP 342中的send方法
Processes are described by simple Python generators. You can call them process function or process method, depending on whether it is a normal function or method of a class. During their lifetime, they create events and yield them in order to wait for them to be triggered.
我可以使用send从另一台发电机向发电机发送消息,例如将充电站发电机注入电动汽车发电机
from typing import Dict, Optional, Generator
import time
def ev(env) -> Generator:
while True:
next_charging_station = yield
print("next_charging_station[{}]".format(next_charging_station))
def dispathcer(env, ev) -> Generator:
while True:
time.sleep(3)
print('Dispatching the next charging station')
ev.send("new station")
process_ev = ev(env)
next(process_ev)
process_dispatcher = dispathcer(env, process_ev)
Dispatching the next charging station
next_charging_station[new station]
Dispatching the next charging station
next_charging_station[new station]
但是,SymPy进程没有发送方法
import simpy
env = simpy.Environment()
def ev(env):
while True:
next_charging_station = yield
print("next_charging_station[{}]".format(next_charging_station))
def dispathcer(env, ev):
while True:
print('Dispatching the next charging station at %d' % (env.now))
ev.send("new station")
rocess_ev = env.process(ev(env))
process_dispatcher = env.process(dispathcer(env, process_ev))
-----
Dispatching the next charging station at 0
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-12-ece25f162510> in <module>
1 process_ev = env.process(ev(env))
----> 2 process_dispatcher = env.process(dispathcer(env, process_ev))
<ipython-input-11-b89aeb1a9073> in dispathcer(env, ev)
11 while True:
12 print('Dispatching the next charging station at %d' % (env.now))
---> 13 ev.send("hoge")
AttributeError: 'Process' object has no attribute 'send'
如果这是设计的(发送和可能的关闭/抛出不可用),或者如果我误解了某些内容,请帮助理解此错误的原因。我认为所有的同步都需要由SimPy环境通过其事件来完成,否则环境可能会失去对进程状态的跟踪,但不能确定
请告知是否有方法从另一个进程更新进程中的内存。例如,如何更改代码以将充电站进程动态注入ev进程
import simpy
env = simpy.Environment()
def charging_station(env):
print('Start charging at %d' % (env.now))
charge_duration = 3
yield env.timeout(charge_duration)
return
def ev(env):
while True:
print('Start driving at %d' % (env.now))
driving_duration = 5
yield env.timeout(driving_duration)
print('Stop at a charging station at %d' % (env.now))
# [Q] Instead of creating a charging process inside the ev process,
# how to get it injected?
yield env.process(charging_station(env)) # <-----
env.process(ev(env))
env.run(until=20)
-----
Start driving at 0
Stop at a charging station at 5
Start charging at 5
Start driving at 8
Stop at a charging station at 13
Start charging at 13
Start driving at 16
如果可能的话,可以这样做
def new_ev(env): # <---- how to implement?
while True:
next_charging_station = yield # Get the next station process
print('Start charging at %d' % (env.now))
yield next_charging_station
print('Start driving at %d' % (env.now))
driving_duration = 5
yield env.timeout(driving_duration)
看起来只有在流程创建时,才能将其注入到流程中。因此,一种方法是将调度员注入ev流程,然后从ev流程内部获得充电站。但是,您希望显式地向其中注入任何实例,而不是从内部提取
import random
import simpy
env = simpy.Environment()
class Dispatcher(object):
env = None
def __init__(self, env):
self.env = env
print('Start dispatcher at time %d' % self.env.now)
# Start the run process everytime an instance is created.
#self.process = env.process(self.run())
def run(self):
while True:
print('Start dispatcher at time %d' % self.env.now)
yield self.env.timeout(1)
def next_charger_station(self):
return self._charge(random.randint(1, 1000), self.env)
def _charge(self, id, env):
print('station %s start charging at time %d' % (id, env.now))
yield env.timeout(5)
return
def ev(env, dispatcher):
while True:
print('EV stops at the new charging station at time %d' % (env.now))
yield env.process(dispatcher.next_charger_station())
print('EV start driving from the charging station at time %d' % (env.now))
yield env.timeout(5)
dispatcher = Dispatcher(env)
process_ev = env.process(ev(env, dispatcher))
env.run(until=30)
目前没有回答
相关问题 更多 >
编程相关推荐