如何在事件循环结束时为并发未来的所有线程调用类方法?

2024-10-02 16:25:17 发布

您现在位置:Python中文网/ 问答频道 /正文

假设我有以下要创建的类,将类属性绑定到每个线程(无论是由concurrent.futures还是其他方式创建的):

import pandas as pd 
from concurrent.futures import ThreadPoolExecutor

def ThreadSafeIOClass(threading.local):
    self.df = pd.DataFrame(columns=["col1", "col2"]) 
    self.file_name_prefix = f"my_artifact_{threading.get_ident()}" 

def update_df(row):
    self.df = self.df.append(row, ignore_index=True) 

def flush():
    with open(f"{self.file_name_prefix}.csv", "w") as f:
        self.df.to_csv(f, "w")

def close():
    self.flush() 

在典型的concurrent.futures循环中,您将有:

   with ThreadPoolExecutor(4) as executor:
       cl = ThreadSafeIOClass()
       for row in rows:
           executor.submit(cl.update_df, row) 
   cl.close() 

现在困扰我的是cl.close()不会清除由concurrent.futures创建的所有线程中的所有内存剩余,导致在持久性步骤的末尾丢失一些数据

我真的不需要锁定这个类,但仍然要使它线程安全,这可行吗?如何解决这个问题

(请原谅我在标题中借用了“事件循环”一词,但我认为它实现了我在这里想要实现的目标。)