在Python中多个线程写入同一CSV

2024-09-28 22:19:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我是Python中的多线程新手,目前正在编写一个附加到csv文件的脚本。如果我要将多个线程提交到一个concurrent.futures.ThreadPoolExecutor中,该concurrent.futures.ThreadPoolExecutor将行追加到csv文件中。如果追加是这些线程执行的唯一与文件相关的操作,我可以做些什么来保证线程安全?

我的代码的简化版本:

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for count,ad_id in enumerate(advertisers):

        downloadFutures.append(executor.submit(downloadThread, arguments.....))
        time.sleep(random.randint(1,3)) 

我的线程类是:

def downloadThread(arguments......):

                #Some code.....

                writer.writerow(re.split(',', line.decode()))

我应该设置一个单独的单线程执行器来处理写操作,还是担心我只是附加?

编辑:我应该详细说明,当写操作发生时,在下一次附加文件之间的时间间隔可能会有很大的变化,我只是担心在测试我的脚本时没有发生这种情况,我更希望对此进行讨论。


Tags: 文件csv代码版本脚本with线程arguments
3条回答

迟到参与方注意:您可以用不同的方式来处理这个问题,不需要锁定,方法是让一个writer从共享队列中使用,由执行处理的线程将行推送到队列中。

from threading import Thread
from queue import Queue
from random import randint
from concurrent.futures import ThreadPoolExecutor


# CSV writer setup goes here

queue = Queue()


def consume():
    while True:
        if not queue.empty():
            i = queue.get()

            # Row comes out of queue; CSV writing goes here

            print(i)
            if i == 4999:
                return


consumer = Thread(target=consume)
consumer.setDaemon(True)
consumer.start()


def produce(i):
    # Data processing goes here; row goes into queue
    queue.put(i)


with ThreadPoolExecutor(max_workers=10) as executor:
    for i in range(5000):
        executor.submit(produce, i)

consumer.join()

下面是一些代码,它还处理引起unicode问题的头痛:

def ensure_bytes(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s

class ThreadSafeWriter(object):
'''
>>> from StringIO import StringIO
>>> f = StringIO()
>>> wtr = ThreadSafeWriter(f)
>>> wtr.writerow(['a', 'b'])
>>> f.getvalue() == "a,b\\r\\n"
True
'''

def __init__(self, *args, **kwargs):
    self._writer = csv.writer(*args, **kwargs)
    self._lock = threading.Lock()

def _encode(self, row):
    return [ensure_bytes(cell) for cell in row]

def writerow(self, row):
    row = self._encode(row)
    with self._lock:
        return self._writer.writerow(row)

def writerows(self, rows):
    rows = (self._encode(row) for row in rows)
    with self._lock:
        return self._writer.writerows(rows)

# example:
with open('some.csv', 'w') as f:
    writer = ThreadSafeWriter(f)
    writer.write([u'中文', 'bar'])

更详细的解决方案是here

我不确定csvwriter是否是线程安全的。documentation没有指定,因此为了安全起见,如果多个线程使用同一个对象,则应该使用threading.Lock来保护使用:

# create the lock
import threading
csv_writer_lock = threading.Lock()

def downloadThread(arguments......):
    # pass csv_writer_lock somehow
    # Note: use csv_writer_lock on *any* access
    # Some code.....
    with csv_writer_lock:
        writer.writerow(re.split(',', line.decode()))

这就是说,对于downloadThread来说,将写任务提交给执行器,而不是像这样显式地使用锁,确实更为优雅。

相关问题 更多 >