在本地环境和HPC环境中处理巨大的数据帧

2024-09-21 03:22:30 发布

您现在位置:Python中文网/ 问答频道 /正文

我想把一个在本地环境下运行的递归大数据过滤程序变成一个可以并行运行最昂贵进程的程序

这个问题很复杂,需要一些背景信息——所以请耐心听我说:)我是用Python3.7写的

我正在使用statsmodels.outliers的VIF过滤方法来修剪一个庞大的数据集。数据是以制表符分隔的,我使用pandas将其加载到DataFrame中。 但是,随着数据集中的列和行数的增加,VIF操作变得非常昂贵。出于数学上的原因,可以不看行的子集,但需要考虑所有列。 我的解决方案是将巨大的数据帧子集到一个较小的数据帧列表中,并对每个数据帧执行VIF计算,但有一个折衷:较小的数据帧处理得更快,但这会降低整个VIF过程的准确性(每个列只看到它自己的子集中的内容)。 为了解决这个问题,我构建了一个递归过程,包括用户定义的迭代次数值,并有一个控制流:片段数据-->;碎片VIF计算-->;重组片段-->;随机化列-->;重复。 一切正常。。。对于测试数据集。对于原始数据集为6000k列的单个迭代,大约需要20分钟。但即使在那个测试用例中,每个片段中的列数也非常小,我想我需要很多次迭代才能获得更高的精确度。 因此,我想:为什么不并行运行VIF进程呢?所以我开始在HPC环境下做这件事。我是一个相对初级的程序员,我知道HPC环境在语法上可以是高度特定的/独占的。但跳转到HPC在很大程度上使我的控制流和代码变得复杂,包括写入同一目录中的多个文件,调用我的脚本数千次(每次提交的作业只运行一个或两个方法),写入、附加到文件,有时在每次迭代中覆盖文件。。。简直是疯了。 更新sbash文件以保持项目目录、HPC核心和脚本之间的控制流顺利运行,我没有任何问题;真正的困难是让我的脚本根据用户输入以高度特定的顺序拉入信息、修改信息并将其写入文件。 如果您有并行运行脚本的经验,请提供建议

以下是我用来在本地环境中分割和过滤数据的方法:

def frag_df(df_snps, og_chunk_delim=len(df_snps.columns) // 15):

    """
    FUNCTION:
    This method takes a DataFrame, evaluates its length, determines a 
    fragmentation size, and separates the DataFrame into chunks. It returns a 
    list of fragmented DataFrames.

    INPUTS:
    df_snps --> A DataFrame object, ideally one pre-processed by df_snps() [an 
    earlier method]

    RETURNS:
    df_list --> a list of fragment DataFrames
    """

    df_list = []
    # Subset df by all SNP predictor columns and find the total number of SNPs 
    in the infile.
    snp_count = len(df_snps.columns)

    # Create counters to be used by an iterative loop (for local 
    applications).

    snp_counter = 0
    fragment_counter = 0
    chunk_delim = og_chunk_delim
    num_chunks = snp_count / chunk_delim
    num_chunks = int(math.ceil(num_chunks))
    print(snp_counter, chunk_delim, num_chunks)

    # Iterate through the snp_count DataFrame and split it into chunks.

    print('\n' 'SNPs fragmenting into list of smaller DataFrames ...' '\n')
    while fragment_counter < num_chunks:
        print('Building Fragment #', fragment_counter + 1, 'from position',
              snp_counter, 'to', chunk_delim)
        df_list.append(df_snps.iloc[:, snp_counter:chunk_delim])

    # Move snp_counter up by specified chunk_delim (Defaults to 50 SNPs).
        snp_counter += og_chunk_delim
        chunk_delim += og_chunk_delim
        fragment_counter += 1
        df_list.append(df_snps.iloc[:, snp_counter:])

        print('\n', 'SNP fragmentation complete. Proceeding to VIF analysis.')

    return df_list

df_list = frag_df(df_snps)

