python dask DataFrame,是否支持(简单的并行化)行?

2024-05-18 12:34:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我最近发现了dask模块,它旨在成为一个易于使用的python并行处理模块。对我来说最大的卖点是它和熊猫一起工作。

在阅读了它的手册之后,我找不到一种方法来完成这个简单的并行化任务:

ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply

现在,为了在达斯克实现这一点,阿法克

ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame

这是难看的语法,实际上比直接

df.apply(func, axis = 1) # for pandas DF row apply

有什么建议吗?

编辑:谢谢@MRocklin的地图功能。这似乎比普通大熊猫要慢。这与熊猫GIL释放问题有关还是我做错了?

import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s

s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec

Tags: 模块pandasdffordssecdddask
2条回答

map_partitions

可以使用map_partitions函数将函数应用于数据帧的所有分区。

df.map_partitions(func, columns=...)

请注意,func一次只能得到数据集的一部分,而不是像使用pandas apply那样的整个数据集(如果要进行并行处理,您可能不需要这样做)

map/apply

您可以使用map在序列中按行映射函数

df.mycolumn.map(func)

您可以使用apply跨数据帧逐行映射函数

df.apply(func, axis=1)

线程与进程

从0.6.0版起dask.dataframes与线程并行。自定义Python函数不会从基于线程的并行性中得到太多好处。你可以试试流程

df = dd.read_csv(...)

df.map_partitions(func, columns=...).compute(scheduler='processes')

但要避免apply

但是,您应该避免在Pandas和Dask中使用定制Python函数apply。这常常是表现不佳的一个原因。可能是,如果您找到了一种以矢量化方式执行操作的方法,那么您的Pandas代码可能会快100倍,而且您根本不需要dask.dataframe。

考虑numba

对于您的特定问题,您可以考虑^{}。这将显著提高您的性能。

In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func)  # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func)  # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms

免责声明,我为一家同时生产numbadask的公司工作,并雇佣了许多pandas开发人员。

从vdask.dataframe开始。将委托责任应用于map_partitions

@insert_meta_param_description(pad=12)
def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds):
    """ Parallel version of pandas.Series.apply
    ...
    """
    if meta is no_default:
        msg = ("`meta` is not specified, inferred from partial data. "
               "Please provide `meta` if the result is unexpected.\n"
               "  Before: .apply(func)\n"
               "  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n"
               "  or:     .apply(func, meta=('x', 'f8'))            for series result")
        warnings.warn(msg)

        meta = _emulate(M.apply, self._meta_nonempty, func,
                        convert_dtype=convert_dtype,
                        args=args, **kwds)

    return map_partitions(M.apply, self, func,
                          convert_dtype, args, meta=meta, **kwds)

相关问题 更多 >

    热门问题