我有一个需要根据某些条件筛选的项目列表。我想知道Dask是否可以并行地进行这种过滤,因为列表很长(有几千万条记录)
基本上,我需要做的是:
items = [
{'type': 'dog', 'weight': 10},
{'type': 'dog', 'weight': 20},
{'type': 'cat', 'weight': 15},
{'type': 'dog', 'weight': 30},
]
def item_is_valid(item):
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False
# ...
# elif for n conditions
return item_is_valid
items_filtered = [item for item in items if item_is_valid(item)]
有了Dask,我可以做到以下几点:
def item_is_valid_v2(item):
"""Return the whole item if valid."""
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False
# ...
# elif for n conditions
if item_is_valid:
return item
results = []
item = []
for item in items:
delayed = dask.delayed(item_is_valid)(item)
results.append(delayed)
results = dask.compute(*results)
但是,我得到的结果包含一些None
值,然后需要以某种非并行方式过滤掉这些值
({'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}, None, None)
也许
bag
API可以帮助您,这是一个粗略的伪代码:要检查这是否有效,请检查
result.take(5)
的结果,如果结果令人满意:相关问题 更多 >
编程相关推荐