基于另一个数据帧的日期范围分割多个Dask数据帧的最快方法

2024-06-16 14:51:30 发布

您现在位置:Python中文网/ 问答频道 /正文

从ddf2 Dask数据帧中仅选择df1日期范围中的日期的最快方法是什么?所有超出范围的日期都应删除

df1-具有开始-结束日期范围的数据帧

        start       end
01 2018-06-25 2018-06-29
02 2019-05-06 2019-05-13
...

dd2-Dask数据帧(30M行)

(*)必须选择标记行

    date        value1
    2018-01-01  23
    2018-01-01  24
    2018-01-02  545
    2018-01-03  433
    2018-01-04  23
    *2018-06-25 234
    *2018-06-25 50
    *2018-06-25 120
    *2018-06-26 22
    *2018-06-27 32       
    *2018-06-27 123
    *2018-06-28 603
    *2018-06-29 625
    2019-01-01  734
    2019-01-01  241
    2019-01-01  231
    2019-01-02  211
    2019-01-02  214
    2019-05-05  234
    2019-05-05  111
    *2019-05-06 846
    *2019-05-06 231
    *2019-05-07 654
    *2019-05-07 119
    *2019-05-08 212
    *2019-05-08 122
    *2019-05-06 765
    *2019-05-13 231
    *2019-05-13 213
    *2019-05-13 443
    2019-05-14  321
    2019-05-14  231
    2019-05-15  123
...

输出:Dask数据帧需要附加切片

date        value1   
2018-06-25  234
2018-06-25  50
2018-06-25  120
2018-06-26  22
2018-06-27  32
2018-06-27  123
2018-06-28  603
2018-06-29  625
2019-05-06  846
2019-05-06  231
2019-05-07  654
2019-05-07  119
2019-05-08  212
2019-05-08  122
2019-05-06  765
2019-05-13  231
2019-05-13  213
2019-05-13  443

此代码正在工作,但我需要通过start&;df1中的结束日期范围用于过滤dd2,无需手动硬编码日期

dd2 = dd2[
    (dd2['date'] >= '2018-06-25') & (dd2['date'] <= '2018-06-29') |
    (dd2['date'] >= '2019-05-06') & (dd2['date'] <= '2019-05-13')
]

Tags: 数据方法代码标记date切片startdask
2条回答

这看起来可能有用:

from itertools import starmap

date_ddf = ddf.set_index("date")
slices = starmap(slice, df.values)

# There might be a more "Pandas-esque" way to do this, but I 
# don't know it yet.
sliced = map(date_ddf.__getitem__, slices)

# We have to reify the `map` object into a `list` for Dask.
concat_ddf = dd.concat(list(sliced))

concat_ddf.compute()

每次通过date_ddf.__getitem__上的map都会返回原始帧的一部分,因此dd.concat需要将其恢复到一起

这里是另一种方法,但使用列表理解按索引进行切片,并验证(最后)切片是否正确完成

进口

from datetime import datetime

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask import compute

指定可调输入

# Start date from which to create dummy data to use
data_start_date = "1700-01-01"
# Frequency of dummy data created (hourly)
data_freq = "H"
# number of rows of data to generate
nrows = 3_000_000
# Dask DataFrame chunk size; will be used later to determine how many files
# (of the dummy data generated here) will be exported to disk
chunksize = 75_000

使用切片边界日期生成df1

df1 = pd.DataFrame.from_records(
    [
        {"start": datetime(1850, 1, 6, 0, 0, 0), "end": datetime(1870, 9, 4, 23, 0, 0)},
        {"start": datetime(1880, 7, 6, 0, 0, 0), "end": datetime(1895, 4, 9, 23, 0, 0)},
        {"start": datetime(1910, 11, 25, 0, 0, 0), "end": datetime(1915, 5, 5, 23, 0, 0)},
        {"start": datetime(1930, 10, 8, 0, 0, 0), "end": datetime(1940, 2, 8, 23, 0, 0)},
        {"start": datetime(1945, 9, 9, 0, 0, 0), "end": datetime(1950, 1, 3, 23, 0, 0)},
    ]
)
print(df1)
       start                 end
0 1850-01-06 1870-09-04 23:00:00
1 1880-07-06 1895-04-09 23:00:00
2 1910-11-25 1915-05-05 23:00:00
3 1930-10-08 1940-02-08 23:00:00
4 1945-09-09 1950-01-03 23:00:00

