
Sparkit-learn

Build Status | PyPI | Join the chat at https://gitter.im/lensacom/sparkit-learn

PySpark + Scikit-learn = Sparkit-learn

GitHub: https://github.com/lensacom/sparkit-learn

About

Sparkit-learn aims to provide scikit-learn functionality and API on PySpark. The main goal of the library is to create an API close to sklearn's.

The driving principle is "think locally, execute distributively." To accommodate this concept, the basic block of data is always an array or a (sparse) matrix, and operations are executed at the block level.
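To make the block-level idea concrete, here is a minimal sketch using only PySpark and NumPy, not the splearn API: each partition is grouped into small NumPy blocks, an ordinary local NumPy operation runs on every block, and only the small per-block results are combined. The helper to_blocks and the block size are illustrative choices of this sketch, not part of sparkit-learn; sc is an existing SparkContext, as elsewhere in this document.

import numpy as np

def to_blocks(iterator, bsize=5):
    """Group one partition's elements into NumPy blocks of at most bsize."""
    block = []
    for item in iterator:
        block.append(item)
        if len(block) == bsize:
            yield np.array(block)
            block = []
    if block:
        yield np.array(block)

rdd = sc.parallelize(range(20), 2)       # 2 partitions, 10 elements each
blocks = rdd.mapPartitions(to_blocks)    # 4 NumPy blocks of 5 elements each

# "think locally": an ordinary NumPy call runs on every block ...
block_sums = blocks.map(lambda block: block.sum())

# ... "execute distributively": only the small per-block results are combined
total = block_sums.sum()                 # 190 == sum(range(20))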

Requirements

  • Python 2.7.x or 3.4.x
  • Spark[>=1.3.0]
  • NumPy[>=1.9.0]
  • SciPy[>=0.14.0]
  • Scikit-learn[>=0.16]

Run IPython from the notebooks directory

PYTHONPATH=${PYTHONPATH}:.. IPYTHON_OPTS="notebook" ${SPARK_HOME}/bin/pyspark --master local\[4\] --driver-memory 2G

Usage

Run tests
./runtests.sh

Quick start

Sparkit-learn introduces three important distributed data formats:

  • ArrayRDD:

    A numpy.array-like distributed array.

    from splearn.rdd import ArrayRDD

    data = range(20)
    # PySpark RDD with 2 partitions
    rdd = sc.parallelize(data, 2)  # each partition with 10 elements

    # ArrayRDD
    # each partition will contain blocks with 5 elements
    X = ArrayRDD(rdd, bsize=5)  # 4 blocks, 2 in each partition

    Basic operations:

    len(X)  # 20 - number of elements in the whole dataset
    X.blocks  # 4 - number of blocks
    X.shape  # (20,) - the shape of the whole dataset

    X  # returns an ArrayRDD
    # <class 'splearn.rdd.ArrayRDD'> from PythonRDD...

    X.dtype  # returns the type of the blocks
    # numpy.ndarray

    X.collect()  # get the dataset
    # [array([0, 1, 2, 3, 4]),
    #  array([5, 6, 7, 8, 9]),
    #  array([10, 11, 12, 13, 14]),
    #  array([15, 16, 17, 18, 19])]

    X[1].collect()  # indexing
    # [array([5, 6, 7, 8, 9])]
    X[1]  # also returns an ArrayRDD!

    X[1::2].collect()  # slicing
    # [array([5, 6, 7, 8, 9]),
    #  array([15, 16, 17, 18, 19])]
    X[1::2]  # returns an ArrayRDD as well

    X.tolist()  # returns the dataset as a list
    # [0, 1, 2, ... 17, 18, 19]

    X.toarray()  # returns the dataset as a numpy.array
    # array([ 0,  1,  2, ... 17, 18, 19])

    # pyspark.rdd operations will still work
    X.getNumPartitions()  # 2 - number of partitions
  • SparseRDD:

    The sparse counterpart of the ArrayRDD; the main difference is that its blocks are sparse matrices. The reason behind this split is the distinction between numpy.ndarray and scipy.sparse matrices. Usually a SparseRDD is created by splearn's transformers, but it can also be instantiated manually.

    # generate a SparseRDD from text using SparkCountVectorizer
    from splearn.rdd import SparseRDD
    from sklearn.feature_extraction.tests.test_text import ALL_FOOD_DOCS
    ALL_FOOD_DOCS
    # (u'the pizza pizza beer copyright',
    #  u'the pizza burger beer copyright',
    #  u'the the pizza beer beer copyright',
    #  u'the burger beer beer copyright',
    #  u'the coke burger coke copyright',
    #  u'the coke burger burger',
    #  u'the salad celeri copyright',
    #  u'the salad salad sparkling water copyright',
    #  u'the the celeri celeri copyright',
    #  u'the tomato tomato salad water',
    #  u'the tomato salad water copyright')

    # ArrayRDD created from the raw data
    X = ArrayRDD(sc.parallelize(ALL_FOOD_DOCS, 4), 2)
    X.collect()
    # [array([u'the pizza pizza beer copyright',
    #         u'the pizza burger beer copyright'], dtype='<U31'),
    #  array([u'the the pizza beer beer copyright',
    #         u'the burger beer beer copyright'], dtype='<U33'),
    #  array([u'the coke burger coke copyright',
    #         u'the coke burger burger'], dtype='<U30'),
    #  array([u'the salad celeri copyright',
    #         u'the salad salad sparkling water copyright'], dtype='<U41'),
    #  array([u'the the celeri celeri copyright',
    #         u'the tomato tomato salad water'], dtype='<U31'),
    #  array([u'the tomato salad water copyright'], dtype='<U32')]

    # Feature extraction executed
    from splearn.feature_extraction.text import SparkCountVectorizer
    vect = SparkCountVectorizer()
    X = vect.fit_transform(X)

    # and we have a SparseRDD
    X
    # <class 'splearn.rdd.SparseRDD'> from PythonRDD...

    # its type is scipy.sparse's general parent
    X.dtype
    # scipy.sparse.base.spmatrix

    # slicing works just like in ArrayRDDs
    X[2:4].collect()
    # [<2x11 sparse matrix of type '<type 'numpy.int64'>'
    #   with 7 stored elements in Compressed Sparse Row format>,
    #  <2x11 sparse matrix of type '<type 'numpy.int64'>'
    #   with 9 stored elements in Compressed Sparse Row format>]

    # general mathematical operations are available
    X.sum(), X.mean(), X.max(), X.min()
    # (55, 0.45454545454545453, 2, 0)

    # even with axis parameters provided
    X.sum(axis=1)
    # matrix([[5],
    #         [5],
    #         [6],
    #         [5],
    #         [5],
    #         [4],
    #         [4],
    #         [6],
    #         [5],
    #         [5],
    #         [5]])

    # It can be transformed to a dense ArrayRDD
    X.todense()
    # <class 'splearn.rdd.ArrayRDD'> from PythonRDD...
    X.todense().collect()
    # [array([[1, 0, 0, 0, 1, 2, 0, 0, 1, 0, 0],
    #         [1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0]]),
    #  array([[2, 0, 0, 0, 1, 1, 0, 0, 2, 0, 0],
    #         [2, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0]]),
    #  array([[0, 1, 0, 2, 1, 0, 0, 0, 1, 0, 0],
    #         [0, 2, 0, 1, 0, 0, 0, 0, 1, 0, 0]]),
    #  array([[0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0],
    #         [0, 0, 0, 0, 1, 0, 2, 1, 1, 0, 1]]),
    #  array([[0, 0, 2, 0, 1, 0, 0, 0, 2, 0, 0],
    #         [0, 0, 0, 0, 0, 0, 1, 0, 1, 2, 1]]),
    #  array([[0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 1]])]

    # One can instantiate a SparseRDD manually too:
    import numpy as np
    import scipy.sparse as sp

    sparse = sc.parallelize(np.array([sp.eye(2).tocsr()] * 20), 2)
    sparse = SparseRDD(sparse, bsize=5)
    sparse
    # <class 'splearn.rdd.SparseRDD'> from PythonRDD...

    sparse.collect()
    # [<10x2 sparse matrix of type '<type 'numpy.float64'>'
    #   with 10 stored elements in Compressed Sparse Row format>,
    #  <10x2 sparse matrix of type '<type 'numpy.float64'>'
    #   with 10 stored elements in Compressed Sparse Row format>,
    #  <10x2 sparse matrix of type '<type 'numpy.float64'>'
    #   with 10 stored elements in Compressed Sparse Row format>,
    #  <10x2 sparse matrix of type '<type 'numpy.float64'>'
    #   with 10 stored elements in Compressed Sparse Row format>]
  • DictRDD:

    A column-based data format, where each column has its own type.

    import numpy as np
    from splearn.rdd import DictRDD

    X = range(20)
    y = list(range(2)) * 10

    # PySpark RDD with 2 partitions
    X_rdd = sc.parallelize(X, 2)  # each partition with 10 elements
    y_rdd = sc.parallelize(y, 2)  # each partition with 10 elements

    # DictRDD
    # each partition will contain blocks with 5 elements
    Z = DictRDD((X_rdd, y_rdd),
                columns=('X', 'y'),
                bsize=5,
                dtype=[np.ndarray, np.ndarray])  # 4 blocks, 2/partition
    # if no dtype is provided, the type of the blocks will be determined
    # automatically

    # or:
    data = np.array([range(20), list(range(2)) * 10]).T
    rdd = sc.parallelize(data, 2)
    Z = DictRDD(rdd,
                columns=('X', 'y'),
                bsize=5,
                dtype=[np.ndarray, np.ndarray])

    Basic operations:

    len(Z)  # 8 - number of blocks
    Z.columns  # returns ('X', 'y')
    Z.dtype  # returns the types in correct order
    # [numpy.ndarray, numpy.ndarray]

    Z  # returns a DictRDD
    # <class 'splearn.rdd.DictRDD'> from PythonRDD...

    Z.collect()
    # [(array([0, 1, 2, 3, 4]), array([0, 1, 0, 1, 0])),
    #  (array([5, 6, 7, 8, 9]), array([1, 0, 1, 0, 1])),
    #  (array([10, 11, 12, 13, 14]), array([0, 1, 0, 1, 0])),
    #  (array([15, 16, 17, 18, 19]), array([1, 0, 1, 0, 1]))]

    Z[:, 'y']  # column select - returns an ArrayRDD
    Z[:, 'y'].collect()
    # [array([0, 1, 0, 1, 0]),
    #  array([1, 0, 1, 0, 1]),
    #  array([0, 1, 0, 1, 0]),
    #  array([1, 0, 1, 0, 1])]

    Z[:-1, ['X', 'y']]  # slicing - DictRDD
    Z[:-1, ['X', 'y']].collect()
    # [(array([0, 1, 2, 3, 4]), array([0, 1, 0, 1, 0])),
    #  (array([5, 6, 7, 8, 9]), array([1, 0, 1, 0, 1])),
    #  (array([10, 11, 12, 13, 14]), array([0, 1, 0, 1, 0]))]

Basic workflow

By using the described data structures, the basic workflow is almost identical to sklearn's.

Distributed vectorizing of texts
SparkCountVectorizer
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkCountVectorizer
from sklearn.feature_extraction.text import CountVectorizer

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local = CountVectorizer()
dist = SparkCountVectorizer()

result_local = local.fit_transform(X)
result_dist = dist.fit_transform(X_rdd)  # SparseRDD
SparkHashingVectorizer
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from sklearn.feature_extraction.text import HashingVectorizer

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local = HashingVectorizer()
dist = SparkHashingVectorizer()

result_local = local.fit_transform(X)
result_dist = dist.fit_transform(X_rdd)  # SparseRDD
SparkTfidfTransformer
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.pipeline import SparkPipeline

from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.pipeline import Pipeline

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local_pipeline = Pipeline((
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer())
))
dist_pipeline = SparkPipeline((
    ('vect', SparkHashingVectorizer()),
    ('tfidf', SparkTfidfTransformer())
))

result_local = local_pipeline.fit_transform(X)
result_dist = dist_pipeline.fit_transform(X_rdd)  # SparseRDD

Distributed classifiers
import numpy as np

from splearn.rdd import DictRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.svm import SparkLinearSVC
from splearn.pipeline import SparkPipeline

from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline

X = [...]  # list of texts
y = [...]  # list of labels
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            dtype=[np.ndarray, np.ndarray])

local_pipeline = Pipeline((
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', LinearSVC())
))
dist_pipeline = SparkPipeline((
    ('vect', SparkHashingVectorizer()),
    ('tfidf', SparkTfidfTransformer()),
    ('clf', SparkLinearSVC())
))

local_pipeline.fit(X, y)
dist_pipeline.fit(Z, clf__classes=np.unique(y))

y_pred_local = local_pipeline.predict(X)
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])

Distributed model selection
import numpy as np

from splearn.rdd import DictRDD
from splearn.grid_search import SparkGridSearchCV
from splearn.naive_bayes import SparkMultinomialNB

from sklearn.grid_search import GridSearchCV
from sklearn.naive_bayes import MultinomialNB

X = [...]
y = [...]
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            dtype=[np.ndarray, np.ndarray])

parameters = {'alpha': [0.1, 1, 10]}
fit_params = {'classes': np.unique(y)}

local_estimator = MultinomialNB()
local_grid = GridSearchCV(estimator=local_estimator,
                          param_grid=parameters)

estimator = SparkMultinomialNB()
grid = SparkGridSearchCV(estimator=estimator,
                         param_grid=parameters,
                         fit_params=fit_params)

local_grid.fit(X, y)
grid.fit(Z)
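After fitting, the distributed grid search can be inspected much like the local one. The snippet below is a hypothetical follow-up to the example above: it assumes SparkGridSearchCV exposes the same result attributes as sklearn's GridSearchCV (best_params_, best_score_), which matches the library's goal of an sklearn-like API but is not shown in the original example.

# Hypothetical follow-up, assuming SparkGridSearchCV mirrors GridSearchCV's
# result attributes (best_params_, best_score_) after fit.
print(local_grid.best_params_, local_grid.best_score_)
print(grid.best_params_, grid.best_score_)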

Special thanks

  • scikit-learn community
  • spylearn community
  • pyspark community
