并行列表过滤

2024-09-14 18:10:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个需要根据某些条件筛选的项目列表。我想知道Dask是否可以并行地进行这种过滤,因为列表很长(有几千万条记录)

基本上,我需要做的是:

items = [
    {'type': 'dog', 'weight': 10},
    {'type': 'dog', 'weight': 20},
    {'type': 'cat', 'weight': 15},
    {'type': 'dog', 'weight': 30},
]

def item_is_valid(item):
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False
    # ...
    # elif for n conditions

    return item_is_valid

items_filtered = [item for item in items if item_is_valid(item)]

有了Dask,我可以做到以下几点:

def item_is_valid_v2(item):
    """Return the whole item if valid."""
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False
    # ...
    # elif for n conditions
    
    if item_is_valid:
        return item

results = []
item = []
for item in items:
    delayed = dask.delayed(item_is_valid)(item)
    results.append(delayed)

results = dask.compute(*results)

但是,我得到的结果包含一些None值,然后需要以某种非并行方式过滤掉这些值

({'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}, None, None)

Tags: nonefalseforifistypeitemsitem
1条回答
网友
1楼 · 发布于 2024-09-14 18:10:52

也许bagAPI可以帮助您,这是一个粗略的伪代码:

import dask.bag as db

bag = db.from_sequence() # or better yet read it from disk
result = bag.filter(item_is_valid) # note this uses the first version (bool)

要检查这是否有效,请检查result.take(5)的结果,如果结果令人满意:

computed_result = result.compute()

相关问题 更多 >