创建虚拟数据

  • 我们将在这里指定一个名为wanted的列,所有行都是False
df = pd.DataFrame(
    np.random.rand(nrows),
    index=pd.date_range(data_start_date, periods=nrows, freq="h"),
    columns=["value1"],
)
df.index.name = "date"
df["wanted"] = False
print(df.head())
                       value1  wanted
date                                 
1700-01-01 00:00:00  0.504119   False
1700-01-01 01:00:00  0.582796   False
1700-01-01 02:00:00  0.383905   False
1700-01-01 03:00:00  0.995389   False
1700-01-01 04:00:00  0.592130   False

现在,如果想要的行与df1中的行具有相同的日期,我们将把它们更改为True

  • 这样做的原因是为了我们以后可以检查切片是否正确
  • 这个步骤和wanted列在您的实际用例中是不必要的,但只需要检查我们的工作
for _, row in df1.iterrows():
    df.loc[row['start']: row['end'], "wanted"] = True
df = df.reset_index()
print(df.head())
print(df["wanted"].value_counts().to_frame())
                 date    value1  wanted
0 1700-01-01 00:00:00  0.504119   False
1 1700-01-01 01:00:00  0.582796   False
2 1700-01-01 02:00:00  0.383905   False
3 1700-01-01 03:00:00  0.995389   False
4 1700-01-01 04:00:00  0.592130   False
        wanted
False  2530800
True    469200

请注意,在wanted列上调用.value_counts()会显示此列中的True值的数量,如果我们正确地对数据进行了切片,我们应该预期这些值的数量。这是使用pandas.DataFrame中的数据完成的,但稍后我们将使用dask.DataFrame中的相同数据完成

现在,我们将数据导出到本地的多个.parquet文件中

  • 通常,我们希望从直接从磁盘加载到dask的数据开始
  • 要将数据导出到多个.parquet文件,我们将把pandas.DataFrame转换为dask.DataFrame,然后设置chunksize参数,该参数将确定创建了多少文件(chunksize行将放置在每个导出的文件中-source
ddf = dd.from_pandas(df, chunksize=chunksize)
ddf.to_parquet("data", engine="auto")

现在将所有.parquet文件直接加载到单个dask.DataFrame中,并将date列设置为索引

  • 设置索引的计算代价很高,但我们只在将文件直接读取到dask.DataFrame中时才指定它,而不是在之后更改它
ddf = dd.read_parquet(
    "data/",
    dtype={"value1": "float64"},
    index="date",
    parse_dates=["date"],
)
print(ddf)
Dask DataFrame Structure:
                      value1 wanted
npartitions=40                     
1700-01-01 00:00:00  float64   bool
1708-07-23 00:00:00      ...    ...
...                      ...    ...
2033-09-07 00:00:00      ...    ...
2042-03-28 23:00:00      ...    ...
Dask Name: read-parquet, 40 tasks

现在,我们准备使用^{中的日期进行切片。我们将使用列表理解来迭代df1中的每一行,使用该行对数据进行切片(在dask.DataFrame中),然后调用dd.concat(正如@joebeeson所做的那样)

slices = dd.concat([ddf.loc[row['start']: row['end']] for _, row in df1.iterrows()])

最后,在这个延迟的dask对象列表上进行计算,得到一个pandas.DataFrame切片,以给出所需的日期

ddf_sliced_computed = compute(slices)[0].reset_index()
print(ddf_sliced_computed.head())
print(ddf_sliced_computed["wanted"].value_counts().to_frame())
                 date    value1  wanted
0 1850-01-06 00:00:00  0.671781    True
1 1850-01-06 01:00:00  0.455022    True
2 1850-01-06 02:00:00  0.490212    True
3 1850-01-06 03:00:00  0.240171    True
4 1850-01-06 04:00:00  0.162088    True
      wanted
True  469200

如您所见,我们在wanted列中切掉了具有正确数量的True值的行。我们可以使用前面用来生成后来写入磁盘的伪数据的pandas.DataFrame来显式地验证这一点

assert all(ddf_sliced_computed["wanted"] == True)
assert (
    df[df["wanted"] == True]
    .reset_index(drop=True)
    .equals(ddf_sliced_computed[ddf_sliced_computed["wanted"] == True])
)

注释

  1. 这需要3百万行。您正在处理3000万行,因此如果要检查计时等,您必须修改在开始时生成的虚拟数据

相关问题 更多 >