Python 3 Flask asyncio subprocess in route hangs

Posted 2024-05-12 21:09:31


I'm using Flask 1.0.2 with Python 3.6 on Ubuntu 18.04. My app is supposed to launch a script in the background, asynchronously read its stdout, and return once the script has finished.

I'm basically trying to adapt the answer from this post: Non-blocking read on a subprocess.PIPE in python

The script launches successfully and I get all of its expected output, but the problem is that it never returns (meaning the Killing subprocess now line is never reached). When I check the process list (ps) from a Linux terminal, the background script has already exited.

What am I doing wrong, and how can I successfully break out of the async for line in process.stdout loop?

At the top of the file, after the imports, I create the event loop:

# Create a loop to run all the tasks in.
global eventLoop ; asyncio.set_event_loop(None)
eventLoop = asyncio.new_event_loop()
asyncio.get_child_watcher().attach_loop(eventLoop)

Above my route, I define the async coroutine:

(The coroutine's code block did not survive in this mirror of the post.)
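
Based on the description above (the readAsyncFunctionAndKill name, the Killing subprocess now message, and the async for line in process.stdout loop), the coroutine presumably looked roughly like the sketch below; the details are reconstructed for illustration and are not the poster's exact code:

import asyncio

async def readAsyncFunctionAndKill(cmd):
    # Launch the script with its stdout piped back to us.
    process = await asyncio.create_subprocess_exec(
        cmd, stdout=asyncio.subprocess.PIPE)

    # Echo each line of output as it arrives; this loop only ends
    # once the child closes its stdout.
    async for line in process.stdout:
        print(line.decode().rstrip())

    # Never reached in the failing setup described above.
    print("Killing subprocess now")
    process.kill()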

And my route is:

@app.route("/train_model", methods=["GET"])
def train_new_model():
    # Use global event loop
    global eventLoop   

    with closing(eventLoop):        
        eventLoop.run_until_complete(readAsyncFunctionAndKill("s.py"))

    return jsonify("done"), 200

The called "s.py" script is marked as executable and is in the same working directory. An abbreviated version of the script is shown here (it launches several subprocesses and instantiates PyTorch classes):

def main():

    # Ensure that swap is activated since we don't have enough RAM to train our model otherwise
    print("[%s] Activating swap now ..." % (os.path.basename(__file__)))
    subprocess.call("swapon -a", shell=True)

    # Need to initialize GPU
    print("[%s] Initializing GPU ..." % (os.path.basename(__file__)))
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    defaults.device = torch.device("cuda")
    with torch.cuda.device(0):
        torch.tensor([1.]).cuda()

    print("[%s] Cuda is Available: %s - with Name: %s ..." % (os.path.basename(__file__),torch.cuda.is_available(),torch.cuda.get_device_name(0)))

    try:

        print("[%s] Beginning to train new model and replace existing model ..." % (os.path.basename(__file__)))


        # Batch size
        bs = 16
        #bs = 8

        # Create ImageBunch
        tfms = get_transforms(do_flip=True,
                              flip_vert=True,
                              max_rotate=180.,
                              max_zoom=1.1,
                              max_lighting=0.5,
                              max_warp=0.1,
                              p_affine=0.75,
                              p_lighting=0.75)

        # Create databunch using folder names as class names
        # This also applies the transforms and batch size to the data
        os.chdir(TRAINING_DIR)
        data = ImageDataBunch.from_folder("TrainingData", ds_tfms=tfms, train='.', valid_pct=0.2, bs=bs)

        ...    

        # Create a new learner with an early stop callback
        learn = cnn_learner(data, models.resnet18, metrics=[accuracy], callback_fns=[
            partial(EarlyStoppingCallback, monitor='accuracy', min_delta=0.01, patience=3)])

        ... 

        print("[%s] All done training ..." % (os.path.basename(__file__)))

        # Success
        sys.exit(0)

    except Exception as err:

        print("[%s] Error training model [ %s ] ..." % (os.path.basename(__file__),err))
        sys.exit(255)

if __name__== "__main__":
  main()

1 Answer

#1 · Posted 2024-05-12 21:09:31

There are several issues here:

  • You create a new event loop once, at import time, but then close that loop in your view. There is no need to close the loop at all; worse, because the loop is now closed, any second request will fail.

  • The asyncio event loop is not thread safe and should not be shared between threads. The vast majority of Flask deployments use threads to handle incoming requests. Your code carries echoes of how this should be handled instead, but unfortunately it is not the correct approach. E.g. asyncio.get_child_watcher().attach_loop(eventLoop) is mostly redundant, because eventLoop = asyncio.new_event_loop(), if run on the main thread, already does exactly that.

    This is the main candidate for the issues you are seeing.

  • Your code assumes that the executable actually exists and is executable. You should handle OSError exceptions (and subclasses), because an unqualified s.py only works if it is made executable, starts with a #! shebang line and is found on the PATH. It won't work just because it happens to be in the same directory, and you wouldn't want to rely on the current working directory anyway.

  • Your code assumes that the process closes stdout at some point. If the subprocess never closes stdout (something that happens automatically when the process exits), then your async for line in process.stdout: loop will also wait forever. Consider adding a timeout to the code so you don't get blocked on a faulty subprocess; see the sketch right after this list.
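
For instance (a minimal sketch; the read_with_timeout helper is made up for illustration and is not part of the poster's code), the read loop can be wrapped in asyncio.wait_for() so that a child which never closes its pipe cannot hang the request forever:

import asyncio

async def read_with_timeout(process, timeout=60.0):
    # Read lines until the child's stdout closes, or give up after
    # `timeout` seconds.
    async def _drain():
        async for line in process.stdout:
            print(line.decode().rstrip())

    try:
        await asyncio.wait_for(_drain(), timeout=timeout)
    except asyncio.TimeoutError:
        process.kill()        # stop the misbehaving child
        await process.wait()  # reap it so it doesn't linger as a zombie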

When using asyncio subprocesses in a multi-threaded application, you really want to read two sections of the Python asyncio documentation:

  • The Concurrency and Multithreading section, which explains that almost all asyncio objects are not thread safe. You don't want to add tasks to the loop directly from other threads; either use an event loop per thread, or use the asyncio.run_coroutine_threadsafe() function to run a coroutine on a loop that lives in a specific thread.

  • For Python versions up to 3.7, you also need to read the Subprocess and Threads section, because up until that version asyncio used a non-blocking os.waitpid(-1, os.WNOHANG) call to track child state, relying on signal handling (which can only be done on the main thread). Python 3.8 removed this restriction, by adding a new child watcher implementation that uses a blocking per-process os.waitpid() call in a separate thread, at the expense of some extra memory.

    However, you are not bound to the default child watcher strategy. You can use asyncio.set_child_watcher() and pass in a different process watcher instance. In practice that means backporting the 3.8 ThreadedChildWatcher implementation (a short illustration follows right below).
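
As a brief illustration (assuming Python 3.8 or newer, where ThreadedChildWatcher ships with asyncio; the module further below backports it for 3.6 and 3.7):

import asyncio

# Install the stdlib threaded watcher, which reaps each child from its
# own thread instead of relying on SIGCHLD handling on the main thread.
asyncio.set_child_watcher(asyncio.ThreadedChildWatcher())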

For your use case, there really is no need to run a new event loop per thread. Run a single loop, in a separate thread, as needed. If you use a loop in a separate thread, then depending on your Python version you may need to have a running loop on the main thread as well, or use a different process watcher. Generally speaking, running an asyncio loop on a WSGI server's main thread is not going to be easy, or even possible.

So you want to run a single loop, permanently, in a separate thread, and you want to use a child process watcher that works without a main-thread loop. Here is an implementation of just that; it should work for Python versions 3.6 and newer:

import asyncio
import itertools
import logging
import time
import threading

try:
    # Python 3.8 or newer has a suitable process watcher
    asyncio.ThreadedChildWatcher
except AttributeError:
    # backport the Python 3.8 threaded child watcher
    import os
    import warnings

    # Python 3.7 preferred API
    _get_running_loop = getattr(asyncio, "get_running_loop", asyncio.get_event_loop)

    class _Py38ThreadedChildWatcher(asyncio.AbstractChildWatcher):
        def __init__(self):
            self._pid_counter = itertools.count(0)
            self._threads = {}

        def is_active(self):
            return True

        def close(self):
            pass

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

        def __del__(self, _warn=warnings.warn):
            threads = [t for t in list(self._threads.values()) if t.is_alive()]
            if threads:
                _warn(
                    f"{self.__class__} has registered but not finished child processes",
                    ResourceWarning,
                    source=self,
                )

        def add_child_handler(self, pid, callback, *args):
            loop = _get_running_loop()
            thread = threading.Thread(
                target=self._do_waitpid,
                name=f"waitpid-{next(self._pid_counter)}",
                args=(loop, pid, callback, args),
                daemon=True,
            )
            self._threads[pid] = thread
            thread.start()

        def remove_child_handler(self, pid):
            # asyncio never calls remove_child_handler() !!!
            # The method is no-op but is implemented because
            # abstract base class requires it
            return True

        def attach_loop(self, loop):
            pass

        def _do_waitpid(self, loop, expected_pid, callback, args):
            assert expected_pid > 0

            try:
                pid, status = os.waitpid(expected_pid, 0)
            except ChildProcessError:
                # The child process is already reaped
                # (may happen if waitpid() is called elsewhere).
                pid = expected_pid
                returncode = 255
                logger.warning(
                    "Unknown child process pid %d, will report returncode 255", pid
                )
            else:
                if os.WIFSIGNALED(status):
                    returncode = -os.WTERMSIG(status)
                elif os.WIFEXITED(status):
                    returncode = os.WEXITSTATUS(status)
                else:
                    returncode = status

                if loop.get_debug():
                    logger.debug(
                        "process %s exited with returncode %s", expected_pid, returncode
                    )

            if loop.is_closed():
                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            else:
                loop.call_soon_threadsafe(callback, pid, returncode, *args)

            self._threads.pop(expected_pid)

    # add the watcher to the loop policy
    asyncio.get_event_loop_policy().set_child_watcher(_Py38ThreadedChildWatcher())

__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

logger = logging.getLogger(__name__)

class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        loop, r, c, d = self.loop, False, True, False
        if loop is not None:
            r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        self.loop = loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            loop.close()
            asyncio.set_event_loop(None)

    def stop(self):
        loop, self.loop = self.loop, None
        if loop is None:
            return
        loop.call_soon_threadsafe(loop.stop)
        self.join()

_lock = threading.Lock()
_loop_thread = None

def get_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is None:
            _loop_thread = EventLoopThread()
            _loop_thread.start()
        return _loop_thread.loop

def stop_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is not None:
            _loop_thread.stop()
            _loop_thread = None

def run_coroutine(coro):
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())

The above is the same "run async with Flask" solution I posted for Python 3 Asyncio call from Flask route, but with the addition of the ThreadedChildWatcher backport.

You can then use the loop returned by get_event_loop() to run subprocesses, by calling run_coroutine_threadsafe():

(The code block showing this call was also lost in this mirror of the post.)
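
A minimal sketch of what such a call could look like (the train_model and _read_output names are made up for illustration; run_coroutine is the helper from the module above):

import asyncio

async def _read_output(cmd):
    # Runs on the event loop thread: start the script and relay its stdout.
    process = await asyncio.create_subprocess_exec(
        cmd, stdout=asyncio.subprocess.PIPE)
    async for line in process.stdout:
        print(line.decode().rstrip())
    return await process.wait()

def train_model(cmd, timeout=None):
    # Submit the coroutine to the loop running in the background thread and
    # block the calling (request) thread for at most `timeout` seconds.
    return run_coroutine(_read_output(cmd)).result(timeout)

Your Flask route could then call, for example, train_model("./s.py", timeout=1800) before returning its JSON response.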

Note that the above function takes a timeout, in seconds, which is the maximum amount of time you will wait for the subprocess to complete.
