Apply a function to each group when the groups are split across multiple files, without concatenating all the files



My data comes from BigQuery, exported as CSV files to a GCS bucket; when the export is fairly large, BigQuery automatically splits the data into several chunks. Since these are time series, a single time series can end up scattered across different files. I have a custom function that I want to apply to each TimeseriesID.

Here are some constraints on the data:

  • The data is sorted by TimeseriesID and TimeID
  • The row count per file can differ, but each file has at least 1 row (a single-row file is unlikely, though)
  • The first TimeID is not always 0
  • The length of each time series can differ, but each series is spread across at most 2 files; no time series is spread across 3 different files

Here is the initial setup to illustrate the problem:

import numpy as np
import pandas as pd

# Please take note this is just for simplicity. The actual goal is not to calculate mean for all group, but to apply a custom_func to each Timeseries ID
def custom_func(x):
    return np.mean(x)

# Please take note this is just for simplicity. In actual, I read the file one by one since reading all the data is not possible
df1 = pd.DataFrame({"TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30]})
df2 = pd.DataFrame({"TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30]})
df3 = pd.DataFrame({"TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30]})

This would be trivial if I could just concat all the files, but the problem is that if I concat all the dataframes they won't fit in memory.

The output I want should look like this, but without concatenating all the files:

pd.concat([df1,df2,df3],axis=0).groupby('TimeseriesID').agg({"value":custom_func})

I'm aware of vaex and dask, but I'd like to stick with plain pandas for now. I'm also open to solutions that involve modifying how BigQuery splits the export into files.
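
For illustration, here is a rough sketch of the kind of file-by-file processing I have in mind; it leans on the constraints above (files sorted by TimeseriesID and TimeID, a series spread over at most 2 files), and the carry-over logic is only an idea, not something I have working:

import numpy as np
import pandas as pd

def custom_func(x):
    return np.mean(x)

results = {}
carry = None  # last group of the previous file, which may continue in the next file

for df in [df1, df2, df3]:  # in reality: read the exported CSV chunks one at a time
    if carry is not None:
        df = pd.concat([carry, df], axis=0)
    last_id = df["TimeseriesID"].iloc[-1]
    # hold back the last group, it might continue in the next file
    carry = df[df["TimeseriesID"] == last_id]
    for ts_id, grp in df[df["TimeseriesID"] != last_id].groupby("TimeseriesID"):
        results[ts_id] = custom_func(grp["value"])

# flush the final carried-over group
if carry is not None and not carry.empty:
    results[carry["TimeseriesID"].iloc[-1]] = custom_func(carry["value"])

print(results)  # {'A': 11.666..., 'B': 16.25, 'C': 20.0, 'D': 18.333...}

This way only one chunk plus the small carried-over group would be in memory at any time.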


1 Answer

The approach proposed by the OP, using concat to process millions of records, is excessive in terms of memory and other resources.

I tested the OP's code in a Google Colab notebook, and it is indeed a poor approach:

import pandas as pd
import numpy as np
import time

# Please take note this is just for simplicity. The actual goal is not to calculate mean for all group, but to apply a custom_func to each Timeseries ID

def custom_func(x):
    return np.mean(x)

# Please take note this is just for simplicity. In actual, I read the file one by one since reading all the data is not possible
df1 = pd.DataFrame({"TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30]})
df2 = pd.DataFrame({"TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30]})
df3 = pd.DataFrame({"TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30]})

start = time.time()
df = pd.concat([df1,df2,df3]).groupby('TimeseriesID').agg({"value":custom_func})
elapsed = (time.time() - start)

print(elapsed)
print(df.head())

The output will be:

0.023952960968017578
                  value
TimeseriesID
A             11.666667
B             16.250000
C             20.000000
D             18.333333

As you can see, concat takes time to process; with only a handful of records that goes unnoticed. The approach is as follows:

  1. Fetch the files that contain the data to be processed, i.e. only the usable columns.
  2. Build a dictionary from the files processed so far, keyed by the TimeseriesID values; where necessary, collect each key's values from every file that contains them. You can store the results as json/csv files in a 'results' directory:

A.csv will have all key 'A' values ... n.csv will have all key 'n' values

  3. Iterate over the results directory and start building the final output in a dictionary:

{'A': [10, 20, 5], 'B': [30, 10, 20, 5], 'C': [30, 10], 'D': [20, 5, 30]}

  4. Apply the custom function to each key's list of values (a rough sketch of steps 3 and 4 follows after this list):

{'A': 11.666666666666666, 'B': 16.25, 'C': 20.0, 'D': 18.333333333333332}
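
As a side note, steps 3 and 4 could look roughly like this when the per-key json files are stored in a local 'results' directory (the directory name and glob pattern here are assumptions for illustration only):

import glob
import json
import numpy as np

def custom_func(x):
    return np.mean(x)

# step 3: iterate the results directory and rebuild the full dictionary
merged = {}
for path in glob.glob('results/*.json'):
    with open(path) as key_file:
        merged.update(json.load(key_file))

# step 4: apply the custom function to each key's list of values
output = {key: custom_func(values) for key, values in merged.items()}
print(output)  # e.g. {'A': 11.666..., 'B': 16.25, 'C': 20.0, 'D': 18.333...}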

You can check the logic with the following code; I used json to store the data:

from google.colab import files
import json
import pandas as pd

#initial dataset
df1 = pd.DataFrame({"TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30]})
df2 = pd.DataFrame({"TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30]})
df3 = pd.DataFrame({"TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30]})

#get unique keys and its values
df1.groupby('TimeseriesID')['value'].apply(list).to_json('df1.json')
df2.groupby('TimeseriesID')['value'].apply(list).to_json('df2.json')
df3.groupby('TimeseriesID')['value'].apply(list).to_json('df3.json')

#as this is an example you can download the output as jsons
files.download('df1.json')
files.download('df2.json')
files.download('df3.json')
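
For reference, with the sample dataframes above, the json written for df1 maps each key to that file's list of values:

{"A":[10,20,5],"B":[30]}

df2.json and df3.json hold the grouped values of the other files in the same way.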

Update 6 Oct 2021: I have adapted the code to the OP's needs. This part creates the refined per-key files:

from google.colab import files
import json

#you should use your own function to get the data from the file
def retrieve_data(uploaded,file):
  return json.loads(uploaded[file].decode('utf-8'))

#you should use your own function to get a list of files to process
def retrieve_files():
  return files.upload()

key_list =[]
#call a function that gets a list of files to process
file_to_process = retrieve_files()

#read every raw file:
for file in file_to_process: 
  file_data = retrieve_data(file_to_process,file)

  for key,value in file_data.items(): 
    if key not in key_list: 
      key_list.append(key)
      with open(f'{key}.json','w') as new_key_file:
        new_json = json.dumps({key:value})
        new_key_file.write(new_json)

    else:
      with open(f'{key}.json','r+') as key_file:
        raw_json = key_file.read()
        old_json = json.loads(raw_json)
        new_json = json.dumps({key:old_json[key]+value})

        key_file.seek(0)
        key_file.write(new_json)

for key in key_list:
  files.download(f'{key}.json')

print(key_list)
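
After this step there is one refined json file per key, holding that key's values merged from all the raw files; with the sample data, B.json for example ends up as:

{"B": [30, 10, 20, 5]}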

Update 7 Oct 2021: I have updated the code to avoid confusion. This part processes the refined files (it reuses retrieve_files and retrieve_data from the previous cell):

import time
import numpy as np
import pandas as pd

#Once we get the refined values we can use it to apply custom functions
def custom_func(x):
    return np.mean(x) 

#Get key and data content from single json
def get_data(file_data):
    content = file_data.popitem()
    return content[0],content[1]

#load key list and build our refined dictionary
refined_values = []

#call a function that gets a list of files to process
file_to_process = retrieve_files()

start = time.time()
#read every refined file:
for file in file_to_process: 
  #read content of file n
  file_data = retrieve_data(file_to_process,file)
  
  #parse and apply function per file read
  key,data = get_data(file_data)
  func_output = custom_func(data)

  #start building refined list
  refined_values.append([key,func_output])

elapsed = (time.time() - start)
print(elapsed)
  
df = pd.DataFrame.from_records(refined_values,columns=['TimeseriesID','value']).sort_values(by=['TimeseriesID'])
df = df.reset_index(drop=True)
print(df.head())

The output will be:

0.00045609474182128906
  TimeseriesID      value
0            A  11.666667
1            B  16.250000
2            C  20.000000
3            D  18.333333

To summarize:

  • When handling large datasets, always focus on the data you will actually use and keep it to a minimum; only work with the usable values.

  • Processing is faster when the work is done with basic operators or Python's native libraries.
