可接近的地图/减少工作
mister的Python项目详细描述
满足您所有的媒体数据需求
先生试图使运行map/reduce作业变得容易。
当你得到的数据不是很大,所以你还没有准备好 在加齐利亚的机器上分发数据 在合理的时间内仍然像是一个答案。
5分钟开始
先生需要您定义三个方法:prepare(获取数据 准备跨多个进程运行,map(实际上 有来自prepare和^{tt4}的数据块的东西$ (将从map返回的所有值混合在一起)。
reduce方法
prepare(self,count,*args,**kwargs)
count是作业将要运行的进程数,并且 *args和**kwargs是传递到子类的 __init__方法。prepare方法返回count行 包含将传递给的参数的元组((), {}) 每个map进程
map方法
map(self,*args,**kwargs)
*args和**kwargs是从 prepare。map方法返回您想要的reduce到的任何内容 用于将所有数据合并在一起
reduce方法
reduce(self,output,value)
output是所有value参数的全局聚合 reduce方法已经看到基本上,不管你从一个 reduce调用将被传递回下一个reduce调用 output。value参数是最近完成的 map调用已返回。
将所有这些结合在一起
因此,让我们在MrHelloWorld作业中把所有内容都集中起来,首先让我们 把骨架放好:
frommisterimportBaseMisterclassMrHelloWorld(BaseMister):defprepare(self,count,*args,**kwargs):passdefmap(self,*args,**kwargs):passdefreduce(self,output,value):pass
现在让我们充实一下prepare方法:
defprepare(self,count,name):# we're just going to return the number and the name we pass inforxinrange(count):yield([x,name],{})
以及我们的map方法:
defmap(self,x,name):return"Process {} says 'hello {}'".format(x,name)
最后,我们的reduce方法:
defreduce(self,output,value):ifoutputisNone:output=[]output.append(value)returnoutput
执行我们的工作:
mr=MrHelloWorld("Alice")output=mr.run()print(output)
将导致:
[ "Process 1 says 'hello Alice'", "Process 0 says 'hello Alice'", "Process 2 says 'hello Alice'", "Process 3 says 'hello Alice'", "Process 4 says 'hello Alice'", "Process 5 says 'hello Alice'", "Process 6 says 'hello Alice'", "Process 7 says 'hello Alice'", "Process 8 says 'hello Alice'", "Process 9 says 'hello Alice'", "Process 10 says 'hello Alice'" ]
恭喜,你刚刚运行了一个map/reduce作业,你现在是一个ai和一个ml 工程师,当你出名的时候记得我
另一个例子
我认为单词计数是传统的map/reduce示例所以在这里 是:
importosimportreimprotmathfromcollectionsimportCounterfrommisterimportBaseMisterclassMrWordCount(BaseMister):defprepare(self,count,path):"""prepare segments the data for the map() method"""size=os.path.getsize(path)length=int(math.ceil(size/count))start=0forxinrange(count):kwargs={}kwargs["path"]=pathkwargs["start"]=startkwargs["length"]=lengthstart+=lengthyield(),kwargsdefmap(self,path,start,length):"""all the magic happens right here"""output=Counter()withopen(path)asfp:fp.seek(start,0)words=fp.read(length)# I don't compensate for word boundaries because exampleforwordinre.split(r"\s+",words):output[word]+=1returnoutputdefreduce(self,output,count):"""take all the return values from map() and aggregate them to the final value"""ifnotoutput:output=Counter()output.update(count)returnoutput# let's count the biblepath="./testdata/bible-kjv.txt"mr=MrWordCount(path)wordcounts=mr.run()print(wordcounts.most_common(10))
在我的计算机上,上面的异步代码比 它的同步等效物如下:
importrefromcollectionsimportCounterpath="./testdata/bible-kjv.txt"output=Counter()withopen(path)asfp:words=fp.read()forwordinre.split(r"\s+",words):output[word]+=1print(wordcounts.most_common(10))
安装
要安装,请使用pip:
$ pip install mister
或者,抓住最新最伟大的:
$ pip install --upgrade git+https://github.com/Jaymon/mister#egg=mister