有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java序列化RDD

我有一个RDD,我正试图序列化它,然后通过反序列化来重建它。我想看看这在Apache Spark中是否可行

     static JavaSparkContext sc = new JavaSparkContext(conf);
        static SerializerInstance si = SparkEnv.get().closureSerializer().newInstance();
    static ClassTag<JavaRDD<String>> tag = scala.reflect.ClassTag$.MODULE$.apply(JavaRDD.class);
..
..
            JavaRDD<String> rdd = sc.textFile(logFile, 4);
            System.out.println("Element 1 " + rdd.first());
            ByteBuffer bb= si.serialize(rdd, tag);
            JavaRDD<String> rdd2 = si.deserialize(bb, Thread.currentThread().getContextClassLoader(),tag);
            System.out.println(rdd2.partitions().size());
            System.out.println("Element 0 " + rdd2.first());

当我对新创建的RDD执行操作时,最后一行出现异常。我序列化的方式类似于Spark内部的方式

Exception in thread "main" org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1177)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1189)
    at org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:477)
    at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32)
    at SimpleApp.sparkSend(SimpleApp.java:63)
    at SimpleApp.main(SimpleApp.java:91)

RDD是在同一个过程中创建和加载的,所以我不明白这个错误是如何发生的


共 (1) 个答案

  1. # 1 楼答案

    我是这条警告信息的作者

    Spark不支持对通过反序列化创建的RDD副本执行操作和转换。RDD是可序列化的,因此可以在执行器中调用RDD上的某些方法,但最终用户不应该尝试手动执行RDD序列化

    当RDD被序列化时,它会丢失对创建它的SparkContext的引用,从而阻止使用它启动作业(请参见here)。在Spark的早期版本中,当Spark试图访问私有的空RDD.sc字段时,代码会导致NullPointerException

    此错误消息的措辞是这样的,因为用户在尝试执行rdd1.map { _ => rdd2.count() }之类的操作时经常遇到令人困惑的NullPointerException,这导致在executor机器上的反序列化RDD上调用操作。我没料到会有人试图在驱动程序上手动序列化/反序列化他们的RDD,所以我可以看出这个错误消息可能有点误导