压缩Hadoop序列文件Python

2024-06-28 20:13:27 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个客户发送给我快速压缩的hadoop序列文件进行分析。我最终想做的是把这些数据放入熊猫数据仓库。格式如下所示

>>> body_read

b'SEQ\x06!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable\x01\x01)org.apache.hadoop.io.compress.SnappyCodec\x00\x00\x00\x00\x0b\xabZ\x92f\xceuAf\xa1\x9a\xf0-\x1d2D\xff\xff\xff\xff\x0b\xabZ\x92f\xceuAf\xa1\x9a\xf0-\x1d2D\x8e\x05^N\x00\x00\x05^\x00\x00\x00F\xde\n\x00\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00r\x01\x00\x04\x00\x00\x00\x00\x8e\x08\x92\x00\x00\x10\x1a\x00\x00\x08\x8a\x9a h\x8e\x02\xd6\x8e\x02\xc6\x8e\x02T\x8e\x02\xd4\x8e\x02\xdb\x8e\x02\xd8\x8e\x02\xdf\x8e\x02\xd9\x8e\x02\xd3\x05\x0c0\xd9\x8e\x02\xcc\x8e\x02\xfc\x8e\x02\xe8\x8e\x02\xd0\x05!\x00\xdb\x05\x06\x0c\xd1\x8e\x02\xd7\x05\'\x04\xde\x8e\x01\x03\x18\xce\x8e\x02\xe7\x8e\x02\xd2\x05<\x00\xd4\x05\x1b\x04\xdc\x8e

我想我需要做的是首先使用pythonsnappy解压文件,然后读取序列文件。我不确定在python中读取hadoop序列文件的最佳方法是什么。我也得到和错误时,试图解压缩这个文件

^{pr2}$

要阅读这些文件,我需要做些什么?在


Tags: 文件ioorghadoopapache序列x00x04
1条回答
网友
1楼 · 发布于 2024-06-28 20:13:27

多亏了@cricket_007的有益评论和更多的挖掘,我才得以解决这个问题。PySpark将完成我需要的任务,并且可以直接从S3位置读取Hadoop序列文件,这很好。棘手的部分是设置PySpark,在下载apachespark-https://markobigdata.com/2017/04/23/manipulating-files-from-s3-with-apache-spark/之后,我发现这本指南非常有用。在

我有一个奇怪的差异,那就是我的spark shell会自动解压缩文件:

scala> val fRDD = sc.textFile("s3a://bucket/file_path")
fRDD: org.apache.spark.rdd.RDD[String] = s3a://bucket/file_path MapPartitionsRDD[5] at textFile at <console>:24

scala> fRDD.first()
res4: String = SEQ?!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable??)org.apache.hadoop.io.compress.SnappyCodec???? �Z�f�uAf���- 2D���� �Z�f�uAf���- 2D�?^N???^???F�

但是PySpark没有:

^{pr2}$

你知道我怎么让Pypark这么做吗?在

编辑:再次感谢cricket_007,我开始改用.sequenceFile()。这是我最初的错误

    >>> textFile = sc.sequenceFile("s3a://bucket/file_path")
18/02/07 18:13:12 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.

我可以通过以下指南来解决这个问题-https://community.hortonworks.com/questions/18903/this-version-of-libhadoop-was-built-without-snappy.html。我现在可以读取序列文件并反编译protobuf消息

>>> seqs = sc.sequenceFile("s3a://bucket/file_path").values()
>>> feed = protobuf_message_pb2.feed()
>>> row = bytes(seqs.first())
>>> feed.ParseFromString(row)
>>> feed.user_id_64
3909139888943208259

这正是我所需要的。我现在想做的是找到一种有效的方法来反编译整个sequenceFile并将其转换为一个数据帧,而不是像我上面所做的那样一次只做一个记录。在

相关问题 更多 >