Gracefully terminating Autobahn ApplicationRunner().run() using signal.SIGINT

Posted 2024-05-20 14:17:11


I previously asked how to run several ^{} instances from within the same Python process without having them block.

That issue has been resolved, but I have run into a new problem.

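Terminating these mp.Process instances is proving very difficult. I know that the code in ApplicationRunner.run() exits on KeyboardInterrupt, but I have not been able to trigger it correctly.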

Sample code:

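import multiprocessing as mp
import signal
import time
# asyncio.coroutine was removed in Python 3.11; this code targets older versions.
from asyncio import coroutine, get_event_loop

import requests
from autobahn.asyncio.wamp import ApplicationRunner, ApplicationSession

# WSSAPI is the asker's own base class; its definition is not shown here.


class PoloniexSession(ApplicationSession):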

    @coroutine
    def onJoin(self, *args, **kwargs):
        channel = self.config.extra['channel']

        def onTicker(*args, **kwargs):
            self.config.extra['queue'].put((channel, (args, kwargs, time.time())))

        try:
            yield from self.subscribe(onTicker, self.config.extra['channel'])

        except Exception as e:
            raise


class PlnxEndpoint(mp.Process):
    def __init__(self, endpoint, q, **kwargs):
        super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' %
                                                endpoint, **kwargs)
        self.endpoint = endpoint
        self.q = q

    def run(self):
        self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1',
                                   extra={'channel': self.endpoint, 
                                          'queue': self.q})
        self.runner.run(PoloniexSession)

    def join(self, *args, **kwargs):
        def sig_handler(x, y):
            pass
        signal.signal(signal.SIGINT, sig_handler)
        super(PlnxEndpoint, self).join(*args, **kwargs)


class PoloniexWSS(WSSAPI):
    def __init__(self, endpoints=None):
        super(PoloniexWSS, self).__init__(None, 'Poloniex')
        self.data_q = mp.Queue()
        self.connections = {}
        if endpoints:
            self.endpoints = endpoints
        else:
            r = requests.get('https://poloniex.com/public?command=returnTicker')
            self.endpoints = list(r.json().keys())
            self.endpoints.append('ticker')

        for endpoint in self.endpoints:
            self.connections[endpoint] = PlnxEndpoint(endpoint, self.data_q)

    def start(self):
        super(PoloniexWSS, self).start()
        for conn in self.connections:
            self.connections[conn].start()

    def stop(self):
        for conn in self.connections:
            self.connections[conn].join()
        super(PoloniexWSS, self).stop()

While this fills self.q adequately, I still get an error when my subprocesses are stopped:

^{pr2}$

This leads me to believe that my signal.SIGINT is not being triggered where I want it to be.

According to the source code of ^{}, a SIGINT/KeyboardInterrupt should gracefully end the serve_forever() method.
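
The pattern described here can be sketched with the standard library alone. This is an illustration, not Autobahn's actual code: the function name run_until_interrupted and the simulated interrupt are assumptions made for the example. It shows that a KeyboardInterrupt raised inside the event loop propagates out of run_forever(), where the caller can catch it and clean up.

```python
import asyncio


def run_until_interrupted(delay=0.01):
    """Run a fresh event loop until a KeyboardInterrupt ends run_forever()."""
    loop = asyncio.new_event_loop()

    def simulate_sigint():
        # Stand-in for a real SIGINT (assumption for this sketch): raise the
        # same exception that Python's default SIGINT handler raises.
        raise KeyboardInterrupt

    loop.call_later(delay, simulate_sigint)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # The exception leaves run_forever(), so cleanup can happen here.
        return "interrupted"
    finally:
        loop.close()
    return "stopped normally"
```

Calling run_until_interrupted() returns "interrupted", which suggests that anything able to raise KeyboardInterrupt inside the process that owns the loop is enough to end the blocking call.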

Manually stopping the asyncio event loop also results in the error above:

class PlnxEndpoint(mp.Process):
#...
    def join(self, *args, **kwargs):
        loop = get_event_loop()
        loop.stop()
        super(PlnxEndpoint, self).join(*args, **kwargs)
#...
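
Both join() variants in the question execute in the parent process, because join() is called by the parent. That would explain why neither the signal handler nor the loop.stop() call reaches the event loop running in the child. One possible way to raise KeyboardInterrupt inside the child is to send it SIGINT with os.kill(). The sketch below uses only the standard library and leaves Autobahn out; the names child and interrupt_child are hypothetical, and the "fork" start method is an assumption (POSIX only).

```python
import multiprocessing as mp
import os
import signal
import time


def child(ready, results):
    """Child-process entry point: block until SIGINT arrives."""
    # Make sure SIGINT raises KeyboardInterrupt in this process, even if the
    # parent was started with SIGINT ignored.
    signal.signal(signal.SIGINT, signal.default_int_handler)
    try:
        ready.set()  # tell the parent the handler is installed
        while True:  # stand-in for a blocking runner.run() call
            time.sleep(0.05)
    except KeyboardInterrupt:
        results.put("KeyboardInterrupt in child")


def interrupt_child():
    """Start the child, send it SIGINT, and report how it ended."""
    ctx = mp.get_context("fork")  # assumption: POSIX platform
    ready = ctx.Event()
    results = ctx.Queue()
    proc = ctx.Process(target=child, args=(ready, results))
    proc.start()
    ready.wait(timeout=10)
    os.kill(proc.pid, signal.SIGINT)  # delivered to the child, not the parent
    message = results.get(timeout=10)
    proc.join(timeout=10)
    return message, proc.exitcode
```

The Event named ready only exists so that the parent does not send the signal before the child has installed its handler.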

1 answer
User
Answer 1 · posted 2024-05-20 14:17:11

After some fiddling, I arrived at a fairly simple solution:

Using multiprocessing.Event(), I was able to end my processes gracefully.

class PoloniexSession(ApplicationSession):

    @coroutine
    def onJoin(self, *args, **kwargs):
        channel = self.config.extra['channel']

        def onTicker(*args, **kwargs):
            self.config.extra['queue'].put((channel, (args, kwargs, time.time())))

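        # If shutdown was already requested, raise KeyboardInterrupt so
        # that ApplicationRunner.run() exits.
        if self.config.extra['is_killed'].is_set():
            raise KeyboardInterrupt()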
        try:
            yield from self.subscribe(onTicker, self.config.extra['channel'])

        except Exception as e:
            raise


class PlnxEndpoint(mp.Process):
    def __init__(self, endpoint, q, **kwargs):
        super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' %
                                                endpoint, **kwargs)
        self.endpoint = endpoint
        self.q = q
        self.is_killed = mp.Event()

    def run(self):
        self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1',
                                   extra={'channel': self.endpoint,
                                          'queue': self.q,
                                          'is_killed': self.is_killed})
        self.runner.run(PoloniexSession)

    def join(self, *args, **kwargs):
        # Request shutdown via the shared event, then wait for the child.
        self.is_killed.set()
        super(PlnxEndpoint, self).join(*args, **kwargs)
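
Note that the check in onJoin runs when the session joins the realm. If the event may be set later than that, one option is to poll it from inside the event loop. The following is a minimal sketch under that assumption, using only the standard library rather than Autobahn; watch_for_stop, worker, and run_demo are hypothetical names, and the "fork" start method is assumed (POSIX only).

```python
import asyncio
import multiprocessing as mp


async def watch_for_stop(stop_event, poll_interval=0.05):
    """Return once stop_event is set, polling without blocking the loop."""
    while not stop_event.is_set():
        await asyncio.sleep(poll_interval)


def worker(stop_event, results):
    """Child-process entry point: run until the parent sets stop_event."""
    async def main():
        # A real worker would set up its subscriptions before waiting.
        await watch_for_stop(stop_event)
        results.put("stopped cleanly")

    asyncio.run(main())


def run_demo():
    """Start a worker, request shutdown, and report how it ended."""
    ctx = mp.get_context("fork")  # assumption: POSIX platform
    stop_event = ctx.Event()
    results = ctx.Queue()
    proc = ctx.Process(target=worker, args=(stop_event, results))
    proc.start()
    stop_event.set()  # same role as is_killed.set() in join()
    message = results.get(timeout=10)
    proc.join(timeout=10)
    return message, proc.exitcode
```

Because the event is shared between parent and child, setting it in the parent is visible to the loop in the child on its next poll, and the child can then exit with a normal exit code.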
