How do I use threads on a generator while preserving order?

Posted 2024-05-03 11:29:34


I have a simple piece of code that runs a GET request for each item in a generator, and I'm trying to speed it up:

def stream(self, records):
    # type(records) = <type 'generator'>
    for record in records:
        # record = OrderedDict([('_time', '1518287568'), ('data', '5552267792')])
        output = rest_api_lookup(record[self.input_field])

        record.update(output)
        yield record

Right now this runs on a single thread, since each REST call waits for the previous one to finish.

I've used multithreading over a list in Python before, using this great answer (https://stackoverflow.com/a/28463266/1150923), but I'm not sure how to reuse the same strategy on a generator instead of a list.
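For context, the list-based pattern from that answer boils down to mapping a function over a list with a thread pool; a rough sketch (my_function and my_list are placeholder names, not from the original):

from multiprocessing.dummy import Pool as ThreadPool  # thread-backed Pool API

pool = ThreadPool(4)                      # 4 worker threads
results = pool.map(my_function, my_list)  # blocks until every call is done
pool.close()
pool.join()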

A fellow developer suggested I break the generator up into 100-element lists and then close the pool, but I don't know how to create those lists from a generator.
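For reference, fixed-size lists can be taken off a generator with itertools.islice; a minimal sketch (the chunks helper name is illustrative, not from the original post):

from itertools import islice

def chunks(gen, size=100):
    # yield successive lists of up to `size` items from `gen`
    it = iter(gen)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            break
        yield chunk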

I also need to preserve the original order, because I need to yield each record in the correct order.


Tags: project, code, self, rest, list, output, stream, get
2 Answers

I assume you don't want to turn your generator records into a list first. One way to speed up the processing is to pass the records into a ThreadPoolExecutor in chunks: the executor processes rest_api_lookup concurrently for all items of a chunk, and then you only need to "unchunk" the results. Here is some runnable sample code (it doesn't use a class, sorry, but I hope it shows the principle):

from concurrent.futures import ThreadPoolExecutor
from time import sleep

pool = ThreadPoolExecutor(8) # 8 threads, adjust to taste and # of cores

def records():
    # simulates records generator
    for i in range(100):
        yield {'a': i}


def rest_api_lookup(a):
    # simulates REST call :)
    sleep(0.1)
    return {'b': -a}


def stream(records):
    def update_fun(record):
        output = rest_api_lookup(record['a'])
        record.update(output)
        return record
    chunk = []
    for record in records:
        # submit update_fun(record) into pool, keep resulting Future
        chunk.append(pool.submit(update_fun, record))
        if len(chunk) == 8:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

def unchunk(chunk_gen):
    """Flattens a generator of Future chunks into a generator of Future results."""
    for chunk in chunk_gen:
        for f in chunk:
            yield f.result() # get result from Future

# Now iterate over all results in same order as generated by records()    
for result in unchunk(stream(records())):
    print(result)


Update: I added a sleep to the simulated REST call to make it more realistic. This chunked version takes just 1.5 seconds on my machine. The sequential version would take 10 seconds (as expected, 100 * 0.1s = 10s).
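To check both the ordering and the timing claim, here is a minimal harness on top of the definitions above (assuming everything runs as one script):

from time import perf_counter

start = perf_counter()
results = list(unchunk(stream(records())))
elapsed = perf_counter() - start

# order is preserved: the 'a' values come back as 0..99
assert [r['a'] for r in results] == list(range(100))
print('chunked run took %.1fs' % elapsed)  # ~1.5s here vs. ~10s sequentially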

Here is an example of how to do this with concurrent.futures:

from concurrent import futures
from concurrent.futures import ThreadPoolExecutor

class YourClass(object):

    def stream(self, records):
        for record in records:
            output = rest_api_lookup(record[self.input_field])
            record.update(output)
        # process your list and yield back result.
        yield {"result_key": "whatever the result is"}

    def run_parallel(self):
        """ Use this method to do the parallel processing """

        # The important part - concurrent futures 
        # - set number of workers as the number of jobs to process - suggest 4, but may differ 
        #   this will depend on how many threads you want to run in parallel
        with ThreadPoolExecutor(4) as executor:
            # Use list jobs for concurrent futures
            # Use list scraped_results for results
            jobs = []
            parallel_results = []

            # Pass some keyword arguments if needed - per job  
            record1 = {} # your values for record1 - if need more - create
            record2 = {} # your values for record2 - if need more - create
            record3 = {} # your values for record3 - if need more - create
            record4 = {} # your values for record4 - if need more - create

            list_of_records = [[record1, record2], [record3, record4],]


            for records in list_of_records:
                # Here we iterate 'number of records' times, could be different
                # We're adding stream, could be a different function per call.
                # stream() is a generator function, so drain it with list()
                # inside the worker thread; otherwise submit() would just
                # return an unstarted generator and nothing runs in parallel
                jobs.append(executor.submit(
                    lambda recs: list(self.stream(recs)), records))

            # Once parallel processing is complete, iterate over results 
            # append results to final processing without any networking
            for job in futures.as_completed(jobs):
                # Read result from future (a list of result dicts)
                result = job.result()
                # Extend the flat list of results
                parallel_results.extend(result)
            # Use sorted to sort by key to preserve order
            parallel_results = sorted(parallel_results, key=lambda k: k['result_key']) 
            # Iterate over results streamed and do whatever is needed
            for result in parallel_results:
                print("Do something with me {}".format(result))
