火花嵌套RDD操作

2024-09-21 07:24:27 发布

您现在位置:Python中文网/ 问答频道 /正文

我有两个RDD说

   rdd1 = 
id            | created     | destroyed | price   
1            | 1            | 2            | 10        
2            | 1            | 5            | 11       
3            | 2            | 3            | 11        
4            | 3            | 4            | 12        
5            | 3            | 5            | 11       

rdd2 =

[1,2,3,4,5] # lets call these value as timestamps (ts)

rdd2基本上是使用范围(初始值、结束值、间隔)生成的。这里的参数可以不同。大小可以与rdd1相同或不同。其思想是根据rdd2的值,使用过滤criertia将记录从rdd1提取到rdd2(从rdd1获取的记录可以重复,正如您在输出中看到的那样)

过滤条件rdd1.created<;=ts<;rdd1.已销毁)

预期产量:

^{pr2}$

现在我想根据一些使用RDD2键的条件过滤RDD1。(如上所述)并返回连接RDD2键和RDD1过滤结果的结果

所以我要:

rdd2.map(lambda x : somefilterfunction(x, rdd1))  

def somefilterfunction(x, rdd1):
    filtered_rdd1 = rdd1.filter(rdd1[1] <= x).filter(rdd1[2] > x)
    prices = filtered_rdd1.map(lambda x : x[3])
    res = prices.collect()
    return (x, list(res))

我得到:

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

我尝试过使用groupBy,但是由于这里rdd1的元素可以被一次又一次地重复,而与分组相比,分组只会将rdd1的每个元素都放到某个特定的槽中一次。在

现在唯一的方法就是使用一个普通的for循环,然后进行过滤,最后连接所有的东西。在

有什么建议吗?在


Tags: thelambdaltanmap记录条件rdd
1条回答
网友
1楼 · 发布于 2024-09-21 07:24:27

由于您使用常规范围,所以根本没有理由创建第二个RDD。您只需为每个记录生成特定范围内的值:

from __future__ import division # Required only for Python 2.x
from math import ceil
from itertools import takewhile

rdd1 = sc.parallelize([
    (1, 1, 2, 10),        
    (2, 1, 5, 11),       
    (3, 2, 3, 11),        
    (4, 3, 4, 12),        
    (5, 3, 5, 11),  
])


def generate(start, end, step):
    def _generate(id, created, destroyed, price):
        # Smallest ts >= created
        start_for_record = int(ceil((created - start) / step) * step + start)
        rng = takewhile(
            lambda x: created <= x < destroyed,
            xrange(start_for_record, end, step)) # In Python 3.x use range
        for i in rng:
            yield i, price

    return _generate

result = rdd1.flatMap(lambda x: generate(1, 6, 1)(*x)).groupByKey()

结果和结果:

^{pr2}$

相关问题 更多 >

    热门问题