我正在努力学习如何使用multiprocessing
,但我遇到了一个问题。你知道吗
我正在尝试运行以下代码:
import multiprocessing as mp
import random
import string
random.seed(123)
# Define an output queue
output = mp.Queue()
# define a example function
def rand_string(length, output):
""" Generates a random string of numbers, lower- and uppercase chars. """
rand_str = ''.join(random.choice(
string.ascii_lowercase
+ string.ascii_uppercase
+ string.digits)
for i in range(length))
output.put(rand_str)
# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]
# Run processes
for p in processes:
p.start()
# Exit the completed processes
for p in processes:
p.join()
# Get process results from the output queue
results = [output.get() for p in processes]
print(results)
从here
代码本身运行正常,但当我用函数替换rand_string
(读取数据帧中的一堆csv文件)时,代码永远不会结束。你知道吗
功能如下:
def readMyCSV(clFile):
aClTable = pd.read_csv(clFile)
# I do some processing here, but at the end the
# function returns a Pandas DataFrame
return(aClTable)
然后我包装函数,以便在参数中允许Queue
:
def readMyCSVParWrap(clFile, outputq):
outputq.put(readMyCSV(clFile))
我用以下方法构建流程:
processes = [mp.Process(target=readMyCSVParWrap, args=(singleFile,output)) for singleFile in allFiles[:5]]
如果我这样做,代码永远不会停止运行,结果也永远不会打印出来。你知道吗
如果我只在输出队列中放入clFile字符串,例如:
outputq.put((clFile))
结果打印正确(只是一个文件列表)
当我查看htop
时,我看到有5个进程正在生成,但它们不使用任何CPU。你知道吗
最后,如果我单独运行readMyCSV
函数,它会正常工作(返回Pandas
数据帧)
我做错什么了吗? 我在一个笔记本上运行这个,也许这是个问题?你知道吗
似乎进程上的
join
-语句导致了死锁。进程无法终止,因为它们等待队列中的项被消费,但在代码中,这只在加入之后发生。你知道吗文档进一步建议用
queue.get
和join
交换行,或者只是删除join
。你知道吗同样重要的是:
相关问题 更多 >
编程相关推荐