如何通过input()控制线程?

2024-05-19 07:23:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我想运行一个进程与我的主代码并行的代码,但也希望通过命令提示符访问它的参数或启动/停止进程。你知道吗

我的机器是win7 64位。我想说的是:

from multiprocessing import Process

class dllapi():
    ...

def apiloop(params, args):
    apiclient = dllapi(**args)
    while True:
        apiclient.cycle()
        params = [....]

def mainloop(args):
    p = Process(target = apiloop, args=(params, args, ))
    while True:
        cmd = input()
        if cmd == 'kill':
            p.terminate()
        if cmd == 'stop':
            pass # no idea
        if cmd == 'resume':
            pass # no idea
        if cmd == 'report':
            print (params)

我想说得简单一点。我确实尝试将apiloop设为线程,但是input()可以冻结程序并停止apiloop的工作,直到我按下enter。。。你知道吗

为了共享apiloop进程的参数,我尝试了queue和pipe,但是,在我看来,queue需要.join来等待apiloop完成,并且pipe有缓冲区限制。你知道吗

(其实我可以apiclient.cycle公司每1s运行一次,但我希望保持apiclient活动)

我想知道是否值得深入挖掘多处理(例如,也将尝试manager…)或有其他更适合我的情况的方法。提前谢谢。。。你知道吗

更新日期:201809170953

经理的一些进展如下:

from multiprocessing import Process, Manager

class dllapi():
    ...
class webclientapi():
    ...

def apiloop(args, cmd, params):
    apiclient = dllapi(**args)
    status = True
    while True:
        # command from main
        if cmd == 'stop':
            status = False
        elif cmd == 'start':
            status = True
        cmd = None
        # stop or run
        if status == True:
            apiclient.cycle()
        # update parameters
        params['status'] = status

def uploadloop(cmds, params):
    uploadclient = webclientapi()
    status = True
    while True:
        # command from main
        if cmd == 'stop':
            status = False
        elif cmd == 'start':
            status = True
        cmd = None
        # stop or run
        if status == True:
            # upload 'status' from apiclient to somewhere
            uploadclient.cycle(params['status'])

