<p>If you are looking for dynamic periods, first convert the dates to timestamps, subtract each timestamp from today's, and then integer-divide by the length (in seconds) of the interval you want to group by. The code below groups the rows into 5-day buckets.</p>
<pre><code>import pyspark.sql.functions as F
from datetime import datetime

# today's timestamp, in seconds since the epoch
Today = datetime.today().timestamp()
# number of seconds in one day
DAY_TIMESTAMPS = 24 * 60 * 60

df = sc.parallelize([
    ('2017-06-02 00:00:00', 'tim',  'page 1'),
    ('2017-07-20 00:00:00', 'tim',  'page 1'),
    ('2017-07-21 00:00:00', 'john', 'page 2'),
    ('2017-07-22 00:00:00', 'john', 'page 2'),
    ('2017-08-23 00:00:00', 'john', 'page 2')
]).toDF(["datetime", "user", "page"])

# length of a five-day bucket, in seconds
timeInterval = 5 * DAY_TIMESTAMPS

df \
    .withColumn('timestamp', F.unix_timestamp(F.to_date('datetime', 'yyyy-MM-dd HH:mm:ss'))) \
    .withColumn('timeIntervalBefore', ((Today - F.col('timestamp')) / timeInterval).cast('integer')) \
    .groupBy('timeIntervalBefore', 'page') \
    .agg(F.count('user').alias('number of users')).show()
</code></pre>
<p>Result:</p>
<pre><code>+------------------+------+---------------+
|timeIntervalBefore|  page|number of users|
+------------------+------+---------------+
|                70|page 2|              2|
|                80|page 1|              1|
|                70|page 1|              1|
|                64|page 2|              1|
+------------------+------+---------------+
</code></pre>
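<p>The bucket arithmetic can be checked outside Spark: it is plain integer division on epoch seconds. A minimal pure-Python sketch of the same computation (the helper name <code>bucket_index</code> and the fixed reference date are illustrative, not part of the Spark code above):</p>
<pre><code>from datetime import datetime

DAY_TIMESTAMPS = 24 * 60 * 60  # seconds in one day

def bucket_index(date_str, interval_days, now_ts):
    # same arithmetic as the 'timeIntervalBefore' column:
    # (reference timestamp - row timestamp) // bucket length
    ts = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S').timestamp()
    return int((now_ts - ts) // (interval_days * DAY_TIMESTAMPS))

# with a fixed reference date the buckets are reproducible
now_ts = datetime(2017, 8, 24).timestamp()
print(bucket_index('2017-08-23 00:00:00', 5, now_ts))  # 0 (1 day back)
print(bucket_index('2017-07-21 00:00:00', 5, now_ts))  # 6 (34 days back)
</code></pre>
<p>Note that in the Spark version the reference point is <code>datetime.today()</code>, so the bucket numbers shift as days pass; pin the reference date if you need stable buckets.</p>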
<p>If you also need the approximate date range of each period:</p>
<pre><code>df \
    .withColumn('timestamp', F.unix_timestamp(F.to_date('datetime', 'yyyy-MM-dd HH:mm:ss'))) \
    .withColumn('timeIntervalBefore', ((Today - F.col('timestamp')) / timeInterval).cast('integer')) \
    .groupBy('timeIntervalBefore', 'page') \
    .agg(
        F.count('user').alias('number_of_users'),
        F.min('timestamp').alias('firstDay'),
        F.max('timestamp').alias('lastDay')) \
    .select(
        'page',
        'number_of_users',
        F.from_unixtime('firstDay').alias('firstDay'),
        F.from_unixtime('lastDay').alias('lastDay')).show()
</code></pre>
<p>Result:</p>
<pre><code>+------+---------------+-------------------+-------------------+
|  page|number_of_users|           firstDay|            lastDay|
+------+---------------+-------------------+-------------------+
|page 2|              2|2017-07-21 00:00:00|2017-07-22 00:00:00|
|page 1|              1|2017-06-02 00:00:00|2017-06-02 00:00:00|
|page 1|              1|2017-07-20 00:00:00|2017-07-20 00:00:00|
|page 2|              1|2017-08-23 00:00:00|2017-08-23 00:00:00|
+------+---------------+-------------------+-------------------+
</code></pre>
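<p>As a sanity check outside Spark: <code>unix_timestamp</code> and <code>from_unixtime</code> (with its default <code>yyyy-MM-dd HH:mm:ss</code> pattern) are just a string/epoch-seconds round trip, so taking <code>min</code>/<code>max</code> of the timestamps and formatting them back gives the first and last date in the group. A minimal sketch, assuming the driver's local timezone matches Spark's session timezone (the helper names are illustrative):</p>
<pre><code>from datetime import datetime

FMT = '%Y-%m-%d %H:%M:%S'

def to_ts(s):
    # mirrors unix_timestamp(): datetime string to epoch seconds
    return datetime.strptime(s, FMT).timestamp()

def from_ts(ts):
    # mirrors from_unixtime(): epoch seconds back to a string
    return datetime.fromtimestamp(ts).strftime(FMT)

group = ['2017-07-21 00:00:00', '2017-07-22 00:00:00']
print(from_ts(min(to_ts(s) for s in group)))  # 2017-07-21 00:00:00
print(from_ts(max(to_ts(s) for s in group)))  # 2017-07-22 00:00:00
</code></pre>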