将Spark RDD保存到Avrospark.api.python.Con公司

2024-10-02 00:39:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图遵循Spark 1.1: Bringing Hadoop Input/Output Formats to PySpark指南以Avro格式保存RDD。我的转换器工作,但看起来真的很难看。在

class JavaToAvroWrapperConverter()
  extends Converter[Any, AvroKey[GenericData.Record]] {
  override def convert(obj: Any): AvroKey[GenericData.Record] = {
    val args: Array[Any] = obj.asInstanceOf[Array[Any]]
    val schema = (new Parser).parse(args(1).asInstanceOf[String])
    val map = args(0).asInstanceOf[Array[Any]](0).asInstanceOf[util.HashMap[String, Any]]
    val record = new GenericData.Record(schema)
    map.keys.foreach(k => record.put(k, map(k)))

    new AvroKey[GenericData.Record](record)
  }
}

为了实现这一点,我需要将RDD构造为每行到模式的映射

^{pr2}$

所以,我的问题是,是否有一种更文明的方法来处理这个问题,而不是在每个RDD元组中重复模式,而不是在转换器中以庞大的参数形式获取模式。或者同样问题的简短版本是:我做错了什么?在

谢谢!在


Tags: objmapnewschema模式argsanyval

热门问题