Python multiprocessing EOF error on csv files

Posted 2024-09-28 03:20:59


I am trying to implement a multiprocessing approach to read and compare two csv files. To get started, I began with the code example from embarrassingly parallel problems, which sums the integers in a file. The problem is that the example does not work for me. (I am running Python 2.6 on Windows.)

I get the following EOF error:

File "C:\Python26\lib\pickle.py", line 880, in load_eof
raise EOFError
EOFError

at this line:

^{pr2}$

I found some examples of this problem suggesting that the csv file might need to be opened with 'rb'. I tried that, but it didn't help either.

Then I tried to simplify the code to reproduce the error at the most basic level. I got the same error on the same line, even when I stripped parse_input_csv down so that it does not read the file at all. (Not sure how an EOF can be triggered if the file is never read?)

import csv
import multiprocessing

class CSVWorker(object):
    def __init__(self, infile, outfile):
        #self.infile = open(infile)
        self.infile = open(infile, 'rb') #try rb for Windows

        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()    
        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())

        self.pin.start()
        self.pin.join()    
        self.infile.close()

    def parse_input_csv(self):
#         for i, row in enumerate(self.in_csvfile):
#             self.inq.put( (i, row) )

#         for row in self.in_csvfile:
#             print row
#             #self.inq.put( row )

        print 'yup'


if __name__ == '__main__':        
    c = CSVWorker('random_ints.csv', 'random_ints_sums.csv')
    print 'done' 

Finally, I tried pulling everything out of the class. This works if I don't iterate over the csv, but gives the same error if I do.

def manualCSVworker(infile, outfile):
    f = open(infile, 'rb')
    in_csvfile = csv.reader(f)        
    inq = multiprocessing.Queue()

    # this does not work (tries to read the csv, and fails with EOFError)
    pin = multiprocessing.Process(target=manual_parse_input_csv, args=(in_csvfile,))

    # this works (no reading of the csv file)
    #pin = multiprocessing.Process(target=print_yup, args=())

    pin.start()
    pin.join()    
    f.close()

def print_yup():
    print 'yup'

def manual_parse_input_csv(csvReader):    
    for row in csvReader:
        print row

if __name__ == '__main__':        
    manualCSVworker('random_ints.csv', 'random_ints_sums.csv')
    print 'done' 

Can anyone help me figure out the problem here?

EDIT: I just wanted to post the working code. In the end I gave up on the class implementation. As timpeters suggested, I just pass file names (not open files).

Over 5 million rows x 2 columns, I saw about a 20% improvement in time with 2 processes vs. 1. I expected a bit more, but I think the problem is the extra overhead of queueing. Per this thread, one improvement would be to queue the records in chunks of 100 or more (rather than one row at a time); a sketch of that idea follows the working code below.

import csv
import multiprocessing
from datetime import datetime

NUM_PROCS = multiprocessing.cpu_count()

def main(numprocsrequested, infile, outfile):

    inq = multiprocessing.Queue()
    outq = multiprocessing.Queue()

    numprocs = min(numprocsrequested, NUM_PROCS)

    pin = multiprocessing.Process(target=parse_input_csv, args=(infile,numprocs,inq,))
    pout = multiprocessing.Process(target=write_output_csv, args=(outfile,numprocs,outq,))
    ps = [ multiprocessing.Process(target=sum_row, args=(inq,outq,)) for i in range(numprocs)]

    pin.start()
    pout.start()
    for p in ps:
        p.start()

    pin.join()
    i = 0
    for p in ps:
        p.join()
        #print "Done", i
        i += 1
    pout.join()

def parse_input_csv(infile, numprocs, inq):
        """Parses the input CSV and yields tuples with the index of the row
        as the first element, and the integers of the row as the second
        element.

        The index is zero-index based.

        The data is then sent over inqueue for the workers to do their
        thing.  At the end the input thread sends a 'STOP' message for each
        worker.
        """
        f = open(infile, 'rb')
        in_csvfile = csv.reader(f)

        for i, row in enumerate(in_csvfile):
            row = [ int(entry) for entry in row ]
            inq.put( (i,row) )

        for i in range(numprocs):
            inq.put("STOP")

        f.close()

def sum_row(inq, outq):
    """
    Workers. Consume inq and produce answers on outq
    """
    tot = 0
    for i, row in iter(inq.get, "STOP"):
        outq.put( (i, sum(row)) )
    outq.put("STOP")

def write_output_csv(outfile, numprocs, outq):
    """
    Open outgoing csv file then start reading outq for answers
    Since I chose to make sure output was synchronized to the input there
    is some extra goodies to do that.

    Obviously your input has the original row number so this is not
    required.
    """

    cur = 0
    stop = 0
    buffer = {}
    # For some reason csv.writer works badly across threads so open/close
    # and use it all in the same thread or else you'll have the last
    # several rows missing
    f = open(outfile, 'wb')
    out_csvfile = csv.writer(f)

    #Keep running until we see numprocs STOP messages
    for works in range(numprocs):
        for i, val in iter(outq.get, "STOP"):
            # verify rows are in order, if not save in buffer
            if i != cur:
                buffer[i] = val
            else:
                #if yes are write it out and make sure no waiting rows exist
                out_csvfile.writerow( [i, val] )
                cur += 1
                while cur in buffer:
                    out_csvfile.writerow([ cur, buffer[cur] ])
                    del buffer[cur]
                    cur += 1
    f.close()

if __name__ == '__main__':

    startTime = datetime.now()
    main(4, 'random_ints.csv', 'random_ints_sums.csv')
    print 'done'
    print(datetime.now()-startTime)
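Regarding the chunking improvement mentioned in the edit above, here is a minimal sketch of how the producer and worker could batch rows before queueing them (CHUNK_SIZE and the *_chunked function names are illustrative additions, not part of the working code above):

import csv
import multiprocessing

CHUNK_SIZE = 100  # batch size suggested above; tune as needed

def parse_input_csv_chunked(infile, numprocs, inq):
    # Producer: read the csv and put lists of (index, row) tuples on the queue
    f = open(infile, 'rb')
    in_csvfile = csv.reader(f)
    chunk = []
    for i, row in enumerate(in_csvfile):
        chunk.append((i, [int(entry) for entry in row]))
        if len(chunk) >= CHUNK_SIZE:
            inq.put(chunk)
            chunk = []
    if chunk:
        inq.put(chunk)  # flush the final partial chunk
    for i in range(numprocs):
        inq.put("STOP")
    f.close()

def sum_row_chunked(inq, outq):
    # Worker: consume whole chunks, emit one (index, sum) result per row
    for chunk in iter(inq.get, "STOP"):
        for i, row in chunk:
            outq.put((i, sum(row)))
    outq.put("STOP")

Each queue operation then carries up to 100 rows instead of one, which amortizes the pickling and synchronization overhead of the queue across the whole chunk.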

1 Answer

#1 · Answered 2024-09-28 03:20:59

Passing an object between processes requires "pickling" it on the sending end (building a string representation of the object) and "unpickling" it on the receiving end (recreating an isomorphic object from that string representation). Unless you know exactly what you're doing, you should stick to passing built-in Python types (strings, ints, floats, lists, dicts, ...) or types implemented by multiprocessing (Lock(), Queue(), ...). Otherwise, chances are good that the pickle dance won't work.
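A quick way to see this for yourself (a minimal sketch in the same Python 2 environment as the question; the exact exception text may differ between pickle and cPickle):

import pickle

# Built-in types round-trip through pickle, so they can cross a process boundary:
data = pickle.dumps({'rows': [[1, 2], [3, 4]]})
print pickle.loads(data)

# An open file (or anything wrapping one, like a csv.reader) cannot be pickled:
f = open('random_ints.csv', 'rb')
try:
    pickle.dumps(f)
except Exception, e:
    print 'cannot pickle:', e
f.close()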

There is no hope of passing an open file, let alone an open file wrapped inside another object (such as the one returned by csv.reader(f)). When you run the code, you get an error message from pickle:

pickle.PicklingError: Can't pickle <type '_csv.reader'>: it's not the same object as _csv.reader

Didn't you? Never ignore errors, unless you know exactly what you're doing.

The fix is simple: as I said in a comment, open the file in the worker process, and pass just its string path. For example, use this instead:

def manual_parse_input_csv(csvfile):
    f = open(csvfile, 'rb')
    in_csvfile = csv.reader(f)
    for row in in_csvfile:
        print row
    f.close()

Take all of that code out of manualCSVworker, and change the process-creation line to:

pin = multiprocessing.Process(target=manual_parse_input_csv, args=(infile,))

See? That passes the file path, a plain string. It works :-)
