为什么python中管道的变量不能作为线程间通信的管道工作

2024-09-28 17:22:20 发布

您现在位置:Python中文网/ 问答频道 /正文

在我研究IPC时,特别是在python3.4.1中处理线程和套接字时, 我经历了一些有点奇怪的事情,我不太明白发生了什么。 目前(目前)我正在使用一个匿名管道os.管道()发送 一种发送给保持套接字连接的线程的终止信号。在

我的目标是以一种优雅的方式终止线程。一开始我尝试使用布尔标志,但是由于select调用被阻塞,我不得不向选择。选择可以读取;套接字、管道、stdin等,因此中断了select调用。在

在我发现如何使用管道与线程通信并穿透select调用之前,我中断了对测试分支的开发。在

让我解释一下我的情况。 基本上这是有效的…:

import os
import threading
import select

class MyThread(threading.Thread):
    def __init__(self, pipein):
        threading.Thread.__init__(self)
        # The pipe to listen for terminate signals on
        self.pipein = pipein
        self.stopped = False
        self.inputs = [self.pipein]

    def run(self):
        print("Thread-1 Started")
        while not self.stopped:
            inputready, outputready, errors = select.select(self.inputs, [], [])
            for i in inputready:
                if i == self.pipein:
                    signal = os.read(self.pipein, 64)
                    # 64 An arbitrary length that should be enough in any case
                    print("The thread received stuff on the pipe: %s" % signal)
                    if signal == b'stop':
                        print("Stop command received.")
                        print("Exiting.")
                        self.stopped = True
                        break

if __name__ == "__main__":

    # Create the communication pipes
    pipe = os.pipe()

    # Start the worker thread
    print("Starting the worker thread...")
    t = MyThread(pipe[0])
    t.start()
    print("Worker thread started.")


    stopped = False

    # Enter the main loop
    while not stopped:
        command = input("Command to send to thread: ")
        os.write(pipe[1], bytes(command, 'UTF-8'))
        stopped = True

如果我在终端中输入'stop'我会得到:

^{pr2}$

但这并不是:

import os
import threading
import select

class MyThread(threading.Thread):
    def __init__(self, pipein):
        threading.Thread.__init__(self)
        # The pipe to listen for terminate signals on
        self.pipein = pipein
        self.stopped = False
        self.inputs = [self.pipein]

    def run(self):
        print("Thread-1 Started")
        while not self.stopped:
            inputready, outputready, errors = select.select(self.inputs, [], [])
            for i in inputready:
                if i == self.pipein:
                    signal = os.read(self.pipein, 64)
                    # 64 An arbitrary length that should be enough in any case
                    print("The thread received stuff on the pipe: %s" % signal)
                    if signal == b'stop':
                        print("Stop command received.")
                        print("Exiting.")
                        self.stopped = True
                        break

if __name__ == "__main__":

    # Create the communication pipes
    pipein, pipeout = os.pipe() # Seperate into reader fd and writer fd

    # Start the worker thread
    print("Starting the worker thread...")
    t = MyThread(pipein) # Give the thread the receiver
    t.start()
    print("Worker thread started.")


    stopped = False

    # Enter the main loop
    while not stopped:
        command = input("Command to send to thread: ")
        # Write on the variable of pipe[1]: pipeout
        os.write(pipeout, bytes(command, 'UTF-8'))
        stopped = True

不同的是,得到一个

OSError: [Errno 9] Bad file descriptor

尝试从从管道创建的变量中读取或写入时

比如:

pipein, pipeout = os.pipe()

或者

pipe = os.pipe()
pipein = pipe[0]
pipeout = pipe[1]

但是,如果我使用管道[0]和管道[1]分别读取和写入操作系统读取()和操作系统写入()效果很好!在

因此,为pipe[0]或pipe[1]创建任何类型的变量都不起作用,因此我get和OSError。 如果我创建一个类调用通信器,并将管道[0]和管道[1]作为实例变量,则同样适用。在

有人能解释一下为什么会这样吗?我将永远无法写入管道[1]的变量,还是仅仅因为我在线程之间穿梭?在

如果你知道线程间通信的另一种方法,可以在select调用中使用或中断,我会洗耳恭听。在

我试过一个斯金吉奥或io.{OtherIOHere}但它们不支持fileno()调用,因此不能使用select

