我需要为数据库的每个元素运行一个函数。在
当我尝试以下操作时:
from multiprocessing import Pool
from pymongo import Connection
def foo():
...
connection1 = Connection('127.0.0.1', 27017)
db1 = connection1.data
my_pool = Pool(6)
my_pool.map(foo, db1.index.find())
我得到以下错误:
Job 1, 'python myscript.py ' terminated by signal SIGKILL (Forced quit)
我认为,这是由于db1.index.find()
在试图返回数百万个数据库元素时占用了所有可用的ram。。。在
我应该如何修改我的代码使其工作?在
这里有一些日志:
^{pr2}$实际功能如下:
def create_barrel(item):
connection = Connection('127.0.0.1', 27017)
db = connection.data
print db.index.count()
barrel = []
fls = []
if 'name' in item.keys():
barrel.append(WhitespaceTokenizer().tokenize(item['name']))
name = item['name']
elif 'name.utf-8' in item.keys():
barrel.append(WhitespaceTokenizer().tokenize(item['name.utf-8']))
name = item['name.utf-8']
else:
print item.keys()
if 'files' in item.keys():
for file in item['files']:
if 'path' in file.keys():
barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path'])))
fls.append(("\\".join(file['path']),file['length']))
elif 'path.utf-8' in file.keys():
barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path.utf-8'])))
fls.append(("\\".join(file['path.utf-8']),file['length']))
else:
print file
barrel.append(WhitespaceTokenizer().tokenize(file))
if len(fls) < 1:
fls.append((name,item['length']))
barrel = sum(barrel,[])
for s in barrel:
vs = re.findall("\d[\d|\.]*\d", s) #versions i.e. numbes such as 4.2.7500
b0 = []
for s in barrel:
b0.append(re.split("[" + string.punctuation + "]", s))
b1 = filter(lambda x: x not in string.punctuation, sum(b0,[]))
flag = True
while flag:
bb = []
flag = False
for bt in b1:
if bt[0] in string.punctuation:
bb.append(bt[1:])
flag = True
elif bt[-1] in string.punctuation:
bb.append(bt[:-1])
flag = True
else:
bb.append(bt)
b1 = bb
b2 = b1 + barrel + vs
b3 = list(set(b2))
b4 = map(lambda x: x.lower(), b3)
b_final = {}
b_final['_id'] = item['_id']
b_final['tags'] = b4
b_final['name'] = name
b_final['files'] = fls
print db.barrels.insert(b_final)
我注意到了一件有趣的事。然后按ctrl+c停止进程,得到以下结果:
python index2barrel.py
Traceback (most recent call last):
File "index2barrel.py", line 83, in <module>
my_pool.map(create_barrel, db1.index.find, 6)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 280, in map_async
iterable = list(iterable)
TypeError: 'instancemethod' object is not iterable
我的意思是,为什么多处理要把一些东西转换成列表?这不是问题的根源吗?在
从堆栈跟踪:
brk(0x231ccf000) = 0x231ccf000
futex(0x1abb150, FUTEX_WAKE_PRIVATE, 1) = 1
sendto(3, "+\0\0\0\260\263\355\356\0\0\0\0\325\7\0\0\0\0\0\0data.index\0\0"..., 43, 0, NULL, 0) = 43
recvfrom(3, "Some text from my database."..., 491663, 0, NULL, NULL) = 491663
... [manymany times]
brk(0x2320d5000) = 0x2320d5000
.... manymany times
上面的示例在strace输出和strace-o日志文件python中运行myscript.py 不会停下来。它只会吃掉所有可用的ram并写入日志文件。在
更新。用imap代替map解决了我的问题。在
map()函数将项分块提供给给定函数。默认情况下,此chunksize的计算方式如下(link to source):
这可能导致在您的情况下块太大,并让进程耗尽内存。尝试手动设置块大小,如下所示:
^{pr2}$编辑:您还应该考虑重用db连接并在使用后关闭它们。现在您为每个项创建新的db连接,而不调用
close()
。在EDIT2:还要检查
while
循环是否进入无限循环(可以解释症状)。在EDIT3:基于您添加的回溯,map函数尝试将光标转换为列表,导致一次获取所有项。这是因为它想找出集合中有多少项。这是来自pool.py的
map()
代码的一部分:您可以尝试此操作以避免转换为列表:
由于
find()
操作返回的是映射函数的光标,而且您说在执行此操作时不会出现问题for item in db1.index.find(): create_barrel(item)
看起来create_barrel
函数正常。在你能试着限制在游标中返回的结果的数量,看看这是否有用?我认为语法应该是:
如果你可以尝试一下,看看它是否有帮助,它可能有助于找到问题的原因。在
EDIT1:我认为你使用map函数的方式是错误的——我认为你应该在MongoPython驱动程序中使用map_reduce,那样的话,map函数将由mongod进程执行。在
相关问题 更多 >
编程相关推荐