我在Spark中遇到了一个问题,我正试图通过模仿here和here给出的建议来定义自己的UDAF来解决这个问题。我的最终目标是对给定窗口内的整数序列应用一系列复杂的位移位和逐位布尔操作。你知道吗
我遇到了一些问题,因为我的用例是在一个相当大的数据集上(约1亿行,为此我需要在2-7个元素长度的组上执行6次这样的位操作),因此我试图在scala中实现这一点。问题是,我对scala是全新的(我的主要语言是python),虽然scala本身看起来并没有那么难,但是一种新的语言加上UDAF类本身应用于window
的细节的结合让我有点困惑。你知道吗
为了使问题更具体,考虑一个pandas
DataFrame
:
keep = list(range(30))
for num in (3, 5, 11, 16, 22, 24):
keep.pop(num)
np.random.seed(100)
df = pd.DataFrame({
'id': 'A',
'date': pd.date_range('2018-06-01', '2018-06-30')[keep],
'num': np.random.randint(low=1, high=100, size=30)[keep]
})
产生:
id date num
0 A 2018-06-01 9
1 A 2018-06-02 25
2 A 2018-06-03 68
3 A 2018-06-05 80
4 A 2018-06-06 49
5 A 2018-06-08 95
6 A 2018-06-09 53
7 A 2018-06-10 99
8 A 2018-06-11 54
9 A 2018-06-12 67
10 A 2018-06-13 99
11 A 2018-06-15 35
12 A 2018-06-16 25
13 A 2018-06-17 16
14 A 2018-06-18 61
15 A 2018-06-19 59
16 A 2018-06-21 10
17 A 2018-06-22 94
18 A 2018-06-23 87
19 A 2018-06-24 3
20 A 2018-06-25 28
21 A 2018-06-26 5
22 A 2018-06-28 2
23 A 2018-06-29 14
我希望能够做的是,相对于当前行,找到天数,然后基于该值执行一些逐位操作。为了演示,留在熊猫(我必须做一个完整的外部连接,然后过滤来演示):
exp_df = df[['id', 'date']].merge(df, on='id') \ # full outer join on 'id'
.assign(days_diff = lambda df: (df['date_y'] - df['date_x']).dt.days) \ # number of days since my date of interest
.mask(lambda df: (df['days_diff'] > 3) | (df['days_diff'] < 0)) \ # nulls rows where days_diff isn't between 0 and 3
.dropna() \ # then filters the rows
.drop('date_y', axis='columns') \
.rename({'date_x': 'date', 'num': 'nums'}, axis='columns') \
.reset_index(drop=True)
exp_df[['nums', 'days_diff']] = exp_df[['nums', 'days_diff']].astype('int')
现在我执行逐位移位和其他逻辑:
# Extra values to add after bit-wise shifting (1 for shift of 1, 3 for shift of 2 ...)
additions = {val: sum(2**power for power in range(val)) for val in exp_df['days_diff'].unique()}
exp_df['shifted'] = np.left_shift(exp_df['nums'].values, exp_df['days_diff'].values) \
+ exp_df['days_diff'].apply(lambda val: additions[val])
在所有这些之后,exp_df
看起来如下(前10行):
id date nums days_diff shifted
0 A 2018-06-01 9 0 9
1 A 2018-06-01 25 1 51
2 A 2018-06-01 68 2 275
3 A 2018-06-02 25 0 25
4 A 2018-06-02 68 1 137
5 A 2018-06-02 80 3 647
6 A 2018-06-03 68 0 68
7 A 2018-06-03 80 2 323
8 A 2018-06-03 49 3 399
9 A 2018-06-05 80 0 80
现在我可以汇总:
exp_df.groupby('date')['shifted'].agg(lambda group_vals: np.bitwise_and.reduce(group_vals.values)
最终结果如下所示(如果我重新连接到原始的DataFrame
:
id date num shifted
0 A 2018-06-01 9 1
1 A 2018-06-02 25 1
2 A 2018-06-03 68 0
3 A 2018-06-05 80 64
4 A 2018-06-06 49 33
5 A 2018-06-08 95 3
6 A 2018-06-09 53 1
7 A 2018-06-10 99 1
8 A 2018-06-11 54 6
9 A 2018-06-12 67 3
10 A 2018-06-13 99 3
11 A 2018-06-15 35 3
12 A 2018-06-16 25 1
13 A 2018-06-17 16 0
14 A 2018-06-18 61 21
15 A 2018-06-19 59 35
16 A 2018-06-21 10 8
17 A 2018-06-22 94 6
18 A 2018-06-23 87 3
19 A 2018-06-24 3 1
20 A 2018-06-25 28 0
21 A 2018-06-26 5 1
22 A 2018-06-28 2 0
23 A 2018-06-29 14 14
好的,现在我已经演示了我的逻辑,我意识到我在Spark中基本上可以做同样的事情——在数据帧本身上执行完整的外部连接,然后过滤和聚合。你知道吗
我想知道的是,我是否可以避免执行完全联接,而是使用目标行作为输入,创建自己的UDAF在窗口函数上执行此聚合。基本上,我需要创建"days_diff"
列的等价物,以便执行所需的逻辑,这意味着将目标日期与指定窗口中的每个其他日期进行比较。这有可能吗?你知道吗
另外,我是否有理由担心使用self-join?我知道spark的所有处理都很慢,所以我很可能不需要担心。如果我使用self-join和我想象中的UDAF在一个窗口上进行所有这些操作,我应该期望性能类似吗?使用join-filter-aggregate方法,逻辑更加有序,更易于遵循,这是一个明显的优势。你知道吗
要知道的一件事是,我将在多个窗口上执行此逻辑。原则上,我可以在连接之后cache
过滤数据帧的最大版本,然后将其用于后续计算。你知道吗
目前没有回答
相关问题 更多 >
编程相关推荐