我将QuertyText存储在一个数据框中。将所有查询加载到中后,我希望对每个查询再次进行分析。目前,我有~50k要评估。所以,一个接一个的,需要很长时间
所以,我想实现concurrent.futures。如何将存储在fullAnalysis中的单个QueryText作为变量传递给concurrent.futures并返回输出
这是我的全部代码:
import pandas as pd
import time
import gensim
import sys
import warnings
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
fullAnalysis = pd.DataFrame()
def fetch_data(jFile = 'ProcessingDetails.json'):
print("Fetching data...please wait")
#read JSON file for latest dictionary file name
baselineDictionaryFileName = 'Dictionary/Dictionary_05-03-2020.json'
#copy data to pandas dataframe
labelled_data = pd.read_json(baselineDictionaryFileName)
#Add two more columns to get the most similar text and score
labelled_data['SimilarText'] = ''
labelled_data['SimilarityScore'] = float()
print("Data fetched from " + baselineDictionaryFileName + " and there are " + str(labelled_data.shape[0]) + " rows to be evalauted")
return labelled_data
def calculateScore(inputFunc):
warnings.filterwarnings("ignore", category=DeprecationWarning)
model = gensim.models.Word2Vec.load('w2v_model_bigdata')
inp = inputFunc
print(inp)
out = dict()
strEvaluation = inp.split("most_similar ",1)[1]
#while inp != 'quit':
split_inp = inp.split()
try:
if split_inp[0] == 'help':
pass
elif split_inp[0] == 'similarity' and len(split_inp) >= 3:
pass
elif split_inp[0] == 'most_similar' and len(split_inp) >= 2:
for pair in model.most_similar(positive=[split_inp[1]]):
out.update({pair[0]: pair[1]})
except KeyError as ke:
#print(str(ke) + "\n")
inp = input()
return out
def main():
with ThreadPoolExecutor(max_workers=5) as executor:
for i in range(len(fullAnalysis)):
text = fullAnalysis['QueryText'][i]
arg = 'most_similar'+ ' ' + text
#for item in executor.map(calculateScore, arg):
output = executor.map(calculateScore, arg)
return output
if __name__ == "__main__":
fullAnalysis = fetch_data()
results = main()
print(f'results: {results}')
Python全局解释器锁或GIL只允许一个线程控制Python解释器。由于函数
calculateScore
可能受cpu限制,并且需要解释器执行其字节码,因此使用线程可能没有什么好处。另一方面,如果它主要执行I/O操作,那么它将在大部分运行时间内放弃GIL,允许其他线程运行。但这里的情况似乎并非如此。您可能应该使用来自concurrent.futures
的ProcessPoolExecutor
(两种方法都尝试一下,然后查看):如果从
ProcessPoolExecutor
构造函数中省略max_workers
参数(或指定值None
),默认值将是您机器上的处理器数量(不是一个坏的默认值)。指定大于现有处理器数量的值没有意义如果您不需要将未来与原始请求联系起来,那么
the_futures
可以只是一个列表,但最简单的是,您甚至不用麻烦使用as_completed
方法:应该提到的是,启动
ProcessPoolExecutor
函数的代码应该位于由if __name__ = '__main__':
控制的块中。如果不是,您将进入一个递归循环,每个子进程都会启动ProcessPoolExecutor
。但这里似乎就是这样。也许你想一直使用ProcessPoolExecutor
此外:
我不知道这句话是什么意思
。。。在函数
calculateStore
中。它可能是一个i/o绑定语句。但这似乎并没有因电话而异。如果是这种情况,并且函数中没有修改model
,那么该语句不应该移出函数并只计算一次吗?那么这个函数显然会运行得更快(而且显然受到cpu的限制)此外:
异常块
。。。这令人费解。您输入的值在返回之前永远不会被使用。如果要暂停执行,则不会输出错误消息
在Booboo的帮助下,我能够更新代码以包含ProcessPoolExecutor。这是我的更新代码。总的来说,处理速度提高了60%以上
我确实遇到了一个处理问题,发现这个主题BrokenPoolProcess解决了这个问题
相关问题 更多 >
编程相关推荐