def mainloop(args):

    manager = Manager()
    mpcmds = {}
    mpparams = {}
    mps = {}

    mpcmds   ['apiloop'] = manager.Value('u', 'start')
    mpparams ['apiloop'] = manager.dict()
    mps      ['apiloop'] = Process(target = apiloop, args=(args, mpcmds['apiloop'], mpparams['apiloop'])

    mpcmds   ['uploadloop'] = manager.Value('u', 'start')
    # mpparams ['uploadloop'] is directly from mpparams ['apiloop']
    mps      ['uploadloop'] = Process(target = uploadloop, args=(mpcmds['uploadloop'], mpparams['apiloop'])

    for key, mp in mps.items():
        mp.daemon = True
        mp.start()

    while True:
        cmd = input().split(' ')
        # kill daemon process with exit()
        if cmd[0] == 'bye':
            exit()
        # kill individual process
        if cmd[0] == 'kill':
            mps[cmd[1]].terminate()
        # stop individual process via command
        if cmd[0] == 'stop':
            mpcmds[cmd[1]] = 'stop'
        # stop individual process via command
        if cmd[0] == 'start':
            mpcmds[cmd[1]] = 'start'
        # report individual process info via command
        if cmd[0] == 'report':
            print (mpparams ['apiloop'])

希望这能帮上忙。你知道吗


Tags: fromcmdtrueifstatusargsparamsprocess
1条回答
网友
1楼 · 发布于 2024-05-19 07:23:49

我将向您展示如何仅使用线程解决一般问题,因为这是您首先尝试的,并且您的示例不需要子进程。你知道吗

在下面的示例中,dllapi类被命名为Zoo,它是threading.Thread的子类,添加了一些方法来允许执行控制。它在初始化时需要一些data,而它的cycle-方法只是在这个数据上重复迭代,并且只计算它看到特定项的次数。你知道吗

import time
import logging
from queue import Queue
from threading import Thread

from itertools import count, cycle


class Zoo(Thread):

    _ids = count(1)

    def __init__(self, cmd_queue, data, *args,
             log_level=logging.DEBUG, **kwargs):

        super().__init__()
        self.name = f'{self.__class__.__name__.lower()}-{next(self._ids)}'
        self.data = data
        self.log_level = log_level
        self.args = args
        self.kwargs = kwargs

        self.logger = self._init_logging()
        self.cmd_queue = cmd_queue

        self.data_size = len(data)
        self.actual_item = None
        self.iter_cnt = 0
        self.cnt = count(1)
        self.cyc = cycle(self.data)

    def cycle(self):
        item = next(self.cyc)
        if next(self.cnt) % self.data_size == 0:  # new iteration round
            self.iter_cnt += 1
        self.actual_item = f'{item}_{self.iter_cnt}'

    def run(self):
        """
        Run is the main-function in the new thread. Here we overwrite run
        inherited from threading.Thread.
        """
        while True:
            if self.cmd_queue.empty():
                self.cycle()
                time.sleep(1)  # optional heartbeat
            else:
                self._get_cmd()
                self.cmd_queue.task_done()  # unblocks prompter

    def stop(self):
        self.logger.info(f'stopping with actual item: {self.actual_item}')
        # do clean up
        raise SystemExit

    def pause(self):
        self.logger.info(f'pausing with actual item: {self.actual_item}')
        self.cmd_queue.task_done()  # unblocks producer joining the queue
        self._get_cmd()  # just wait blockingly until next command

    def resume(self):
        self.logger.info(f'resuming with actual item: {self.actual_item}')

    def report(self):
        self.logger.info(f'reporting with actual item: {self.actual_item}')
        print(f'completed {self.iter_cnt} iterations over data')

    def _init_logging(self):
        fmt = '[%(asctime)s %(levelname)-8s %(threadName)s' \
          ' %(funcName)s()]  - %(message)s'
        logging.basicConfig(format=fmt, level=self.log_level)
        return logging.getLogger()

    def _get_cmd(self):
        cmd = self.cmd_queue.get()
        try:
            self.__class__.__dict__[cmd](self)
        except KeyError:
            print(f'Command `{cmd}` is unknown.')

input是一个阻塞函数。你需要把它外包到一个单独的线程中,这样它就不会阻塞你的主线程。在下面的例子中,input被包装在Prompter,一个类的子类中穿线。穿线. Prompter将输入传递到命令队列。此命令队列由Zoo读取。你知道吗

class Prompter(Thread):
    """Prompt user for command input.
    Runs in a separate thread so the main-thread does not block.
    """
    def __init__(self, cmd_queue):
        super().__init__()
        self.cmd_queue = cmd_queue

    def run(self):
        while True:
            cmd = input('prompt> ')
            self.cmd_queue.put(cmd)
            self.cmd_queue.join()  # blocks until consumer calls task_done()


if __name__ == '__main__':

    data = ['ape', 'bear', 'cat', 'dog', 'elephant', 'frog']

    cmd_queue = Queue()
    prompter = Prompter(cmd_queue=cmd_queue)
    prompter.daemon = True

    zoo = Zoo(cmd_queue=cmd_queue, data=data)

    prompter.start()
    zoo.start()

终端会话示例:

$python control_thread_over_prompt.py
prompt> report
[2018-09-16 17:59:16,856 INFO     zoo-1 report()]  - reporting with actual item: dog_0
completed 0 iterations over data
prompt> pause
[2018-09-16 17:59:26,864 INFO     zoo-1 pause()]  - pausing with actual item: bear_2
prompt> resume
[2018-09-16 17:59:33,291 INFO     zoo-1 resume()]  - resuming with actual item: bear_2
prompt> report
[2018-09-16 17:59:38,296 INFO     zoo-1 report()]  - reporting with actual item: ape_3
completed 3 iterations over data
prompt> stop
[2018-09-16 17:59:42,301 INFO     zoo-1 stop()]  - stopping with actual item: elephant_3

相关问题 更多 >

    热门问题