Python多处理队列thows windows错误:系统找不到指定的文件

2024-09-29 02:24:31 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个包含文件子目录的目录,并从每个子目录中的文件中提取熊猫数据帧形式的信息,然后使用多重处理将每个子目录中的pandas数据帧重新连接到一个csv文件中。我使用一个队列来存储每个进程返回的本地数据帧,并将它们附加到文件中,以避免写入冲突。这是我的代码:

def work(cmd, q):
    df_local = function_which_returns_dataframe(cmd)
    if not df_local.empty:
        q.put(df_local)
    else:
        print("Empty:", cmd)

def listener(file, q):
    while True:
        line = q.get()
        if isinstance(line, pd.DataFrame):
            line.to_csv(file, mode='a', header=False)
        elif line == 'kill':
            return

def main(args):    
    cpus = multiprocessing.cpu_count()
    patient_dirs = [os.path.join(args.input_dir, x) for x in os.listdir(args.input_dir)]
    threads = []
    file = os.path.join(args.output_dir, 'concepts_all_%s.csv' % identifier)

    #setup manager with write access to file
    manager = multiprocessing.Manager()
    q = manager.Queue()
    header_df = pd.DataFrame(columns=['patient_id', 'lookup_id', 'begin_inx', 'end_inx', 'mention_type', 'codingScheme', 'code', 'preferredText', 'word_phrase'])
    header_df.loc[len(header_df)] = ['patient_id', 'lookup_id', 'begin_inx', 'end_inx', 'mention_type', 'codingScheme', 'code', 'preferredText', 'word_phrase']
    q.put(header_df)

    #start write process
    writer_process = multiprocessing.Process(target=listener, args=(file, q))
    writer_process.start()

    # now spawn processes from each patient dir*
    while threads or patient_dirs:
        if (len(threads) < cpus) and patient_dirs:
            p = multiprocessing.Process(target=work, args=[patient_dirs.pop(), q])
            p.start()
            threads.append(p)
        else:
            for thread in threads:
                if not thread.is_alive():
                    threads.remove(thread)

    #finish write
    q.put('kill')
    writer_process.join()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('input_dir', type=str)
    parser.add_argument('output_dir', type=str)
    args = parser.parse_args()
    main(args)

此代码在较小的测试目录下运行良好,但在较大的目录上运行一段时间后,我开始收到以下错误消息:

^{pr2}$

队列的大小是否受到限制,或者我是否未正确设置写出方法?这是否与处理返回空数据帧的情况有关?在


Tags: 文件数据iddfifdirlineargs
1条回答
网友
1楼 · 发布于 2024-09-29 02:24:31

使用^{}代替它,它的语法与pandas相似,只是它的parallel(基本上它有很多并行pandas datafame)和lazy(这有助于避免ram限制)。在

如果要提取的文件是csv类型,请执行以下操作:

from dask.distributed import Client  
import dask.dataframe as dd

client = Client() #  ensures multiprocessing

ddf = dd.read_csv(r'sub\**\*.csv')  # reads all the csv files inside of the subdirectories of the subdirectories

如果文件的类型不同,pandas可以读取它们,dask很可能也可以read它们。在

对于xml它看起来像这样:

^{pr2}$

我建议您阅读this,并为meta提供一个from_delayed

如果要将ddf转换为pandas.DataFrame,只需执行以下操作:

df = ddf.compute() 

相关问题 更多 >