在for语句中使用concurrent.futures

2024-10-04 05:32:50 发布

您现在位置:Python中文网/ 问答频道 /正文

我将QuertyText存储在一个数据框中。将所有查询加载到中后,我希望对每个查询再次进行分析。目前,我有~50k要评估。所以,一个接一个的,需要很长时间

所以,我想实现concurrent.futures。如何将存储在fullAnalysis中的单个QueryText作为变量传递给concurrent.futures并返回输出

这是我的全部代码:

import pandas as pd
import time
import gensim
import sys
import warnings

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

fullAnalysis = pd.DataFrame()

def fetch_data(jFile = 'ProcessingDetails.json'):
    print("Fetching data...please wait")

    #read JSON file for latest dictionary file name
    baselineDictionaryFileName = 'Dictionary/Dictionary_05-03-2020.json'

    #copy data to pandas dataframe
    labelled_data = pd.read_json(baselineDictionaryFileName)

    #Add two more columns to get the most similar text and score
    labelled_data['SimilarText'] = ''
    labelled_data['SimilarityScore'] = float()

    print("Data fetched from " + baselineDictionaryFileName + " and there are " + str(labelled_data.shape[0]) + " rows to be evalauted")

    return labelled_data


def calculateScore(inputFunc):
    warnings.filterwarnings("ignore", category=DeprecationWarning) 

    model = gensim.models.Word2Vec.load('w2v_model_bigdata')

    inp = inputFunc
    print(inp)
    out = dict()

    strEvaluation = inp.split("most_similar ",1)[1]

    #while inp != 'quit':
    split_inp = inp.split()

    try:
        if split_inp[0] == 'help':
            pass
        elif split_inp[0] == 'similarity' and len(split_inp) >= 3:
            pass
        elif split_inp[0] == 'most_similar' and len(split_inp) >= 2:
            for pair in model.most_similar(positive=[split_inp[1]]):
                out.update({pair[0]: pair[1]})

    except KeyError as ke:
        #print(str(ke) + "\n")
        inp = input()
    return out

def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            #for item in executor.map(calculateScore, arg):
            output = executor.map(calculateScore, arg)

    return output

if __name__ == "__main__":
    fullAnalysis = fetch_data()
    results = main()
    print(f'results: {results}')

Tags: andimportmostfordataasconcurrentpd
2条回答

Python全局解释器锁或GIL只允许一个线程控制Python解释器。由于函数calculateScore可能受cpu限制,并且需要解释器执行其字节码,因此使用线程可能没有什么好处。另一方面,如果它主要执行I/O操作,那么它将在大部分运行时间内放弃GIL,允许其他线程运行。但这里的情况似乎并非如此。您可能应该使用来自concurrent.futuresProcessPoolExecutor(两种方法都尝试一下,然后查看):

def main():
    with ProcessPoolExecutor(max_workers=None) as executor:
        the_futures = {}
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            future = executor.submit(calculateScore, arg)
            the_futures[future] = i # map future to request
        for future in as_completed(the_futures): # results as they become available not necessarily the order of submission
            i = the_futures[future] # the original index
            result = future.result() # the result

如果从ProcessPoolExecutor构造函数中省略max_workers参数(或指定值None),默认值将是您机器上的处理器数量(不是一个坏的默认值)。指定大于现有处理器数量的值没有意义

如果您不需要将未来与原始请求联系起来,那么the_futures可以只是一个列表,但最简单的是,您甚至不用麻烦使用as_completed方法:

def main():
    with ProcessPoolExecutor(max_workers=5) as executor:
        the_futures = []
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            future = executor.submit(calculateScore, arg)
            the_futures.append(future)
        # wait for the completion of all the results and return them all:
        results = [f.result() for f in the_futures()] # results in creation order
        return results 

应该提到的是,启动ProcessPoolExecutor函数的代码应该位于由if __name__ = '__main__':控制的块中。如果不是,您将进入一个递归循环,每个子进程都会启动ProcessPoolExecutor。但这里似乎就是这样。也许你想一直使用ProcessPoolExecutor

此外:

我不知道这句话是什么意思

model = gensim.models.Word2Vec.load('w2v_model_bigdata')

。。。在函数calculateStore中。它可能是一个i/o绑定语句。但这似乎并没有因电话而异。如果是这种情况,并且函数中没有修改model,那么该语句不应该移出函数并只计算一次吗?那么这个函数显然会运行得更快(而且显然受到cpu的限制)

此外:

异常块

except KeyError as ke:
    #print(str(ke) + "\n")
    inp = input()

。。。这令人费解。您输入的值在返回之前永远不会被使用。如果要暂停执行,则不会输出错误消息

在Booboo的帮助下,我能够更新代码以包含ProcessPoolExecutor。这是我的更新代码。总的来说,处理速度提高了60%以上

我确实遇到了一个处理问题,发现这个主题BrokenPoolProcess解决了这个问题

output = {}
thePool = {}

def main(labelled_data, dictionaryRevised):

    args = sys.argv[1:]

    with ProcessPoolExecutor(max_workers=None) as executor:
        for i in range(len(labelled_data)):
            text = labelled_data['QueryText'][i]
            arg = 'most_similar'+ ' '+ text

            output = winprocess.submit(
            executor, calculateScore, arg
            )
            thePool[output] = i  #original index for future to request


        for output in as_completed(thePool): # results as they become available not necessarily the order of submission
            i = thePool[output] # the original index
            text = labelled_data['QueryText'][i]
            result = output.result() # the result

            maximumKey = max(result.items(), key=operator.itemgetter(1))[0]
            maximumValue = result.get(maximumKey)

            labelled_data['SimilarText'][i] = maximumKey
            labelled_data['SimilarityScore'][i] = maximumValue


    return labelled_data, dictionaryRevised

if __name__ == "__main__":
    start = time.perf_counter()

    print("Starting to evaluate Query Text for labelling...")

    output_Labelled_Data, output_dictionary_revised = preProcessor()

    output,dictionary = main(output_Labelled_Data, output_dictionary_revised)


    finish = time.perf_counter()
    print(f'Finished in {round(finish-start, 2)} second(s)')

相关问题 更多 >