<p>这里是另一种方法,但使用列表理解按索引进行切片,并验证(最后)切片是否正确完成</p>
<p>进口</p>
<pre><code>from datetime import datetime
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask import compute
</code></pre>
<p>指定可调输入</p>
<pre><code># Start date from which to create dummy data to use
data_start_date = "1700-01-01"
# Frequency of dummy data created (hourly)
data_freq = "H"
# number of rows of data to generate
nrows = 3_000_000
# Dask DataFrame chunk size; will be used later to determine how many files
# (of the dummy data generated here) will be exported to disk
chunksize = 75_000
</code></pre>
<p>使用切片边界日期生成<code>df1</code></p>
<pre><code>df1 = pd.DataFrame.from_records(
[
{"start": datetime(1850, 1, 6, 0, 0, 0), "end": datetime(1870, 9, 4, 23, 0, 0)},
{"start": datetime(1880, 7, 6, 0, 0, 0), "end": datetime(1895, 4, 9, 23, 0, 0)},
{"start": datetime(1910, 11, 25, 0, 0, 0), "end": datetime(1915, 5, 5, 23, 0, 0)},
{"start": datetime(1930, 10, 8, 0, 0, 0), "end": datetime(1940, 2, 8, 23, 0, 0)},
{"start": datetime(1945, 9, 9, 0, 0, 0), "end": datetime(1950, 1, 3, 23, 0, 0)},
]
)
print(df1)
start end
0 1850-01-06 1870-09-04 23:00:00
1 1880-07-06 1895-04-09 23:00:00
2 1910-11-25 1915-05-05 23:00:00
3 1930-10-08 1940-02-08 23:00:00
4 1945-09-09 1950-01-03 23:00:00
</code></pre>
<p>创建虚拟数据</p>
<ul>
<li>我们将在这里指定一个名为<code>wanted</code>的列,所有行都是<code>False</code></li>
</ul>
<pre><code>df = pd.DataFrame(
np.random.rand(nrows),
index=pd.date_range(data_start_date, periods=nrows, freq="h"),
columns=["value1"],
)
df.index.name = "date"
df["wanted"] = False
print(df.head())
value1 wanted
date
1700-01-01 00:00:00 0.504119 False
1700-01-01 01:00:00 0.582796 False
1700-01-01 02:00:00 0.383905 False
1700-01-01 03:00:00 0.995389 False
1700-01-01 04:00:00 0.592130 False
</code></pre>
<p>现在,如果想要的行与<code>df1</code>中的行具有相同的日期,我们将把它们更改为<code>True</code></p>
<ul>
<li>这样做的原因是为了我们以后可以检查切片是否正确</li>
<li>这个步骤和<code>wanted</code>列在您的实际用例中是不必要的,但只需要检查我们的工作</li>
</ul>
<pre><code>for _, row in df1.iterrows():
df.loc[row['start']: row['end'], "wanted"] = True
df = df.reset_index()
print(df.head())
print(df["wanted"].value_counts().to_frame())
date value1 wanted
0 1700-01-01 00:00:00 0.504119 False
1 1700-01-01 01:00:00 0.582796 False
2 1700-01-01 02:00:00 0.383905 False
3 1700-01-01 03:00:00 0.995389 False
4 1700-01-01 04:00:00 0.592130 False
wanted
False 2530800
True 469200
</code></pre>
<p>请注意,在<code>wanted</code>列上调用<code>.value_counts()</code>会显示此列中的<code>True</code>值的数量,如果我们正确地对数据进行了切片,我们应该预期这些值的数量。这是使用<code>pandas.DataFrame</code>中的数据完成的,但稍后我们将使用<code>dask.DataFrame</code>中的相同数据完成</p>
<p>现在,我们将数据导出到本地的多个<code>.parquet</code>文件中</p>
<ul>
<li>通常,我们希望从直接从磁盘加载到<code>dask</code>的数据开始</li>
<li>要将数据导出到多个<code>.parquet</code>文件,我们将把<code>pandas.DataFrame</code>转换为<code>dask.DataFrame</code>,然后设置<code>chunksize</code>参数,该参数将确定创建了多少文件(<code>chunksize</code>行将放置在每个导出的文件中-<a href="https://stackoverflow.com/a/63780962/4057186">source</a>)</li>
</ul>
<pre><code>ddf = dd.from_pandas(df, chunksize=chunksize)
ddf.to_parquet("data", engine="auto")
</code></pre>
<p>现在将所有<code>.parquet</code>文件直接加载到单个<code>dask.DataFrame</code>中,并将<code>date</code>列设置为索引</p>
<ul>
<li>设置索引的计算代价很高,但我们只在将文件直接读取到<code>dask.DataFrame</code>中时才指定它,而不是在之后更改它</li>
</ul>
<pre><code>ddf = dd.read_parquet(
"data/",
dtype={"value1": "float64"},
index="date",
parse_dates=["date"],
)
print(ddf)
Dask DataFrame Structure:
value1 wanted
npartitions=40
1700-01-01 00:00:00 float64 bool
1708-07-23 00:00:00 ... ...
... ... ...
2033-09-07 00:00:00 ... ...
2042-03-28 23:00:00 ... ...
Dask Name: read-parquet, 40 tasks
</code></pre>
<p>现在,我们准备使用^{<cd1>中的日期进行切片。我们将使用列表理解来迭代<code>df1</code>中的每一行,使用该行对数据进行切片(在<code>dask.DataFrame</code>中),然后调用<code>dd.concat</code>(正如@joebeeson所做的那样)</p>
<pre><code>slices = dd.concat([ddf.loc[row['start']: row['end']] for _, row in df1.iterrows()])
</code></pre>
<p>最后,在这个延迟的<code>dask</code>对象列表上进行计算,得到一个<code>pandas.DataFrame</code>切片,以给出所需的日期</p>
<pre><code>ddf_sliced_computed = compute(slices)[0].reset_index()
print(ddf_sliced_computed.head())
print(ddf_sliced_computed["wanted"].value_counts().to_frame())
date value1 wanted
0 1850-01-06 00:00:00 0.671781 True
1 1850-01-06 01:00:00 0.455022 True
2 1850-01-06 02:00:00 0.490212 True
3 1850-01-06 03:00:00 0.240171 True
4 1850-01-06 04:00:00 0.162088 True
wanted
True 469200
</code></pre>
<p>如您所见,我们在<code>wanted</code>列中切掉了具有正确数量的<code>True</code>值的行。我们可以使用前面用来生成后来写入磁盘的伪数据的<code>pandas.DataFrame</code>来显式地验证这一点</p>
<pre><code>assert all(ddf_sliced_computed["wanted"] == True)
assert (
df[df["wanted"] == True]
.reset_index(drop=True)
.equals(ddf_sliced_computed[ddf_sliced_computed["wanted"] == True])
)
</code></pre>
<p><strong>注释</strong></p>
<ol>
<li>这需要3百万行。您正在处理3000万行,因此如果要检查计时等,您必须修改在开始时生成的虚拟数据</li>
</ol>