Python multiprocessing hangs (possibly while reading CSVs)


I'm trying to learn how to use multiprocessing, but I've run into a problem.

I'm trying to run the following code:

import multiprocessing as mp
import random
import string

random.seed(123)

# Define an output queue
output = mp.Queue()

# Define an example function
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    output.put(rand_str)

# Set up a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Exit the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)


The code itself runs fine, but when I replace rand_string with a function that reads a bunch of CSV files into DataFrames, the code never finishes.

The function looks like this:

import pandas as pd

def readMyCSV(clFile):

    aClTable = pd.read_csv(clFile)

    # I do some processing here, but at the end the
    # function returns a Pandas DataFrame

    return aClTable

I then wrap the function so it can take a Queue as an argument:

def readMyCSVParWrap(clFile, outputq):
    outputq.put(readMyCSV(clFile))

I build the processes with:

processes = [mp.Process(target=readMyCSVParWrap, args=(singleFile, output))
             for singleFile in allFiles[:5]]

If I do this, the code never stops running, and the results are never printed.

If I only put the clFile string on the output queue, e.g.:

outputq.put((clFile))

then the results print correctly (just a list of file names).

When I look at htop, I see 5 processes being spawned, but they don't use any CPU.

Finally, if I run the readMyCSV function on its own, it works fine (it returns a Pandas DataFrame).

Am I doing something wrong? I'm running this in a notebook; maybe that's the issue?


1 Answer

It seems the join statements on the processes are causing a deadlock. The processes cannot terminate, because they wait for the items they put on the queue to be consumed, but in your code that only happens after the join.

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically. docs

The docs further suggest swapping the queue.get lines with the join, or simply removing the join.
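For example, a minimal sketch of that reordering, assuming readMyCSVParWrap, allFiles and output are defined as in your question, could look like this:

processes = [mp.Process(target=readMyCSVParWrap, args=(singleFile, output))
             for singleFile in allFiles[:5]]

for p in processes:
    p.start()

# Drain the queue BEFORE joining: one get() per worker that put() a result.
# This lets each child's feeder thread flush its buffer, so the process can exit.
results = [output.get() for p in processes]

# Only join once the queue has been emptied
for p in processes:
    p.join()

print(results)

With this ordering the parent empties the queue while the workers are still alive, so their feeder threads can finish and join() returns.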

Also important:

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process)... protect the "entry point" of the program by using if __name__ == '__main__':. ibid
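Putting both points together, a self-contained sketch might look like the following (the CSV file names are hypothetical placeholders, to be replaced by your own allFiles list):

import multiprocessing as mp
import pandas as pd

def readMyCSV(clFile):
    # Stand-in for your function: read one CSV file into a DataFrame
    return pd.read_csv(clFile)

def readMyCSVParWrap(clFile, outputq):
    # Put the resulting DataFrame on the queue
    outputq.put(readMyCSV(clFile))

if __name__ == '__main__':
    # Protect the entry point so the module can be imported safely
    allFiles = ['data1.csv', 'data2.csv', 'data3.csv']  # hypothetical paths

    output = mp.Queue()

    processes = [mp.Process(target=readMyCSVParWrap, args=(singleFile, output))
                 for singleFile in allFiles]

    for p in processes:
        p.start()

    # Consume the results before joining, so the workers can terminate
    results = [output.get() for p in processes]

    for p in processes:
        p.join()

    print(len(results), 'DataFrames read')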