def vif_calc(df_list, threshold=3.0):

    """
    FUNCTION: This method takes a list of DataFrame objects and conducts VIF 
    analysis on each of them, dropping columns based on some VIF threshold.

    INPUTS:
    df_list --> A list of DataFrame objects processed by frag_df.
    threshold --> The VIF threshold by which columns are to be evaluated.

    RETURNS:
    vif_list --> A list of DataFrames without multicolinear predictor columns.
    """

    df_index = 0
    drop_counter = 0
    filtered_list = df_list
    for df in filtered_list:
        if df.empty:
            del df
    print('\n Iterating through all DataFrames in the passed list.')
    print('\n Dropping columns with a VIF threshold greater than', threshold, 
    '... \n')

        # Create a list of indices corresponding to each column in a given 
        chunk.
        variables = list(range(df.shape[1]))
        df_index += 1
        dropped = True
        try:
            while dropped:
                vif = [variance_inflation_factor(df.iloc[:, variables].values, 
                var) for var in variables]
                if max(vif) < threshold:
                    dropped = False
                    print('\n' 'Fragment #', df_index, 'has been VIF 
                    filtered.','Checking list for next DataFrame ...' '\n')
                    break
                else:
                    max_loc = vif.index(max(vif))
                    if max(vif) > threshold:
                        g = (float("{0:.2f}".format(max(vif))))
                        print('Dropping', df.iloc[:, 
                        variables].columns[max_loc],'at index', str(max_loc + 
                        1), 'within Chunk #', df_index, 'due to VIF of', g)

                        df.drop(df.columns[variables[max_loc]], 1, 
                        inplace=True)
                        variables = list(range(df.shape[1]))
                        dropped = True
                        drop_counter += 1

        except ValueError:
            max_loc = 0

    return filtered_list

filtered_list = vif_calc(df_list, 2.5)

下面是我用来递归运行脚本的内容:

def递归(剩余,df\u shuffled):

"""
FUNCTION: This method specifies a number of times to call the other 
methods in the program.

INPUTS:
remaining --> The number of iterations that the user would like to run. 
More iterations = more chances for columns to see other columns.
df_shuffled --> A DataFrame with randomized columns to be used in the 
next iteration of the program.

RETURNS:
df_final --> A DataFrame ready for downstream machine learning analysis.
"""

if remaining == 1:
    print("Recursive VIF filtering complete!", '\n'
    "Here is a preview of your data:", '\n')
    df_final = df_shuffled
    print('\n' 'In this iteration, a total of', col_loss, 'columns were 
    trimmed from the data file.')
    print(df_final.head())
    return df_final

else:
    df_list = frag_df(df_shuffled)
    vif_list = vif_calc(df_list, 2.0)
    print('\n' "All done filtering this iteration! There are", remaining - 
    2, "iterations left.", '\n')
    print('Reconstituting and running next iteration ...')
    df_recon = reconstitute_df(vif_list)
    recursion(remaining - 1, df_recon)
# All of the above is in working order. Here is the type of output I'd get:

SNPs fragmenting into list of smaller DataFrames ...

Building Fragment # 1 from position 0 to 31
Building Fragment # 2 from position 31 to 62
Building Fragment # 3 from position 62 to 93
Building Fragment # 4 from position 93 to 124
Building Fragment # 5 from position 124 to 155
Building Fragment # 6 from position 155 to 186
Building Fragment # 7 from position 186 to 217
Building Fragment # 8 from position 217 to 248
Building Fragment # 9 from position 248 to 279
Building Fragment # 10 from position 279 to 310
Building Fragment # 11 from position 310 to 341
Building Fragment # 12 from position 341 to 372
Building Fragment # 13 from position 372 to 403
Building Fragment # 14 from position 403 to 434
Building Fragment # 15 from position 434 to 465
Building Fragment # 16 from position 465 to 496

 Iterating through all DataFrames in the passed list.

 Dropping columns with a VIF threshold greater than 2.5 ...

Dropping AGE at index 2 within Chunk # 1 due to VIF of 16.32
Dropping UPSIT at index 2 within Chunk # 1 due to VIF of 7.07
Dropping snp164_C at index 5 within Chunk # 1 due to VIF of 2.74
Dropping snp139_T at index 19 within Chunk # 1 due to VIF of 2.52

Fragment # 1 has been VIF filtered. Checking list for next DataFrame ...

Dropping snp499_C at index 9 within Chunk # 2 due to VIF of 2.81
Dropping snp30_C at index 4 within Chunk # 2 due to VIF of 2.78
Dropping snp424_A at index 29 within Chunk # 2 due to VIF of 2.73
Dropping snp32_C at index 10 within Chunk # 2 due to VIF of 2.53

Fragment # 2 has been VIF filtered. Checking list for next DataFrame ...

Dropping snp483_T at index 31 within Chunk # 3 due to VIF of 2.84
Dropping snp350_T at index 26 within Chunk # 3 due to VIF of 2.6
Dropping snp150_A at index 28 within Chunk # 3 due to VIF of 2.55

``````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````
# When I tried to move to the HPC environment, things got crazy.
I only wanted to have vif_calc() run in parallel; everything else only needed 1 core. So I designed a sort of 'flag' system, and had my methods run only if the flag was set to a certain value. The dilemma was, I had to write the files that called my script using THIS SAME SCRIPT! And even worse, vif_calc() couldn't rely on the script anymore to supply the DataFrame to filter, instead I had to dump the fragments into the working directory. Here's how I set it up:

    def init_pipeline():

        parser = argparse.ArgumentParser()

        parser.add_argument("datafile", type=str, default='discrete.dataforml',
                            help="Please enter the name of a CSV file to be 
                            processed.")
        parser.add_argument("fragment", type=int, choices=range(50, 300), 
                           default=50, help="DataFrame fragment size. Smaller 
                           fragments processed faster, but require more 
                           iterations.")
        parser.add_argument("-fg", "--flag",  type=int, choices=range(1, 5), 
                           default=1, help="Specify which part of the script to 
                           run this iteration. Defaults to pre-processing.")
        parser.add_argument("-th", "--threshold", type=float, choices=range(2, 6), 
                           default=4.0, help="Specify VIF filtering threshold. 
                           Columns exceeding it will be dropped from the 
                           DataFrame.")
        parser.add_argument("-re", "--recursion", type=int, choices=range(0, 50), 
                           default=0, help="Recursive filtering. Choose # of 
                           iterations from 0 to 50. Default is no recursion.")

        args = parser.parse_args()
        data = args.datafile
        thresh = args.threshold
        recur = args.recursion
        frag = args.fragment
        flag = args.flag

# I passed the arguments into a dictionary so every single time the script was called (once per submitted job), it'd have what it needed.

    arguments = dict([('file', data), ('frag', frag), ('flag', flag), 
    ('vif_threshold', thresh), ('iters', recur)])
    return arguments

`````````````````````````````````````````````````````````````````````````````

# Then I ran into many complicated problems. How could I get the fragmented DataFrames to and from files and get different calls to this same script to effectively communicate with one another? Solution was basically: write to other files (master.sh and swarm_iter.txt) to handle the control flow in the HPC environment.

    def frag_df(df_snps, flag, og_chunk_delim=50):

        if flag == 1 or 2:

            df_list = []
            # Subset df by all SNP predictor columns and find the total number of 
            SNPs in the infile.
            snp_count = len(df_snps)
            # Create counters to be used by an iterative loop (for local 
            applications).
            snp_counter = 0
            num_chunks = 1
            chunk_delim = og_chunk_delim
            swarm_counter = 0

            # Iterate through the snp_count DataFrame and split it into chunks.
            print('\n' 'SNPs fragmenting into list of smaller DataFrames ...')
            while chunk_delim + og_chunk_delim <= snp_count:
                df_list.append(df_snps.iloc[:, snp_counter:chunk_delim])
                # print(cov_snps.iloc[:,snp_counter:chunk_delim])
                # Move snp_counter up by specified chunk_delim (Defaults to 50 
                SNPs).
                snp_counter += og_chunk_delim
                chunk_delim += og_chunk_delim
                num_chunks += 1

            print('\n', 'SNP fragmentation complete. Proceeding to VIF analysis.')

            # Now use the fragments in df_list to write/overwrite the .swarm file 
            in the directory.
            # Use the .swarm file to create a series of temporary txt files 
            corresponding to each fragment.
            # These files can be deleted or overwritten after the VIF_filtering 
            process.

            swarm_writer = open('swarm_iter.txt', "w")
            df_list_inc = len(df_list)
            while swarm_counter < df_list_inc:
                if len(df_list) == 0:
                    return df_list
                else:
                    fragment_list = []     # A list containing names of file 
                    objects (each is a txt DataFrame).
                    frag_writer_inc = swarm_counter
                    df_frag_writer = open('df_' + str(frag_writer_inc+1) + '.txt', 
                                     "w")
                    df_frag_writer.write(str(df_list[swarm_counter]))
                    fragment_list.append(str('df_' + str(frag_writer_inc+1) + 
                                        '.txt'))
                    df_frag_writer.close()

                    # Write a line to the swarm file - ensure that flag == 3 so 
                    only vif_calc() is called.

                    for DataFrame in fragment_list:
                        swarm_line = 'python filter_final_swarm.py', DataFrame, 
                                     '50 3 -re 2' '\n'
                        swarm_writer.write(str(swarm_line))
                        swarm_counter += 1
                        swarm_writer.close()

            # Finally, append new lines to master.sh - the swarm command and bash 
            commands used downstream!
            # Ensure that it's dependent on the previous job's completion (in 
            which the flag == 1 or 2).
            job_counter = 1
            master_writer = open('master.sh', "a+")
            swarm_command = str('jobid' + str(job_counter+1) + '=$(swarm -f 
                            swarm_iter.txt --dependancy afterany:$jobid' + 
                            str(job_counter) + '--module python -g 3.0 -b 30)' 
                            '\n')
            nxt_sbash = str('jobid' + str(job_counter+2) + '=$(sbatch -- 
                        dependancy=afterany:$jobid' + str(job_counter+1) + 'python 
                        filter_final_swarm.py 50 4' '\n')
            master_writer.write(swarm_command)
            master_writer.write(nxt_sbash)
            master_writer.close()

``````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````

 # So in this version, a list of file names was being passed to vif_calc() instead of a list of DataFrames.

    def vif_calc(comp_df_lists_dict, flag, threshold=5.0):
        if flag == 3:
            df_dir_files = len(fnmatch.filter(os.listdir(r'C:\Users\thomasauw\VIF- 
       Filter-master'), 'df_*'))
            df_index = 0
            drop_counter = 0

            filtered_list = comp_df_lists_dict.get('df_file_list')
            print('\n Iterating through all DataFrames in the passed list.')
            print('\n Dropping columns with a VIF threshold greater than', 
                 threshold, '.''\n')
            for file in filtered_list:
                active_df = open(file, 'r')
                df = pd.read_csv(active_df, sep="\t")

                # Create a list of indices corresponding to each column in a given 
                chunk.
                variables = list(range(df.shape[1]))
                df_index += 1
                dropped = True
                try:
                    while dropped:
                        vif = [variance_inflation_factor(df.iloc[:, 
                              variables].values, var) for var in variables]
                        if max(vif) < threshold:
                            dropped = False

                        # Now the method must overwrite the DataFrames it took in 
                        with FILTERED DataFrames. In this version, the 'list' has 
                        just a single DataFrame element, and the script is taking  
                        in a different file each time (being called many times).

                            filtered_df_frag_writer = open('df_' + str(df_index), 
                            '\n' "w")
                            filtered_df_frag_writer.write(df)
                            filtered_df_frag_writer.close()

                            print('\n' 'Fragment #', df_index, 'has been VIF 
                            filtered. Checking list for next DataFrame ...' '\n')

                            break

                         else:
                            max_loc = vif.index(max(vif))
                            if max(vif) > threshold:
                                g = (float("{0:.2f}".format(max(vif))))
                                print('Dropping', df.iloc[:, 
                                variables].columns[max_loc], 'at index', 
                                str(max_loc + 1), 'within Chunk #', df_index,
                                'due to VIF of', g)

                                df.drop(df.columns[variables[max_loc]], 1, 
                                inplace=True)

                                variables = list(range(df.shape[1]))
                                dropped = True
                                drop_counter += 1

                except ValueError:
                    max_loc = 0

            return filtered_list

        # If the flag requirement isn't met, simply return what was passed as 
        the list of DataFrames when the argument was called in the first place.

        else:
            return comp_df_lists_dict


    vif_list = vif_calc(comp_df_lists_dict, arg_holder.get('flag'), 
                        arg_holder.get('vif_threshold'))

`````````````````````````````````````````````````````````````````````````````
`````````````````````````````````````````````````````````````````````````````
What I'm really looking for first and foremost is advice on how to approach this problem. For this specific case though, the error I've run into presently seems to be one with the statsapi VIF method itself. I've succeeded in getting the script to write a master.sh file, a swarm_iter.txt file, and the DataFrame files that vif_calc() needs. All of those files are in the working directory when this command is run:

`````````````````````````````````````````````````````````````````````````````

    python filter_final_swarm.py discrete.dataForML 50 -fg 3 -re 2

`````````````````````````````````````````````````````````````````````````````

# Then, here is the result (note that with a flag == 3, the earlier methods responsible for fragmenting the data have already done their job. Assume that the HPC environment submitted that job successfully.

`````````````````````````````````````````````````````````````````````````````

SNPs fragmenting into list of smaller DataFrames ...

 SNP fragmentation complete. Proceeding to VIF analysis.

 Iterating through all DataFrames in the passed list.

 Dropping columns with a VIF threshold greater than 4.0 .

Traceback (most recent call last):
  File "filter_final_swarm.py", line 315, in <module>
    vif_list = vif_calc(comp_df_lists_dict, arg_holder.get('flag'), arg_holder.get('vif_threshold'))
  File "filter_final_swarm.py", line 270, in vif_calc
    vif = [variance_inflation_factor(df.iloc[:, variables].values, var) for var in variables]
  File "filter_final_swarm.py", line 270, in <listcomp>
    vif = [variance_inflation_factor(df.iloc[:, variables].values, var) for var in variables]
  File "C:\Users\thomasauw\AppData\Local\Continuum\anaconda3\envs\Py37\lib\site-packages\statsmodels\stats\outliers_influence.py", line 184, in variance_inflation_factor
    r_squared_i = OLS(x_i, x_noti).fit().rsquared
  File "C:\Users\thomasauw\AppData\Local\Continuum\anaconda3\envs\Py37\lib\site-packages\statsmodels\regression\linear_model.py", line 838, in __init__
    hasconst=hasconst, **kwargs)
  File "C:\Users\thomasauw\AppData\Local\Continuum\anaconda3\envs\Py37\lib\site-packages\statsmodels\regression\linear_model.py", line 684, in __init__
    weights=weights, hasconst=hasconst, **kwargs)
  File "C:\Users\thomasauw\AppData\Local\Continuum\anaconda3\envs\Py37\lib\site-packages\statsmodels\regression\linear_model.py", line 196, in __init__
    super(RegressionModel, self).__init__(endog, exog, **kwargs)
  File "C:\Users\thomasauw\AppData\Local\Continuum\anaconda3\envs\Py37\lib\site-packages\statsmodels\base\model.py", line 216, in __init__
    super(LikelihoodModel, self).__init__(endog, exog, **kwargs)
  File "C:\Users\thomasauw\AppData\Local\Continuum\anaconda3\envs\Py37\lib\site-packages\statsmodels\base\model.py", line 68, in __init__
    **kwargs)
  File "C:\Users\thomasauw\AppData\Local\Continuum\anaconda3\envs\Py37\lib\site-packages\statsmodels\base\model.py", line 91, in _handle_data
    data = handle_data(endog, exog, missing, hasconst, **kwargs)
  File "C:\Users\thomasauw\AppData\Local\Continuum\anaconda3\envs\Py37\lib\site-packages\statsmodels\base\data.py", line 635, in handle_data
    **kwargs)
  File "C:\Users\thomasauw\AppData\Local\Continuum\anaconda3\envs\Py37\lib\site-packages\statsmodels\base\data.py", line 80, in __init__
    self._handle_constant(hasconst)
  File "C:\Users\thomasauw\AppData\Local\Continuum\anaconda3\envs\Py37\lib\site-packages\statsmodels\base\data.py", line 125, in _handle_constant
    if not np.isfinite(ptp_).all():
TypeError: ufunc 'isfinite' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''

# I've confirmed that 'df' is a DataFrame object, read in by looking at each 'active_df' file in the working directory in succession, and reading it into 'df'. If I were to continue down this path of insanity (please tell me that isn't the right thing to do here), how would I solve this problem? I'd expect the VIF filter to work normally and overwrite each file in succession (for later recombination/randomization/refragmentation).


Tags: ofthetoindataframedfindexcounter

热门问题