Build and run queries against data
DataFusion in Python
This is a Python library that binds to DataFusion, the in-memory query engine of Apache Arrow.
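Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV files, run it in a multi-threaded environment, and obtain the result back in Python.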
It also allows you to use UDFs and UDAFs for complex operations.
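The major advantage of this library over other execution engines is that it achieves zero-copy between Python and its execution engine: apart from having to lock the GIL when running those operations, there is no cost in using UDFs and UDAFs or in collecting the results into Python.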
Its query engine, DataFusion, is written in Rust, which makes strong assumptions about thread safety and lack of memory leaks.
Technically, zero-copy is achieved via the Arrow C Data Interface.
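As a rough analogy only, the snippet below uses Python's built-in buffer protocol (`array` and `memoryview`), not the Arrow C Data Interface, to show what zero-copy means: two objects reading the same memory, so creating the second one copies nothing. The variable names are illustrative.

```python
import array

# A contiguous buffer of int64 values, loosely comparable to the data buffer of an Arrow int64 column.
column = array.array("q", [1, 2, 3])

# memoryview exposes that same memory through the buffer protocol: no bytes are copied.
view = memoryview(column)

# A write through the original object is visible through the view, so the memory is shared.
column[0] = 42
print(view[0])  # 42

# bytes(view) does the opposite: it copies the current contents into a new, independent object.
snapshot = bytes(view)
column[1] = 99
print(view[1])  # 99 (the view tracks the original)
print(array.array("q", snapshot).tolist())  # [42, 2, 3] (the copy does not)
```

The Arrow C Data Interface applies the same idea across languages: Rust and Python exchange pointers to the same Arrow buffers instead of serializing them.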
How to use it
Simple usage:
import datafusion
import pyarrow

# an alias
f = datafusion.functions

# create a context
ctx = datafusion.ExecutionContext()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
    names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

# create a new statement
df = df.select(
    f.col("a") + f.col("b"),
    f.col("a") - f.col("b"),
)

# execute and collect the first (and only) batch
result = df.collect()[0]

assert result.column(0) == pyarrow.array([5, 7, 9])
assert result.column(1) == pyarrow.array([-3, -3, -3])
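The argument `[[batch]]` is a list of partitions, each of which is a list of record batches, and `collect()` returns the resulting batches, which is why the example takes element `[0]`. The pure-Python model below is a hypothetical sketch of that shape, not DataFusion's implementation: dicts of lists stand in for record batches, and `project`, `run_partition` and `collect` are made-up names. Because partitions are independent, they can be processed on separate threads.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

# Hypothetical stand-ins: a "batch" is a dict of equal-length columns,
# and a "partition" is a list of batches (mirroring the [[batch]] argument).
Batch = Dict[str, List[int]]
Partition = List[Batch]


def project(batch: Batch) -> Batch:
    """Evaluate the two expressions a + b and a - b on one batch."""
    a, b = batch["a"], batch["b"]
    return {
        "a + b": [x + y for x, y in zip(a, b)],
        "a - b": [x - y for x, y in zip(a, b)],
    }


def run_partition(partition: Partition) -> List[Batch]:
    # Batches inside one partition are processed in order.
    return [project(batch) for batch in partition]


def collect(partitions: List[Partition]) -> List[Batch]:
    # Each partition is handed to a worker thread; pool.map keeps partition order,
    # and the per-partition outputs are flattened into one list of batches.
    with ThreadPoolExecutor() as pool:
        per_partition = list(pool.map(run_partition, partitions))
    return [batch for batches in per_partition for batch in batches]


partitions = [[{"a": [1, 2, 3], "b": [4, 5, 6]}]]
result = collect(partitions)[0]
print(result)  # {'a + b': [5, 7, 9], 'a - b': [-3, -3, -3]}
```

In this model the GIL prevents real CPU parallelism between the Python threads; DataFusion itself executes the plan in Rust.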
UDF
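A scalar UDF is expected to work on whole columns: it is called once per batch with an array per argument and returns an array with one value per input row. The sketch below states that contract in plain Python as an assumption; lists stand in for pyarrow arrays, and `times_two` and `apply_udf` are hypothetical names, not part of datafusion.

```python
from typing import Callable, List, Optional

# Plain lists stand in for pyarrow arrays; None plays the role of a null.
Column = List[Optional[int]]


def times_two(values: Column) -> Column:
    # Vectorized: receives the whole column of one batch, not a single value,
    # and propagates nulls.
    return [None if v is None else 2 * v for v in values]


def apply_udf(udf: Callable[[Column], Column], batches: List[Column]) -> List[Column]:
    """Hypothetical driver: call the UDF once per batch and enforce the length contract."""
    results = []
    for batch in batches:
        out = udf(batch)
        if len(out) != len(batch):
            raise ValueError("a scalar UDF must return exactly one value per input row")
        results.append(out)
    return results


print(apply_udf(times_two, [[1, None, 3], [4]]))  # [[2, None, 6], [8]]
```

Working on a whole batch per call keeps the number of Python calls, and therefore GIL acquisitions, proportional to the number of batches rather than the number of rows.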
UDAF
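import datafusion
import pyarrow
import pyarrow.compute
from typing import List

# an alias
f = datafusion.functions


class Accumulator:
    """
    Interface of a user-defined accumulation.
    """
    def __init__(self):
        self._sum = pyarrow.scalar(0.0)

    def to_scalars(self) -> List[pyarrow.Scalar]:
        return [self._sum]

    def update(self, values: pyarrow.Array) -> None:
        # pyarrow scalars can't be summed directly, so go through Python floats.
        # pyarrow.compute.sum returns a null scalar (as_py() is None) for empty
        # or all-null input; skip it instead of failing on None.
        partial = pyarrow.compute.sum(values).as_py()
        if partial is not None:
            self._sum = pyarrow.scalar(self._sum.as_py() + partial)

    def merge(self, states: pyarrow.Array) -> None:
        # same workaround as in update(), applied to the states of other accumulators
        partial = pyarrow.compute.sum(states).as_py()
        if partial is not None:
            self._sum = pyarrow.scalar(self._sum.as_py() + partial)

    def evaluate(self) -> pyarrow.Scalar:
        return self._sum


df = ...  # an existing DataFrame with a float64 column "a"

udaf = f.udaf(Accumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()])

df = df.aggregate(
    [],
    [udaf(f.col("a"))],
)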
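The accumulator interface above fits partitioned execution: each partition gets its own accumulator, `update` folds in batches of input values, `to_scalars` exports the intermediate state, and a final accumulator `merge`s those states before `evaluate` produces the result. The sketch below models that two-phase flow in plain Python; `SumAccumulator` and `aggregate` are hypothetical, and the exact order in which DataFusion calls these methods is an assumption.

```python
from typing import List, Optional


class SumAccumulator:
    """Pure-Python stand-in with the same method names as the Accumulator above."""

    def __init__(self) -> None:
        self._sum = 0.0

    def to_scalars(self) -> List[float]:
        # The intermediate state shipped from a partial to the final aggregation.
        return [self._sum]

    def update(self, values: List[Optional[float]]) -> None:
        # Nulls (None) are skipped, as SQL SUM does.
        self._sum += sum(v for v in values if v is not None)

    def merge(self, states: List[float]) -> None:
        # Each entry is the state of one partial accumulator.
        self._sum += sum(states)

    def evaluate(self) -> float:
        return self._sum


def aggregate(partitions: List[List[List[Optional[float]]]]) -> float:
    """Hypothetical two-phase driver: partial sums per partition, then a final merge."""
    partial_states = []
    for partition in partitions:
        acc = SumAccumulator()
        for batch in partition:
            acc.update(batch)
        partial_states.append(acc.to_scalars()[0])

    final = SumAccumulator()
    final.merge(partial_states)
    return final.evaluate()


print(aggregate([[[1.0, 2.0], [None]], [[3.5]]]))  # 6.5
```

Splitting the work this way means only the small per-partition states have to be combined, not the raw input rows.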
How to install
pip install datafusion
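Because the API used in these examples (for instance `ExecutionContext`) may differ between datafusion releases, it can help to check which version pip actually installed. A minimal sketch using only the standard library (`importlib.metadata`, Python 3.8+); `installed_version` is a hypothetical helper name.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str = "datafusion") -> Optional[str]:
    """Return the installed version of a distribution, or None if it is not installed."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


# Prints the installed version string, or None if the package is missing.
print(installed_version())
```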
How to develop
This assumes that you have Rust and Cargo installed. We use the workflow recommended by pyo3 and maturin.
Bootstrap:
# fetch this repo
git clone git@github.com:jorgecarleitao/datafusion-python.git
cd datafusion-python

# prepare development environment (used to build wheel / install in development)
python -m venv venv
venv/bin/pip install maturin==0.8.2 toml==0.10.1
Whenever the Rust code changes (your changes or via git pull):
venv/bin/maturin develop
venv/bin/python -m unittest discover tests