在Python中创建自定义Spark RDD

2024-09-30 01:31:56 发布

您现在位置:Python中文网/ 问答频道 /正文

是否可以在Python中扩展Spark的RDDs来添加自定义操作符?如果不可能,那么如何为扩展RDD的类包装Scala代码,例如下面的: http://blog.madhukaraphatak.com/extending-spark-api/

编辑:我正在尝试创建一个新的RDD,比如PersonRDD,并在PersonRDD上添加一组新操作符,例如。个人计算机年收入(). 根据下面的链接,在Python中这样做并不容易。然而,由于这是一个旧的线程,我想知道是否有任何新的更新。如果没有,我希望使用Scala来实现,但是我不确定如何使用Py4J(mail)从Python调用这个类-归档文件.us.apache.org/mod\u mbox/spark user/201308.mbox/…)

任何建议或帮助将不胜感激。在

曼迪


Tags: 代码comapihttp编辑blogsparkrdd
2条回答

我也遇到了类似的问题,虽然到目前为止我还没有在扩展版上测试普通RDD的全部功能,但它的工作情况与预期的一样。如果我不需要在类中添加新的方法,那么最好的解决方案是在类中添加新的方法。以下是代码的一小部分:

from pyspark.rdd import RDD, PipelinedRDD

class CustomRDD(RDD):
    def __init__(self, rdd, first=True):
        if first:
            rdd = custom_parser(rdd)
        self._jrdd = rdd._jrdd
        self.is_cached = rdd.is_cached
        self.is_checkpointed = rdd.is_checkpointed
        self.ctx = rdd.ctx
        self._jrdd_deserializer = rdd._jrdd_deserializer
        self._id = rdd._id
        self.partitioner = rdd.partitioner

    def mapPartitionsWithIndex(self, f, preservesPartition=False):
        return CustomRDD(PipelinedRDD(self, f, preservesPartition), False)

    def union(self, other):
        return WebtrendsRDD(super(WebtrendsRDD, self).union(other), False)

    def custom_method(self):
        return CustomRDD(self.filter(lambda x: x.has_property()), False)

mapPartitionsWithIndex方法由许多其他RDD功能调用,因此涵盖了很多内容,但是还有许多其他方法必须用自己的构造函数包装起来,才能像我在union中那样不断地获取自己的CustomRDD。在

在分布式环境中计算精确的中间值需要付出一些努力,所以假设您想要一个RDD中所有值的平方。让我们将此方法称为squares,并假设它的工作原理如下:

assert rdd.squares().collect() == rdd.map(lambda x: x * x).collect()

1。修改pyspark.RDD定义:

^{pr2}$

注意:如果修改类定义,每个实例都将访问squares。在

2。创建RDD子类:

class RDDWithSquares(RDD):
    def squares(self):
        return self.map(lambda x: x * x)

rdd = sc.parallelize([1, 2, 3])
rdd.__class__ = RDDWithSquares # WARNING: see a comment below

分配一个类是一种肮脏的方法,因此在实践中,您应该以适当的方式创建一个RDD(参见示例context.parallelize实现)。在

3。向实例添加方法

import types

rdd = sc.parallelize([1, 2, 3])
# Reusing squares function defined above
rdd.squares = types.MethodType(squares, rdd)

免责声明

首先,我还没有测试任何一个足够长的时间,以确保那里没有隐藏的问题。在

此外,我认为这不值得大惊小怪。如果不进行静态类型检查,就很难找到任何好处,而且可以以更干净的方式使用函数、currying和^{}获得类似的结果。在

from toolz import pipe
pipe(
    sc.parallelize([1, 2, 3]),
    squares,
    lambda rdd: rdd.collect())

相关问题 更多 >

    热门问题