我有一个包含文件子目录的目录,并从每个子目录中的文件中提取熊猫数据帧形式的信息,然后使用多重处理将每个子目录中的pandas数据帧重新连接到一个csv文件中。我使用一个队列来存储每个进程返回的本地数据帧,并将它们附加到文件中,以避免写入冲突。这是我的代码:
def work(cmd, q):
df_local = function_which_returns_dataframe(cmd)
if not df_local.empty:
q.put(df_local)
else:
print("Empty:", cmd)
def listener(file, q):
while True:
line = q.get()
if isinstance(line, pd.DataFrame):
line.to_csv(file, mode='a', header=False)
elif line == 'kill':
return
def main(args):
cpus = multiprocessing.cpu_count()
patient_dirs = [os.path.join(args.input_dir, x) for x in os.listdir(args.input_dir)]
threads = []
file = os.path.join(args.output_dir, 'concepts_all_%s.csv' % identifier)
#setup manager with write access to file
manager = multiprocessing.Manager()
q = manager.Queue()
header_df = pd.DataFrame(columns=['patient_id', 'lookup_id', 'begin_inx', 'end_inx', 'mention_type', 'codingScheme', 'code', 'preferredText', 'word_phrase'])
header_df.loc[len(header_df)] = ['patient_id', 'lookup_id', 'begin_inx', 'end_inx', 'mention_type', 'codingScheme', 'code', 'preferredText', 'word_phrase']
q.put(header_df)
#start write process
writer_process = multiprocessing.Process(target=listener, args=(file, q))
writer_process.start()
# now spawn processes from each patient dir*
while threads or patient_dirs:
if (len(threads) < cpus) and patient_dirs:
p = multiprocessing.Process(target=work, args=[patient_dirs.pop(), q])
p.start()
threads.append(p)
else:
for thread in threads:
if not thread.is_alive():
threads.remove(thread)
#finish write
q.put('kill')
writer_process.join()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('input_dir', type=str)
parser.add_argument('output_dir', type=str)
args = parser.parse_args()
main(args)
此代码在较小的测试目录下运行良好,但在较大的目录上运行一段时间后,我开始收到以下错误消息:
^{pr2}$队列的大小是否受到限制,或者我是否未正确设置写出方法?这是否与处理返回空数据帧的情况有关?在
使用^{} 代替它,它的语法与pandas相似,只是它的parallel(基本上它有很多并行pandas datafame)和lazy(这有助于避免ram限制)。在
如果要提取的文件是
csv
类型,请执行以下操作:如果文件的类型不同,
pandas
可以读取它们,dask
很可能也可以read它们。在对于
^{pr2}$xml
它看起来像这样:我建议您阅读this,并为
meta
提供一个from_delayed
如果要将
ddf
转换为pandas.DataFrame
,只需执行以下操作:相关问题 更多 >
编程相关推荐