使用Sp读取CSV

2024-09-28 17:20:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我读csv文件通过火花使用以下。在

rdd=sc.textFile("emails.csv").map(lambda line: line.split(","))

我需要创建一个Spark数据帧。在

我使用以下方法将此rdd转换为spark df:

^{pr2}$

但是我需要在将rdd转换为df时指定df的模式。我试着这么做:(我只有两列文件和消息)

from pyspark import Row

email_schema=Row('file','message')

email_rdd=rdd.map(lambda r: email_schema(*r))

dataframe=sqlContext.createDataFrame(email_rdd)

但是,我得到了一个错误: java.lang.IllegalStateException:输入行没有架构所需的预期值数。需要2个字段,但提供了1个值。在

我还试着用这个来读取我的csv文件:

rdd=sc.textFile("emails.csv").map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1]))

我得到错误:TypeError:'list'对象不可调用

我尝试使用pandas将csv文件读入pandas数据帧,然后将其转换为spark数据帧,但我的文件太大了。在

我还补充道:

bin/pyspark --packages com.databricks:spark-csv_2.10:1.0.3

并使用以下命令读取我的文件:

df=sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('emails.csv')

我得到了一个错误: java.io.IOException异常:(startine 1)在封装令牌完成之前达到EOF

我已经通过了其他几个相关的线程,并尝试如上所述。有人能解释一下我哪里出错了吗?在

[在MacOSX上使用Python2.7、Spark 1.6.2]

编辑:

前3排如下。我只需要提取邮件的内容。我该怎么做?在

1allen-p/_已发送电子邮件/1。”邮件ID:18782981.1075855378110。JavaMail.埃文斯@百里香>; 日期:2001年5月14日星期一16:39:00-0700(太平洋时间) 发件人:菲利普·艾伦@安然网 收件人:蒂姆·贝尔登@安然网 主题: Mime版本:1.0 内容类型:text/plain;字符集=us ascii 内容传输编码:7位 X-发件人:Phillip K Allen 收件人:蒂姆·贝尔登 X-cc: 密件抄送: X文件夹:\Phillip_Allen_Jan2002_1\Allen,Phillip K.\'已发送邮件 X-原点:艾伦-P X文件名:pallen(非特权).pst

这是我们的预测”

2allen-p/\u已发送电子邮件/10。”邮件ID:15464986.1075855378456。JavaMail.埃文斯@百里香>; 日期:2001年5月4日星期五13:51:00-0700(太平洋时间) 发件人:菲利普·艾伦@安然网 收件人:约翰·拉沃拉托@安然网 主题:回复: Mime版本:1.0 内容类型:text/plain;字符集=us ascii 内容传输编码:7位 X-发件人:Phillip K Allen 收件人:John J Lavorato X-cc: 密件抄送: X文件夹:\Phillip_Allen_Jan2002_1\Allen,Phillip K.\'已发送邮件 X-原点:艾伦-P X文件名:pallen(非特权).pst

出差去开商务会议使旅行失去乐趣。尤其是如果你要准备一个演讲。我建议在这里召开商业计划会议,然后在没有任何正式商务会议的情况下旅行。我甚至会尝试得到一些诚实的意见,关于旅行是否是必要的。在

就商务会议而言,我认为尝试并鼓励不同群体讨论什么是有效的,什么是无效的会更有成效。很多时候,演讲者在讲话,而其他人则安静地等待轮到他们。如果以圆桌讨论的形式举行,会议可能会更好。在

我的建议是奥斯汀。打高尔夫球,租一艘滑雪船和摩托艇。去什么地方坐飞机要花很多时间。”

3allen-p/\u已发送电子邮件/100。”邮件ID:24216240.1075855687451。JavaMail.埃文斯@百里香>; 日期:2000年10月18日星期三03:00:00-0700(PDT) 发件人:菲利普·艾伦@安然网 收件人:莉亚·阿斯达尔@安然网 主题:回复:测试 Mime版本:1.0 内容类型:text/plain;字符集=us ascii 内容传输编码:7位 X-发件人:Phillip K Allen 收件人:Leah Van Arsdall X-cc: 密件抄送: X文件夹:\Phillip_Allen_Dec2000\Notes Folders'已发送邮件 X-原点:艾伦-P X文件名:帕伦nsf在

测试成功。干得好!!!”在


Tags: 文件csvlambdamap内容dfemailline
2条回答

如果你有一个巨大的文件,为什么不分块使用pandas数据帧而不是一次加载所有文件,比如:

import pandas as pd
df_pd = pd.read_csv('myfilename.csv',chunksize = 10000)

for i,chunk in enumerate(df1):
    if i==0:
        df_spark = sqlContext.createDataFrame(chunk)
    else:
        df_spark = df_spark.unionAll(sqlContext.createDataFrame(chunk))

df\u spark将是您所需的spark数据帧。这是低效的,但它会起作用。对于实现相同功能的其他方法,您可以参考此question

另一种可能的方法是使用rdd的inferSchema方法,但您需要在csv文件中有列名才能使用,请参阅this。 所以你可以做一些类似的事情:

^{pr2}$

如果RDD可以放入内存,则:

rdd.toPandas().to_csv('emails.csv')

如果不是,请使用spark-csv作为您的spark版本:

^{pr2}$

在上面的例子中:

rdd=....map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1]))

你不想:

rdd=....map(lambda line: line.split(",")).map(lambda line: (line[0], line[1]))

相关问题 更多 >