Hadoop Combiner not writing to Reducer

Published 2024-09-30 16:32:39


When I run the following locally, everything works fine:

cat data | mapper.py | sort | combiner.py | reducer.py

But when I run this in Hadoop, the combiner keeps running without sending any output to the reducer, and eventually the job gets killed.

The logs show "java.io.IOException: Bad file descriptor" and "WARN org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Broken pipe".

This only happens when I run with the combiner; it does not happen when I run with just the mapper and reducer.

Mapper:

(mapper code not shown in the original post)

Combiner:

#!/usr/bin/python
import sys

oldKey = None
old_doc = None
doc_count = 0

for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 3:
        # Something has gone wrong. Skip this line.
        continue

    thisKey, thisDoc, nos = data_mapped

    if oldKey and oldKey != thisKey:
        # New key: emit the count for the previous (key, doc) pair.
        print("%s\t%s\t%d" % (oldKey, old_doc, doc_count))
        doc_count = 0
        old_doc = None

    oldKey = thisKey

    if old_doc is None:
        old_doc = thisDoc
        doc_count += 1
        continue

    if thisDoc == old_doc:
        doc_count += 1
    else:
        # Same key, new doc: emit the previous doc's count.
        print("%s\t%s\t%d" % (oldKey, old_doc, doc_count))
        old_doc = thisDoc
        doc_count = 1

if oldKey is not None:
    print("%s\t%s\t%d" % (oldKey, old_doc, doc_count))
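To sanity-check the combiner logic outside Hadoop, here is a minimal sketch that applies the same collapsing step to a few hand-written lines of sorted mapper output; the `word\tdoc_id\t1` record format and the sample lines are assumptions, since the mapper code is not shown above.

```python
# Hypothetical sorted mapper output: "word<TAB>doc_id<TAB>1" per line.
sample = [
    "apple\t1\t1",
    "apple\t1\t1",
    "apple\t2\t1",
    "banana\t3\t1",
]

def combine(lines):
    """Collapse consecutive (key, doc) pairs into (key, doc, count) records."""
    out = []
    old_key = None
    old_doc = None
    doc_count = 0
    for line in lines:
        parts = line.strip().split("\t")
        if len(parts) != 3:
            continue  # malformed record, skip it
        this_key, this_doc, nos = parts
        if old_key is not None and (this_key, this_doc) != (old_key, old_doc):
            # The (key, doc) pair changed: emit the previous pair's count.
            out.append("%s\t%s\t%d" % (old_key, old_doc, doc_count))
            doc_count = 0
        old_key, old_doc = this_key, this_doc
        doc_count += int(nos)
    if old_key is not None:
        out.append("%s\t%s\t%d" % (old_key, old_doc, doc_count))
    return out

print(combine(sample))
```

Running this on the sample lines collapses the two "apple doc 1" records into a single record with count 2, which is the behavior the streaming combiner is expected to reproduce.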

Reducer:

#!/usr/bin/python
import sys

oldKey = None
doc_list = []
doc_count = 0

for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 3:
        # Something has gone wrong. Skip this line.
        continue

    thisKey, thisDoc, nos = data_mapped

    if oldKey and oldKey != thisKey:
        # New key: emit the doc list and total count for the previous key.
        print("%s\t%s\t%d" % (oldKey, sorted(doc_list), doc_count))
        doc_list = []
        doc_count = 0

    oldKey = thisKey
    doc_count += int(nos)
    if int(thisDoc) not in doc_list:
        doc_list.append(int(thisDoc))

if oldKey is not None:
    print("%s\t%s\t%d" % (oldKey, sorted(doc_list), doc_count))
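The reducer's aggregation can also be checked locally, independent of the streaming harness. This sketch applies the same per-key logic to the kind of (key, doc, count) records the combiner emits; the sample records are hypothetical.

```python
def reduce_lines(lines):
    """Aggregate sorted (key, doc, count) records into (key, doc list, total)."""
    out = []
    old_key = None
    doc_list = []
    doc_count = 0
    for line in lines:
        parts = line.strip().split("\t")
        if len(parts) != 3:
            continue  # malformed record, skip it
        this_key, this_doc, nos = parts
        if old_key is not None and old_key != this_key:
            # Key changed: emit the doc list and total count for the old key.
            out.append((old_key, sorted(doc_list), doc_count))
            doc_list = []
            doc_count = 0
        old_key = this_key
        doc_count += int(nos)
        if int(this_doc) not in doc_list:
            doc_list.append(int(this_doc))
    if old_key is not None:
        out.append((old_key, sorted(doc_list), doc_count))
    return out

print(reduce_lines(["apple\t1\t2", "apple\t2\t1", "banana\t3\t1"]))
```

For the sample input this yields one tuple per key, with the doc ids merged and the partial counts summed across combiner records.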

This problem is similar to an "inverted index": the final output will be <word, [list of docs], count>.
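For comparison, the same <word, [list of docs], count> result can be sketched with plain dictionaries in a single process; the two sample documents here are made up, but the output shape matches what the streaming pipeline is supposed to produce.

```python
# Hypothetical corpus: doc id -> text.
docs = {1: "to be or not to be", 2: "be happy"}

index = {}   # word -> list of doc ids containing it
totals = {}  # word -> total occurrences across all docs

for doc_id, text in docs.items():
    for word in text.split():
        totals[word] = totals.get(word, 0) + 1
        if doc_id not in index.setdefault(word, []):
            index[word].append(doc_id)

# One <word, [list of docs], count> record, e.g. for "be":
print("%s\t%s\t%d" % ("be", sorted(index["be"]), totals["be"]))
```

"be" appears twice in doc 1 and once in doc 2, so its record lists both docs with a total count of 3.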

Any help would be great.

