并行化for循环中的函数

2024-09-14 18:12:54 发布

您现在位置:Python中文网/ 问答频道 /正文

我目前正在做一项非常耗时的任务。我需要在for循环中运行一些并行运行的函数。这是我当前的代码,下面是我认为正确的解决方案

for year in tqdm(years):
    
    temp_df = df[df.label <= year]
    processed_df = id_df.copy()
    G = nx.DiGraph()
    G.add_edges_from(temp_df.iloc[:,:2].values.tolist())
    
    # Degree Centrality
    DegreeCentrality = degree_centrality(G)
    DegreeCentrality_df = pd.DataFrame(DegreeCentrality.items(), columns=['id', 'DegreeCentrality'])
    processed_df = pd.merge(processed_df, DegreeCentrality_df, how='left', on='id').fillna(0)
    
    del DegreeCentrality
    del DegreeCentrality_df
    gc.collect()
    
    # In Degree Centrality
    InDegreeCentrality = in_degree_centrality(G)
    InDegreeCentrality_df = pd.DataFrame(InDegreeCentrality.items(), columns=['id', 'InDegreeCentrality'])
    processed_df = pd.merge(processed_df, InDegreeCentrality_df, how='left', on='id').fillna(0)
    
    del InDegreeCentrality
    del InDegreeCentrality_df
    gc.collect()

    # PageRank
    PageRank = pagerank(G)
    PageRank_df = pd.DataFrame(PageRank.items(), columns=['id', 'PageRank'])
    processed_df = pd.merge(processed_df, PageRank_df, how='left', on='id').fillna(0)
    
    del PageRank
    del PageRank_df
    gc.collect()

   processed_df.to_csv('properties_{}'.format(year), index=False)

我的解决方案:

from multiprocessing import Process

def deg(G):
    DegreeCentrality = degree_centrality(G)
    DegreeCentrality_df = pd.DataFrame(DegreeCentrality.items(), columns=['id', 'DegreeCentrality'])
    processed_df = pd.merge(processed_df, DegreeCentrality_df, how='left', on='id').fillna(0)
    
    del DegreeCentrality
    del DegreeCentrality_df
    gc.collect()
    
    
def indge(G):
    InDegreeCentrality = in_degree_centrality(G)
    InDegreeCentrality_df = pd.DataFrame(InDegreeCentrality.items(), columns=['id', 'InDegreeCentrality'])
    processed_df = pd.merge(processed_df, InDegreeCentrality_df, how='left', on='id').fillna(0)
    
    del InDegreeCentrality
    del InDegreeCentrality_df
    gc.collect()
    
def pgr(G):
    # PageRank
    PageRank = pagerank(G)
    PageRank_df = pd.DataFrame(PageRank.items(), columns=['id', 'PageRank'])
    processed_df = pd.merge(processed_df, PageRank_df, how='left', on='id').fillna(0)
    
    del PageRank
    del PageRank_df
    gc.collect()

for year in tqdm(years):
    
    temp_df = df[df.label <= year]
    processed_df = id_df.copy()
    G = nx.DiGraph()
    G.add_edges_from(temp_df.iloc[:,:2].values.tolist())
   
    if __name__=='__main__':
    
        # Degree Centrality
        p1 = Process(target=deg(G))
        p1.start()
        # In Degree Centrality
        p2 = Process(target=indeg(G))
        p2.start()
        # PageRank 
        p3 = Process(target=pgr(G))
        p3.start()


   processed_df.to_csv('properties_{}'.format(year), index=False)

我的问题是: 这是正确的吗? 数据帧会合并吗?是的,它们会按这个顺序合并吗? 我有大图,这会导致内存问题吗

谢谢大家!