PySpark + scikit-learn = Sparkit-learn
About
Sparkit-learn aims to provide scikit-learn functionality and API on PySpark. The main goal of the library is to create an API that stays close to sklearn's.
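The driving principle is "Think locally, execute distributively." To accommodate this concept, the basic data block is always an array or a (sparse) matrix, and operations are executed at the block level.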
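The principle can be illustrated without Spark. The sketch below is a plain-Python analogy, not splearn code: it assumes lists stand in for blocks and a list comprehension stands in for a Spark map over blocks. `to_blocks` and `block_sum` are hypothetical helpers. Each block is handled by ordinary local code, and only the small per-block results are combined.

```python
# Plain-Python analogy of "think locally, execute distributively".
# Lists stand in for blocks; `to_blocks` and `block_sum` are hypothetical
# helpers for illustration, not part of splearn.

def to_blocks(items, bsize):
    """Split a sequence into consecutive blocks of at most `bsize` items."""
    return [items[i:i + bsize] for i in range(0, len(items), bsize)]


def block_sum(block):
    """A purely local operation: it only ever sees one block."""
    return sum(block)


data = list(range(20))
blocks = to_blocks(data, bsize=5)          # 4 blocks of 5 items each
partials = [block_sum(b) for b in blocks]  # "map" step: one call per block
total = sum(partials)                      # "reduce" step: combine partials
```

On an actual RDD whose elements are blocks, the same shape would roughly be `rdd.map(block_sum).sum()`.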
Requirements
- Python 2.7.x or 3.4.x
- Spark[>=1.3.0]
- NumPy[>=1.9.0]
- SciPy[>=0.14.0]
- Scikit-learn[>=0.16]
Run IPython from the notebooks directory
PYTHONPATH=${PYTHONPATH}:.. IPYTHON_OPTS="notebook" ${SPARK_HOME}/bin/pyspark --master local\[4\] --driver-memory 2G
Run tests with
./runtests.sh
Quick start
Sparkit-learn introduces three important distributed data formats:
ArrayRDD:
A numpy.array-like distributed array
from splearn.rdd import ArrayRDD

data = range(20)
# PySpark RDD with 2 partitions
rdd = sc.parallelize(data, 2)  # each partition with 10 elements
# ArrayRDD
# each partition will contain blocks with 5 elements
X = ArrayRDD(rdd, bsize=5)  # 4 blocks, 2 in each partition
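To see where the "4 blocks, 2 in each partition" comes from, here is a plain-Python sketch. It assumes contiguous, evenly sized partitions (which is what `sc.parallelize(range(20), 2)` produces) and that blocks are formed inside each partition independently, so a block never spans two partitions. `split_partitions` and `block_partition` are hypothetical names, not splearn functions.

```python
# Sketch of how ArrayRDD(rdd, bsize=5) groups elements, assuming blocks are
# built per partition. Lists stand in for Spark partitions; the helper names
# are hypothetical.

def split_partitions(data, num_partitions):
    """Split data into contiguous, roughly equal slices, one per partition."""
    n = len(data)
    return [data[n * i // num_partitions: n * (i + 1) // num_partitions]
            for i in range(num_partitions)]


def block_partition(partition, bsize):
    """Group the elements of one partition into blocks of at most bsize."""
    return [partition[i:i + bsize] for i in range(0, len(partition), bsize)]


partitions = split_partitions(list(range(20)), 2)            # 10 + 10 elements
blocked = [block_partition(p, bsize=5) for p in partitions]  # 2 blocks each
blocks = [b for part in blocked for b in part]               # 4 blocks overall
```

When a partition's size is not a multiple of `bsize`, its last block is simply shorter, e.g. `block_partition(list(range(7)), 5)` yields one block of 5 and one of 2.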
Basic operations:
len(X)  # 20 - number of elements in the whole dataset
X.blocks  # 4 - number of blocks
X.shape  # (20,) - the shape of the whole dataset

X  # returns an ArrayRDD
# <class 'splearn.rdd.ArrayRDD'> from PythonRDD...

X.dtype  # returns the type of the blocks
# numpy.ndarray

X.collect()  # get the dataset
# [array([0, 1, 2, 3, 4]),
#  array([5, 6, 7, 8, 9]),
#  array([10, 11, 12, 13, 14]),
#  array([15, 16, 17, 18, 19])]

X[1].collect()  # indexing
# [array([5, 6, 7, 8, 9])]

X[1]  # also returns an ArrayRDD!

X[1::2].collect()  # slicing
# [array([5, 6, 7, 8, 9]),
#  array([15, 16, 17, 18, 19])]

X[1::2]  # returns an ArrayRDD as well

X.tolist()  # returns the dataset as a list
# [0, 1, 2, ... 17, 18, 19]
X.toarray()  # returns the dataset as a numpy.array
# array([ 0,  1,  2, ... 17, 18, 19])

# pyspark.rdd operations will still work
X.getNumPartitions()  # 2 - number of partitions
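Note from the outputs above that `len(X)` counts elements while indexing and slicing select whole blocks and return another ArrayRDD. The hypothetical `LocalBlocks` class below is an in-memory sketch of exactly those semantics (no Spark, no NumPy); it is an illustration, not splearn's implementation.

```python
# Hypothetical in-memory stand-in for the block semantics shown above.
# Not splearn code: plain lists replace NumPy arrays and the RDD.

class LocalBlocks:
    def __init__(self, blocks):
        self._blocks = list(blocks)

    def __len__(self):
        # number of elements across all blocks, like len(X)
        return sum(len(b) for b in self._blocks)

    @property
    def blocks(self):
        # number of blocks, like X.blocks
        return len(self._blocks)

    def __getitem__(self, index):
        # integers and slices select *blocks*; the result is wrapped again
        if isinstance(index, int):
            return LocalBlocks([self._blocks[index]])
        return LocalBlocks(self._blocks[index])

    def collect(self):
        return [list(b) for b in self._blocks]

    def tolist(self):
        # flatten the blocks back into one list of elements
        return [x for b in self._blocks for x in b]


X = LocalBlocks([list(range(i, i + 5)) for i in range(0, 20, 5)])
```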
SparseRDD:
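The sparse counterpart of ArrayRDD; the main difference is that its blocks are sparse matrices. The reason behind this split is the distinction between numpy.ndarrays and scipy.sparse matrices. A SparseRDD is usually created by splearn's transformers, but it can also be instantiated directly.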
# generate a SparseRDD from a text using SparkCountVectorizer
import numpy as np
import scipy.sparse as sp

from splearn.rdd import ArrayRDD, SparseRDD
from sklearn.feature_extraction.tests.test_text import ALL_FOOD_DOCS

ALL_FOOD_DOCS
# (u'the pizza pizza beer copyright',
#  u'the pizza burger beer copyright',
#  u'the the pizza beer beer copyright',
#  u'the burger beer beer copyright',
#  u'the coke burger coke copyright',
#  u'the coke burger burger',
#  u'the salad celeri copyright',
#  u'the salad salad sparkling water copyright',
#  u'the the celeri celeri copyright',
#  u'the tomato tomato salad water',
#  u'the tomato salad water copyright')

# ArrayRDD created from the raw data
X = ArrayRDD(sc.parallelize(ALL_FOOD_DOCS, 4), 2)
X.collect()
# [array([u'the pizza pizza beer copyright',
#         u'the pizza burger beer copyright'], dtype='<U31'),
#  array([u'the the pizza beer beer copyright',
#         u'the burger beer beer copyright'], dtype='<U33'),
#  array([u'the coke burger coke copyright',
#         u'the coke burger burger'], dtype='<U30'),
#  array([u'the salad celeri copyright',
#         u'the salad salad sparkling water copyright'], dtype='<U41'),
#  array([u'the the celeri celeri copyright',
#         u'the tomato tomato salad water'], dtype='<U31'),
#  array([u'the tomato salad water copyright'], dtype='<U32')]

# Feature extraction executed
from splearn.feature_extraction.text import SparkCountVectorizer
vect = SparkCountVectorizer()
X = vect.fit_transform(X)
# and we have a SparseRDD
X
# <class 'splearn.rdd.SparseRDD'> from PythonRDD...

# its type is the scipy.sparse's general parent
X.dtype
# scipy.sparse.base.spmatrix

# slicing works just like in ArrayRDDs
X[2:4].collect()
# [<2x11 sparse matrix of type '<type 'numpy.int64'>'
#   with 7 stored elements in Compressed Sparse Row format>,
#  <2x11 sparse matrix of type '<type 'numpy.int64'>'
#   with 9 stored elements in Compressed Sparse Row format>]

# general mathematical operations are available
X.sum(), X.mean(), X.max(), X.min()
# (55, 0.45454545454545453, 2, 0)

# even with axis parameters provided
X.sum(axis=1)
# matrix([[5],
#         [5],
#         [6],
#         [5],
#         [5],
#         [4],
#         [4],
#         [6],
#         [5],
#         [5],
#         [5]])

# It can be transformed to dense ArrayRDD
X.todense()
# <class 'splearn.rdd.ArrayRDD'> from PythonRDD...
X.todense().collect()
# [array([[1, 0, 0, 0, 1, 2, 0, 0, 1, 0, 0],
#         [1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0]]),
#  array([[2, 0, 0, 0, 1, 1, 0, 0, 2, 0, 0],
#         [2, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0]]),
#  array([[0, 1, 0, 2, 1, 0, 0, 0, 1, 0, 0],
#         [0, 2, 0, 1, 0, 0, 0, 0, 1, 0, 0]]),
#  array([[0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0],
#         [0, 0, 0, 0, 1, 0, 2, 1, 1, 0, 1]]),
#  array([[0, 0, 2, 0, 1, 0, 0, 0, 2, 0, 0],
#         [0, 0, 0, 0, 0, 0, 1, 0, 1, 2, 1]]),
#  array([[0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 1]])]

# One can instantiate SparseRDD manually too:
sparse = sc.parallelize(np.array([sp.eye(2).tocsr()] * 20), 2)
sparse = SparseRDD(sparse, bsize=5)
sparse
# <class 'splearn.rdd.SparseRDD'> from PythonRDD...

sparse.collect()
# [<10x2 sparse matrix of type '<type 'numpy.float64'>'
#   with 10 stored elements in Compressed Sparse Row format>,
#  <10x2 sparse matrix of type '<type 'numpy.float64'>'
#   with 10 stored elements in Compressed Sparse Row format>,
#  <10x2 sparse matrix of type '<type 'numpy.float64'>'
#   with 10 stored elements in Compressed Sparse Row format>,
#  <10x2 sparse matrix of type '<type 'numpy.float64'>'
#   with 10 stored elements in Compressed Sparse Row format>]
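Aggregates such as `X.sum()` and `X.mean()` can be computed from small per-block partial results. The sketch below shows that idea in plain Python, assuming a dictionary-of-keys block layout `(n_rows, n_cols, {(row, col): value})` chosen for brevity instead of CSR; the helper names are hypothetical, not splearn's. The mean divides by every cell, implicit zeros included, which is consistent with the README output above: 55 / (11 × 11) ≈ 0.4545.

```python
# Hypothetical sketch: per-block partial aggregates for sparse blocks.
# Each block is (n_rows, n_cols, {(row, col): value}); missing keys are zeros.

def block_stats(block):
    """Local step: (sum of stored values, number of cells in the block)."""
    n_rows, n_cols, entries = block
    return sum(entries.values()), n_rows * n_cols


def global_sum_and_mean(blocks):
    """Combine per-block partials; implicit zeros count toward the mean."""
    partials = [block_stats(b) for b in blocks]
    total = sum(s for s, _ in partials)
    cells = sum(c for _, c in partials)
    return total, total / cells


def row_sums(blocks):
    """Like sum(axis=1): each block sums its own rows, results are concatenated."""
    out = []
    for n_rows, _, entries in blocks:
        sums = [0] * n_rows
        for (r, _), v in entries.items():
            sums[r] += v
        out.extend(sums)
    return out


blocks = [
    (2, 3, {(0, 0): 1, (0, 2): 2, (1, 1): 1}),  # 2x3 block, 3 stored values
    (1, 3, {(0, 2): 4}),                        # 1x3 block, 1 stored value
]
```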
DictRDD:
A column-based data format in which each column has its own type.
import numpy as np

from splearn.rdd import DictRDD

X = range(20)
y = list(range(2)) * 10
# PySpark RDD with 2 partitions
X_rdd = sc.parallelize(X, 2)  # each partition with 10 elements
y_rdd = sc.parallelize(y, 2)  # each partition with 10 elements
# DictRDD
# each partition will contain blocks with 5 elements
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            bsize=5,
            dtype=[np.ndarray, np.ndarray])  # 4 blocks, 2/partition
# if no dtype is provided, the type of the blocks will be determined
# automatically

# or:
data = np.array([range(20), list(range(2)) * 10]).T
rdd = sc.parallelize(data, 2)
Z = DictRDD(rdd,
            columns=('X', 'y'),
            bsize=5,
            dtype=[np.ndarray, np.ndarray])
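Conceptually, each DictRDD block holds one aligned chunk per column. The plain-Python sketch below builds such blocks from equally long columns; `make_dict_blocks` is a hypothetical helper, and partitions are ignored for brevity (the real DictRDD works on RDDs).

```python
# Hypothetical sketch: zip equally long columns into blocks of tuples,
# one entry per column, mirroring the (X, y) blocks of a DictRDD.

def make_dict_blocks(columns, bsize):
    """Return blocks as tuples of column chunks of at most `bsize` rows."""
    n = len(columns[0])
    if any(len(c) != n for c in columns):
        raise ValueError("all columns must have the same length")
    return [tuple(c[i:i + bsize] for c in columns) for i in range(0, n, bsize)]


X = list(range(20))
y = list(range(2)) * 10
Z = make_dict_blocks((X, y), bsize=5)
```

Keeping the columns side by side inside each block means a row's features and its label always travel together, which is what a per-block `fit(X, y)` needs.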
Basic operations:
len(Z)  # 8 - number of blocks
Z.columns  # returns ('X', 'y')
Z.dtype  # returns the types in correct order
# [numpy.ndarray, numpy.ndarray]

Z  # returns a DictRDD
# <class 'splearn.rdd.DictRDD'> from PythonRDD...

Z.collect()
# [(array([0, 1, 2, 3, 4]), array([0, 1, 0, 1, 0])),
#  (array([5, 6, 7, 8, 9]), array([1, 0, 1, 0, 1])),
#  (array([10, 11, 12, 13, 14]), array([0, 1, 0, 1, 0])),
#  (array([15, 16, 17, 18, 19]), array([1, 0, 1, 0, 1]))]

Z[:, 'y']  # column select - returns an ArrayRDD
Z[:, 'y'].collect()
# [array([0, 1, 0, 1, 0]),
#  array([1, 0, 1, 0, 1]),
#  array([0, 1, 0, 1, 0]),
#  array([1, 0, 1, 0, 1])]

Z[:-1, ['X', 'y']]  # slicing - DictRDD
Z[:-1, ['X', 'y']].collect()
# [(array([0, 1, 2, 3, 4]), array([0, 1, 0, 1, 0])),
#  (array([5, 6, 7, 8, 9]), array([1, 0, 1, 0, 1])),
#  (array([10, 11, 12, 13, 14]), array([0, 1, 0, 1, 0]))]
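The two-part index `Z[rows, cols]` above slices blocks with the first part and picks columns by name with the second. A single name yields plain blocks (an ArrayRDD), and a list of names yields tuples (a DictRDD). The hypothetical `select` function below mimics that behaviour on in-memory blocks; it is a sketch, not splearn's implementation.

```python
# Hypothetical sketch of Z[rows, cols] on in-memory (X, y) blocks.

COLUMNS = ('X', 'y')


def select(blocks, rows, cols):
    """`rows` slices blocks; `cols` is one column name or a list of names."""
    chosen = blocks[rows]
    if isinstance(cols, str):
        j = COLUMNS.index(cols)
        return [b[j] for b in chosen]                  # one column: plain blocks
    idx = [COLUMNS.index(c) for c in cols]
    return [tuple(b[j] for j in idx) for b in chosen]  # several columns: tuples


# same data as above: X = 0..19 in blocks of 5, y alternates 0/1
blocks = [(list(range(i, i + 5)), [(i + k) % 2 for k in range(5)])
          for i in range(0, 20, 5)]

y_blocks = select(blocks, slice(None), 'y')            # like Z[:, 'y']
head = select(blocks, slice(None, -1), ['X', 'y'])     # like Z[:-1, ['X', 'y']]
```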
Basic workflow
With the data structures described above, the basic workflow is almost identical to sklearn's.
Distributed vectorizing of texts
SparkCountVectorizer
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkCountVectorizer
from sklearn.feature_extraction.text import CountVectorizer

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local = CountVectorizer()
dist = SparkCountVectorizer()

result_local = local.fit_transform(X)
result_dist = dist.fit_transform(X_rdd)  # SparseRDD
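Unlike a stateless transform, a count vectorizer needs a global vocabulary before any block can be turned into counts. A natural block-level structure is two passes: collect each block's terms, union them, then count per block. The plain-Python sketch below illustrates that structure under stated assumptions: it reuses scikit-learn's default token pattern and lowercasing, numbers terms alphabetically as CountVectorizer does, and emits dense rows for readability (SparseRDD blocks are sparse). It is not a description of how SparkCountVectorizer is implemented; all names are hypothetical.

```python
import re
from collections import Counter

# scikit-learn's default token_pattern (tokens of 2+ word characters)
TOKEN = re.compile(r"(?u)\b\w\w+\b")


def tokenize(doc):
    return TOKEN.findall(doc.lower())


def block_vocabulary(block):
    """First map step: the set of terms seen in one block of documents."""
    return {t for doc in block for t in tokenize(doc)}


def fit_vocabulary(blocks):
    """Reduce step: union the per-block sets, then number terms alphabetically."""
    terms = set().union(*(block_vocabulary(b) for b in blocks))
    return {t: i for i, t in enumerate(sorted(terms))}


def transform_block(block, vocab):
    """Second map step: turn one block of documents into dense count rows."""
    rows = []
    for doc in block:
        row = [0] * len(vocab)
        for term, n in Counter(tokenize(doc)).items():
            if term in vocab:
                row[vocab[term]] = n
        rows.append(row)
    return rows


blocks = [["the pizza pizza beer", "the burger beer"], ["the coke burger coke"]]
vocab = fit_vocabulary(blocks)
counts = [transform_block(b, vocab) for b in blocks]
```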
SparkHashingVectorizer
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from sklearn.feature_extraction.text import HashingVectorizer

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local = HashingVectorizer()
dist = SparkHashingVectorizer()

result_local = local.fit_transform(X)
result_dist = dist.fit_transform(X_rdd)  # SparseRDD
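The hashing trick needs no vocabulary: a term's column is derived from a hash of the term itself, so every block can be transformed independently with no fitting pass. The sketch below shows this in plain Python. scikit-learn's HashingVectorizer uses MurmurHash3 (and, by default, signed and normalised outputs); `zlib.crc32` and the small `n_features=16` are swapped in here only because they keep the example short and stdlib-only. `hash_block` is a hypothetical name.

```python
import re
import zlib

TOKEN = re.compile(r"(?u)\b\w\w+\b")  # scikit-learn's default token_pattern


def hash_block(block, n_features=16):
    """Map each document to a row of hashed term counts; no fit step needed."""
    rows = []
    for doc in block:
        row = [0] * n_features
        for term in TOKEN.findall(doc.lower()):
            # crc32 is a stand-in for MurmurHash3; the column depends only on the term
            row[zlib.crc32(term.encode("utf-8")) % n_features] += 1
        rows.append(row)
    return rows


blocks = [["the pizza pizza beer"], ["the burger beer", "the pizza"]]
hashed = [hash_block(b) for b in blocks]
```

Because the transform is stateless, hashing the whole corpus at once gives the same rows as hashing each block separately and concatenating.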
SparkTfidfTransformer
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.pipeline import SparkPipeline

from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.pipeline import Pipeline

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local_pipeline = Pipeline([
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer())
])
dist_pipeline = SparkPipeline([
    ('vect', SparkHashingVectorizer()),
    ('tfidf', SparkTfidfTransformer())
])

result_local = local_pipeline.fit_transform(X)
result_dist = dist_pipeline.fit_transform(X_rdd)  # SparseRDD
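The TF-IDF step needs document frequencies over the whole corpus, but these decompose cleanly: each block reports its document count and per-column document frequencies, and the partial counts are summed. The sketch below uses the formula of scikit-learn's TfidfTransformer defaults (`smooth_idf=True`, `norm='l2'`): idf = ln((1 + n) / (1 + df)) + 1, followed by row-wise L2 normalisation. Dense count rows and non-empty blocks are assumed for brevity, and the helper names are hypothetical, not splearn's.

```python
import math


def block_df(block):
    """Map step: (number of documents, per-column document frequencies)."""
    n_cols = len(block[0])  # assumes a non-empty block
    df = [sum(1 for row in block if row[j] > 0) for j in range(n_cols)]
    return len(block), df


def fit_idf(blocks):
    """Reduce step: add up partial counts, then apply smoothed idf."""
    partials = [block_df(b) for b in blocks]
    n = sum(p[0] for p in partials)
    df = [sum(col) for col in zip(*(p[1] for p in partials))]
    return [math.log((1 + n) / (1 + d)) + 1 for d in df]


def transform_block(block, idf):
    """Scale counts by idf and L2-normalise each row, one block at a time."""
    out = []
    for row in block:
        weighted = [c * w for c, w in zip(row, idf)]
        norm = math.sqrt(sum(v * v for v in weighted)) or 1.0
        out.append([v / norm for v in weighted])
    return out


blocks = [[[1, 0, 2], [1, 1, 0]], [[1, 0, 1]]]  # count rows, split into 2 blocks
idf = fit_idf(blocks)
tfidf = [transform_block(b, idf) for b in blocks]
```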
Distributed Classifiers
import numpy as np

from splearn.rdd import DictRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.svm import SparkLinearSVC
from splearn.pipeline import SparkPipeline

from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline

X = [...]  # list of texts
y = [...]  # list of labels
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            dtype=[np.ndarray, np.ndarray])

local_pipeline = Pipeline([
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', LinearSVC())
])
dist_pipeline = SparkPipeline([
    ('vect', SparkHashingVectorizer()),
    ('tfidf', SparkTfidfTransformer()),
    ('clf', SparkLinearSVC())
])

local_pipeline.fit(X, y)
dist_pipeline.fit(Z, clf__classes=np.unique(y))

y_pred_local = local_pipeline.predict(X)
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])
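One common way to distribute a linear model over blocks is parameter averaging: fit a local model on each (X, y) block, then average the coefficients. The README does not say how SparkLinearSVC combines blocks, so treat this purely as an illustration of the general technique. To keep it short and exact, a closed-form 1-D least-squares fit is swapped in for the SVM solver; `fit_block` and `average_models` are hypothetical names. The `clf__classes=np.unique(y)` argument above presumably serves a related purpose: a single block may not contain every label.

```python
# Parameter averaging sketch: local fit per block, then a component-wise mean.
# A 1-D least-squares line stands in for LinearSVC's solver.

def fit_block(xs, ys):
    """Local step: ordinary least-squares line y = w * x + b for one block."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    sxx = sum((x - mx) ** 2 for x in xs)
    sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    w = sxy / sxx
    return w, my - w * mx


def average_models(models):
    """Reduce step: average the per-block (w, b) pairs component-wise."""
    k = len(models)
    return (sum(m[0] for m in models) / k, sum(m[1] for m in models) / k)


# two blocks of points on the line y = 2x + 1
blocks = [([0, 1, 2, 3], [1, 3, 5, 7]), ([4, 5, 6, 7], [9, 11, 13, 15])]
w, b = average_models([fit_block(xs, ys) for xs, ys in blocks])
```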
Distributed Model Selection
import numpy as np

from splearn.rdd import DictRDD
from splearn.grid_search import SparkGridSearchCV
from splearn.naive_bayes import SparkMultinomialNB

# sklearn.grid_search exists only in scikit-learn < 0.20;
# later versions provide GridSearchCV in sklearn.model_selection
from sklearn.grid_search import GridSearchCV
from sklearn.naive_bayes import MultinomialNB

X = [...]
y = [...]
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            dtype=[np.ndarray, np.ndarray])

parameters = {'alpha': [0.1, 1, 10]}
fit_params = {'classes': np.unique(y)}

local_estimator = MultinomialNB()
local_grid = GridSearchCV(estimator=local_estimator,
                          param_grid=parameters)

estimator = SparkMultinomialNB()
grid = SparkGridSearchCV(estimator=estimator,
                         param_grid=parameters,
                         fit_params=fit_params)

local_grid.fit(X, y)
grid.fit(Z)
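Whatever part of a grid search runs in parallel, the search itself starts by expanding `param_grid` into concrete parameter settings and scoring each one. The plain-Python sketch below shows that expansion with `itertools.product`, iterating keys in sorted order as scikit-learn's ParameterGrid does. The `score` callable is a hypothetical placeholder for a cross-validated score; none of these names are splearn's API.

```python
from itertools import product


def expand_grid(param_grid):
    """Turn {'name': [values, ...]} into a list of concrete parameter dicts."""
    names = sorted(param_grid)
    return [dict(zip(names, combo))
            for combo in product(*(param_grid[n] for n in names))]


def grid_search(param_grid, score):
    """Evaluate every candidate with `score` and return the best (params, score)."""
    results = [(params, score(params)) for params in expand_grid(param_grid)]
    return max(results, key=lambda r: r[1])


# toy scorer standing in for cross-validation: prefers alpha close to 1
best = grid_search({'alpha': [0.1, 1, 10]}, score=lambda p: -abs(p['alpha'] - 1))
```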
Special thanks
- scikit-learn community
- spylearn community
- PySpark community