我想创建一个类来包含我的通信管道,以便更好地使用,但直到我发现为什么管道变量不能工作,我不能。在

如有任何意见或建议,我们将不胜感激。在

编辑:

添加了一些调试测试:

import os
import threading
import time
import select

class MyThread(threading.Thread):
    def __init__(self, pipein):
        threading.Thread.__init__(self)
        self.pipein = pipein
        self.stopped = False
        self.inputs = [self.pipein]

    def run(self):
        print("Thread-1 Started")
        while not self.stopped:
            inputready, outputready, errors = select.select(self.inputs, [], [])
            for i in inputready:
                if i == self.pipein:
                    signal = os.read(self.pipein, 64)
                    print("The thread received stuff on the pipe: %s" % signal)
                    if signal == b'stop':
                        print("Stop command received.")
                        print("Exiting.")
                        self.stopped = True
                        break

if __name__ == "__main__":

    # Create the communication pipes
    pipe = os.pipe()
    pipein = pipe[0]
    pipeout = pipe[1]

    # Some Print debugs
    print(type(pipein))
    print(type(pipeout))
    print(pipein)
    print(pipeout)
    print(type(pipe))
    print(type(pipe[0]))
    print(type(pipe[1]))
    print(pipe[0])
    print(pipe[1])


    # Start the worker thread
    print("Starting the worker thread...")
    t = MyThread(pipein)
    t.start()
    print("Worker thread started.")

    # Enter the main loop

    stopped = False


    while not stopped:
        command = input("Command to send to thread: ")
        os.write(pipeout, bytes(command, 'UTF-8'))
        stopped = True

@戴夫,有趣的是,这现在起作用了,根本不知道为什么。我也这么做了这是两个不同的项目。在这两种情况下,我都无法写入pipe[1]的变量

 localhost:ipc.git $ python3 pipes.py
<class 'int'>
<class 'int'>
3
4
<class 'tuple'>
<class 'int'>
<class 'int'>
3
4
Starting the worker thread...
Thread-1 Started
Worker thread started.
Command to send to thread: stop
The thread received stuff on the pipe: b'stop'
Stop command received.
Exiting.
localhost:ipc.git $ 

编辑2

好的,我已经创建了Communicator类,用管道在线程之间进行通信。它带有易于使用的read()和write()方法。一切似乎都很好。不知道为什么以前没用。一定是和系统有关。也许我的插座和线程的工作有点紧张。在

以下是完整的功能代码:

导入操作系统 导入线程 导入选择

class MyThread(threading.Thread):
    def __init__(self, comm):
        threading.Thread.__init__(self)
        self.comm = comm
        self.stopped = False
        self.inputs = [self.comm.pipein]

    def run(self):
        print("Thread-1 Started")
        while not self.stopped:
            inputready, outputready, errors = select.select(self.inputs, [], [])
            for i in inputready:
                if i == self.comm.pipein:
                    signal = self.comm.read()
                    print("The thread received stuff on the pipe: %s" % signal)
                    if signal == b'stop':
                        print("Stop command received.")
                        print("Exiting.")
                        self.stopped = True
                        break

class Communicator:
    def __init__(self):
        self.pipe = os.pipe()
        self.pipein = self.pipe[0]
        self.pipeout = self.pipe[1]

    def write(self, msg):
        os.write(self.pipeout, msg)

    def read(self):
        return os.read(self.pipein, 64)

if __name__ == "__main__":

    # Create the communication pipes
    #pipe = os.pipe()
    #pipein = pipe[0]
    #pipeout = pipe[1]

    # Use the communicator class
    comm = Communicator()


    # Some Print debugs


    # Start the worker thread
    print("Starting the worker thread...")
    t = MyThread(comm)
    t.start()
    print("Worker thread started.")

    # Enter the main loop

    stopped = False

    while not stopped:
        command = input("Command to send to thread: ")
        comm.write(b'stop')
        stopped = True

谢谢你们的帮助。在


Tags: thetoselfsignalif管道osselect
1条回答
网友
1楼 · 发布于 2024-09-28 17:22:20

我把你的两个代码示例复制粘贴到我的macbook上的2个文件中,用python3.4.1(来自macports)运行它们,输入“stop”,它们都起作用了。在

你在用什么操作系统?在

编辑:看起来你“修复”了它。干得好。;)

相关问题 更多 >