我试图用pandas从apachebeam读取存储在google存储中的文件,但是得到了错误
def Panda_a(self):
import pandas as pd
data = 'gs://tegclorox/Input/merge1.csv'
df1 = pd.read_csv(data, names = ['first_name', 'last_name', 'age',
'preTestScore', 'postTestScore'])
return df1
ip2 = p |'Split WeeklyDueto' >> beam.Map(Panda_a)
ip7 = ip2 | 'print' >> beam.io.WriteToText('gs://tegclorox/Output/merge1234')
当我执行上面的代码时,错误提示路径不存在。知道为什么吗?在
这个代码有很多问题。在
http
,ftp
,s3
,file
)。但是,您可以使用BeamFileSystems.open()
API来获取一个file对象,并将该对象提供给pandaps而不是文件路径。在p | ... >> beam.Map(...)
-beam.Map(f)
使用给定函数f
转换输入的每个元素{beam.Create(['ignored'])
PCollection
,其中这个元素就是整个数据帧——更有可能的是,您希望数据帧的每一行都有一个元素。为此,您需要使用beam.FlatMap
,并且需要df.iterrows()
或类似的东西。在一般来说,我不知道为什么要用Pandas来读取CSV文件。您可以使用Beam的
ReadFromText
和skip_header_lines=1
来读取它,然后自己解析每一行——如果您有大量的数据,这将是非常高效的(如果您只有少量的数据,并且不期望它变得足够大而超过一台机器的能力—比如,如果它永远不会超过几GB—那么梁是错误的工具)。在相关问题 更多 >
编程相关推荐