class SomeDoFn(beam.DoFn):
def process(self, pair): # pair is a key/value tuple
df = pd.DataFrame(pair[1]) # just the array of values
## do something with the dataframe
...
records = df.to_dict('records')
# return a tuple with the same shape as the one we received
return [(rec["key"], rec) for rec in records]
当我使用“本机”数据帧而不是由Beam中的
to_dataframe
创建的数据帧时,我遇到了这个问题。我怀疑Beam创建的数据帧使用本机Pandas数据帧没有的新属性(如_expr
)来包装或子类化Pandas数据帧真正的答案是知道如何使用
apache_beam.dataframe.convert.to_dataframe
,但我不知道如何正确设置代理对象(当我稍后尝试使用to_pcollection
时,会出现单例错误)。因此,由于我无法在2.25.0中找到“正确”的工作方式(我对Beam和Pandas不太熟悉,也不知道代理对象如何工作,所以请恕我直言),我使用以下解决方法:我用这样的方式调用它:
我希望其他人会给你一个比这个解决方案更好的答案
to_pcollection
只打算应用于Beam的延迟数据帧,但从这个角度看,它应该可以工作,并且不清楚如何手动执行https://github.com/apache/beam/pull/14170应该解决这个问题相关问题 更多 >
编程相关推荐