假设我有以下要创建的类,将类属性绑定到每个线程(无论是由concurrent.futures
还是其他方式创建的):
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
def ThreadSafeIOClass(threading.local):
self.df = pd.DataFrame(columns=["col1", "col2"])
self.file_name_prefix = f"my_artifact_{threading.get_ident()}"
def update_df(row):
self.df = self.df.append(row, ignore_index=True)
def flush():
with open(f"{self.file_name_prefix}.csv", "w") as f:
self.df.to_csv(f, "w")
def close():
self.flush()
在典型的concurrent.futures循环中,您将有:
with ThreadPoolExecutor(4) as executor:
cl = ThreadSafeIOClass()
for row in rows:
executor.submit(cl.update_df, row)
cl.close()
现在困扰我的是cl.close()
不会清除由concurrent.futures
创建的所有线程中的所有内存剩余,导致在持久性步骤的末尾丢失一些数据
我真的不需要锁定这个类,但仍然要使它线程安全,这可行吗?如何解决这个问题
(请原谅我在标题中借用了“事件循环”一词,但我认为它实现了我在这里想要实现的目标。)
目前没有回答
相关问题 更多 >
编程相关推荐