如何在Python3中同时进行并行输入和输出?

2024-09-24 22:23:09 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要设计一个脚本,该脚本使用终端的顶部作为输出,其中一些行在无限循环中每秒钟后打印一次,而底部部分继续接受用户输入,并在上面的部分(在定期输出中)打印它们

换句话说,我需要设计一种外壳

我尝试了以下幼稚的多线程方法:

#!/usr/bin/python3

from math import acos
from threading import Thread
from random import choice
from time import sleep
from queue import Queue, Empty

commandQueue = Queue()

def outputThreadFunc():
    outputs = ["So this is another output","Yet another output","Is this even working"] # Just for demo
    while True:
        print(choice(outputs))
        try:
            inp = commandQueue.get(timeout=0.1)
            if inp == 'exit':
                return
            else:
                print(inp)
        except Empty:
            pass        
        sleep(1)

def inputThreadFunc():
    while True:
        command = input("> ") # The shell
        if command == 'exit':
            return
        commandQueue.put(command)

# MAIN CODE
outputThread = Thread(target=outputThreadFunc)
inputThread = Thread(target=inputThreadFunc)
outputThread.start()
inputThread.start()
outputThread.join()
inputThread.join()

print("Exit")

但很明显,正如预期的那样,当用户不断键入时,输出行与输入行合并

有什么想法吗


Tags: 用户fromimport脚本queuesleepthreadcommand
3条回答

如评论中所述,使用了curses

更新

输入和输出使用两个子温

#!/usr/bin/python3

import curses

from math import acos
from threading import Thread
from random import choice
from time import sleep
from queue import Queue, Empty


commandQueue = Queue()

stdscr = curses.initscr()
stdscr.keypad(True)

upperwin = stdscr.subwin(2, 80, 0, 0)
lowerwin = stdscr.subwin(2,0)

def outputThreadFunc():
    outputs = ["So this is another output","Yet another output","Is this even working"] # Just for demo
    while True:
        upperwin.clear()
        upperwin.addstr(f"{choice(outputs)}")
        try:
            inp = commandQueue.get(timeout=0.1)
            if inp == 'exit':
                return
            else:
                upperwin.addch('\n')
                upperwin.addstr(inp)
        except Empty:
            pass

        upperwin.refresh()
        sleep(1)
        


def inputThreadFunc():
    while True:
        global buffer

        lowerwin.addstr("->")

        command = lowerwin.getstr()

        if command:
            command = command.decode("utf-8")
            commandQueue.put(command)
            lowerwin.clear()

            lowerwin.refresh()
            if command == 'exit':
                return

            
        


# MAIN CODE
outputThread = Thread(target=outputThreadFunc)
inputThread = Thread(target=inputThreadFunc)
outputThread.start()
inputThread.start()
outputThread.join()
inputThread.join()

stdscr.keypad(False)
curses.endwin()
print("Exit")


老办法

我已经编辑了您的示例,以使用getch而不是input

#!/usr/bin/python3

import curses
import datetime

from math import acos
from threading import Thread
from random import choice
from time import sleep
from queue import Queue, Empty

INFO_REFRESH_SECONDS = 1

commandQueue = Queue()
buffer = list()  # stores your input buffer
stdscr = curses.initscr()
stdscr.keypad(True)

def outputThreadFunc():
    outputs = ["So this is another output","Yet another output","Is this even working"] # Just for demo
    info = choice(outputs), datetime.datetime.now()
    while True:

        if datetime.datetime.now() - info[1] > datetime.timedelta(seconds=INFO_REFRESH_SECONDS):
            # refresh info after certain period of time

            info = choice(outputs), datetime.datetime.now()  # timestamp which info was updated

        inp = ''
        buffer_text = ''.join(buffer)
        try:
            command = commandQueue.get(timeout=0.1)
            if command == 'exit':
                return
            inp = f"\n{command}"
        except Empty:
            pass 
        output_string = f"{info[0]}{inp}\n->{buffer_text}"
        stdscr.clear()
        stdscr.addstr(output_string)
        stdscr.refresh()
        if inp:
            # to make sure you see the command
            sleep(1)
        


def inputThreadFunc():
    while True:
        global buffer

        # get one character at a time
        key = stdscr.getch()
        curses.echo()

        if chr(key) == '\n':
            command = ''.join(buffer)
            commandQueue.put(command)
            if command == 'exit':
                return
            buffer = []
        elif key == curses.KEY_BACKSPACE:
            
            if buffer:
                buffer.pop()
        else:
            buffer.append(chr(key))

            
        


# MAIN CODE
outputThread = Thread(target=outputThreadFunc)
inputThread = Thread(target=inputThreadFunc)
outputThread.start()
inputThread.start()
outputThread.join()
inputThread.join()

stdscr.keypad(False)
curses.endwin()
print("Exit")

最简单的解决方案是使用两个脚本;一个是打印输出的服务器,另一个是向服务器发送用户输入的客户端。然后,您可以使用像tmux这样的标准解决方案在两个窗格中打开这两个脚本

由于终端写入输出的方式,两者正在合并。它在缓冲区中收集输出,当时间合适时,它会立即输出所有内容。一个简单的解决方法是在每个实际语句之前使用'\n',这样每个新输出都位于单独的行上

#!/usr/bin/python3

                    .
                    .
                    .

            if inp == 'exit':
                return
            else:
                print("\n", inp) # CHANGE OVER HERE

                    .
                    .
                    .

        command = input("\n> ") # CHANGE OVER HERE
        if command == 'exit':
            return

                    .
                    .
                    .

print("Exit")

请注意,由于两个线程并行运行,下一个输出将在您完成键入并按enter键输入之前打印(除非您可以非常快速地键入并具有非常快速的反射)。希望这能回答你的问题

相关问题 更多 >