
Posted 2024-10-01 09:33:43

The above is sample data. The data is sorted by email address, and the file is very large, about 1.5 GB.



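That is, if an entry appears for the first time I need to append 1, if it appears a second time I need to append 2, and so on. In other words, I need to count the occurrences of each email address in the file, and if an email appears two or more times I want the difference between its dates. Keep in mind that the dates are not sorted, so they also have to be sorted per email address.

I am looking for a solution in Python, using numpy, pandas or any other library, that can handle this amount of data without running out of memory. I have a dual-core processor running CentOS 6.3 with 4 GB of RAM.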
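For data that fits in memory, the desired result can be pinned down with a few lines of pandas. This is a sketch for small inputs only (it loads everything at once, which is exactly what the 1.5 GB file cannot afford). It assumes "difference between dates" means days since the first occurrence of each email, which matches the output shown further down; `annotate_in_memory` is a hypothetical name.

```python
import pandas as pd


def annotate_in_memory(df: pd.DataFrame) -> pd.DataFrame:
    """Add a per-email occurrence count and days since the first occurrence.

    Reference for small inputs only: the whole frame must fit in memory.
    """
    # dates look like 26JUL2010; %b also accepts upper-case month names
    out = df.assign(date=pd.to_datetime(df["date"], format="%d%b%Y"))
    # the dates are not sorted, so order rows by email, then by date
    out = out.sort_values(["email", "date"])
    grouped = out.groupby("email")
    count = grouped.cumcount() + 1
    diff = (out["date"] - grouped["date"].transform("min")).dt.days
    return out.assign(count=count, diff=diff).reset_index(drop=True)
```

For the rows 0001HARISH@GMAIL.COM with dates 22DEC2010, 09DEC2010 and 12DEC2010, this yields counts 1, 2, 3 with day differences 0, 3, 13 once the dates are ordered.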

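  1. Convert the timestamp to seconds since the Epoch and use UNIX sort on the email and this new field (i.e.: sort -t, -k2,2 -k4,4n < converted_input_file > output_file; limiting each key to one field keeps the numeric comparison on the timestamp only)
  2. Initialize 3 variables: EMAIL, PREV_TIME and COUNT
  3. Iterate over the lines; when a new email is encountered, append "1,0 day". Update PREV_TIME=timestamp, COUNT=1, EMAIL=new_email
  4. Next line: 3 possible cases
    • a) Same email, different timestamp: compute the difference in days, increment COUNT by 1, update PREV_TIME, append "COUNT,Difference_in_days"
    • b) Same email, same timestamp: increment COUNT, append "COUNT,0 day"
    • c) New email: start again from step 3.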
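Steps 2 to 4 can be sketched in Python as a single pass over the sorted file. This assumes each line has the format of the gawk output shown below (the six quoted fields plus an unquoted epoch-seconds field at the end) and is already sorted by email and then by epoch; only the previous row's state is kept, so memory use is constant. `annotate_sorted` is a hypothetical name, and the day difference is taken relative to PREV_TIME, as step 4a describes.

```python
import csv
from typing import Iterable, Iterator, List

SECONDS_PER_DAY = 86400


def annotate_sorted(lines: Iterable[str]) -> Iterator[List[str]]:
    """Yield each row plus COUNT and the day difference, following steps 2-4.

    Assumes rows are sorted by email (field 2), then epoch (last field).
    """
    email = None      # EMAIL: email of the previous row
    prev_time = 0     # PREV_TIME: epoch seconds of the previous row
    count = 0         # COUNT: occurrences of the current email so far
    for row in csv.reader(lines):
        cur_email, timestamp = row[1], int(row[-1])
        if cur_email != email:
            # new email (step 3 / case c): restart the counter
            email, prev_time, count = cur_email, timestamp, 1
            days = 0
        else:
            # same email (cases a and b): count it and measure the gap
            count += 1
            days = (timestamp - prev_time) // SECONDS_PER_DAY
            prev_time = timestamp
        yield row + [str(count), str(days)]
```

To process the sorted file, open the input and an output file with `newline=""` and call `csv.writer(dst).writerows(annotate_sorted(src))`; rows are streamed, never held in memory all at once.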



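/usr/bin/gawk -F'","' '
BEGIN {
    # map month abbreviations to month numbers once, not on every line
    split("JAN FEB MAR APR MAY JUN JUL AUG SEP OCT NOV DEC", month, " ")
    for (i = 1; i <= 12; i++) mdigit[month[i]] = i
}
{
    # $4 is the date, e.g. 26JUL2010; append its epoch seconds (mktime uses the local time zone)
    print $0 "," mktime(substr($4,6,4) " " mdigit[substr($4,3,3)] " " substr($4,1,2) " 00 00 00")
}' < input.txt | /usr/bin/sort -t, -k2,2 -k7,7n > output_file.txt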
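If gawk is not available, the date conversion of step 1 can also be done in Python. Unlike `mktime`, which uses the machine's local time zone, this sketch assumes UTC midnight, which is what the sample epoch values use (1280102400 is 2010-07-26 00:00 UTC). It also assumes the default (English) locale for `%b`; `date_to_epoch` is a hypothetical helper name.

```python
import calendar
from datetime import datetime


def date_to_epoch(ddmonyyyy: str) -> int:
    """Convert a date such as '26JUL2010' to seconds since the Epoch (UTC)."""
    # %b is matched case-insensitively, so upper-case JUL is accepted
    parsed = datetime.strptime(ddmonyyyy, "%d%b%Y")
    # timegm treats the time tuple as UTC, unlike time.mktime (local time)
    return calendar.timegm(parsed.timetuple())
```

For example, `date_to_epoch("26JUL2010")` returns 1280102400 and `date_to_epoch("09DEC2010")` returns 1291852800, matching the sample output.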


"DF","00000000@11111.COM","FLTINT1000130394756","26JUL2010","B2C","6799.2",1280102400
"DF","0001HARISH@GMAIL.COM","NF252022031180","09DEC2010","B2C","3439",1291852800
"DF","0001HARISH@GMAIL.COM","NF251742087846","12DEC2010","B2C","1000",1292112000
"DF","0001HARISH@GMAIL.COM","NF251352240086","22DEC2010","B2C","4006",1292976000





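1) Read the data from the CSV in chunks and append it to an HDF store.
2) Iterate over the store, creating another store in which a combiner has been applied.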


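This should take O(num_of_chunks**2) computation time, while memory use stays bounded by the chunk size (one slice of 2*chunksize rows and one of chunksize rows are held at a time). In your case, chunksize could be, say, 1m rows (or more).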
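Because the question states that the file is already sorted by email, a single pass over the chunks is also possible: only the rows of the last email in each chunk need to be carried into the next chunk, since that email may continue there. This is a different technique from the two-pass HDF approach described above, offered as a sketch under assumptions: the sort order really holds, the column names are those used in the answer's code, and `diff` is days since the first occurrence, as in the output that follows. `annotate_sorted_chunks` and `_annotate` are hypothetical names.

```python
import io
from typing import Iterator, Optional

import pandas as pd

# column names as used in the answer's code
NAMES = ["x1", "email", "x2", "date", "x3", "x4"]


def _annotate(df: pd.DataFrame) -> pd.DataFrame:
    """Add count and diff (days since first occurrence) per email."""
    # dates are unsorted within an email, so sort by email, then date
    out = df.sort_values(["email", "date"])
    grouped = out.groupby("email")
    count = grouped.cumcount() + 1
    diff = (out["date"] - grouped["date"].transform("min")).dt.days
    return out.assign(count=count, diff=diff)


def annotate_sorted_chunks(source, chunksize: int = 1_000_000) -> Iterator[pd.DataFrame]:
    """Yield annotated frames from a CSV that is already sorted by email.

    Rows of the last email in each chunk are held back, because that email
    may continue in the next chunk; only those rows are carried over.
    """
    carry: Optional[pd.DataFrame] = None
    for chunk in pd.read_csv(source, names=NAMES, chunksize=chunksize):
        chunk["date"] = pd.to_datetime(chunk["date"], format="%d%b%Y")
        # pd.concat silently drops the None placeholder on the first chunk
        df = pd.concat([carry, chunk], ignore_index=True)
        is_last = df["email"] == df["email"].iloc[-1]
        ready, carry = df[~is_last], df[is_last]
        if not ready.empty:
            yield _annotate(ready)
    if carry is not None:
        yield _annotate(carry)


if __name__ == "__main__":
    # the sample rows shown above (without the epoch field), dates shuffled
    sample = io.StringIO(
        '"DF","00000000@11111.COM","FLTINT1000130394756","26JUL2010","B2C","6799.2"\n'
        '"DF","0001HARISH@GMAIL.COM","NF251352240086","22DEC2010","B2C","4006"\n'
        '"DF","0001HARISH@GMAIL.COM","NF252022031180","09DEC2010","B2C","3439"\n'
        '"DF","0001HARISH@GMAIL.COM","NF251742087846","12DEC2010","B2C","1000"\n'
    )
    for part in annotate_sorted_chunks(sample, chunksize=2):
        print(part[["email", "date", "count", "diff"]])
```

Writing each yielded frame out as it arrives (for example with `DataFrame.to_csv(..., mode="a", header=False)`) keeps memory at roughly one chunk plus the carried rows, and the work is linear in the number of chunks rather than quadratic.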

processing [0] [datastore.h5]
processing [1] [datastore_0.h5]
    count                date  diff                        email
4       1 2011-06-24 00:00:00     0           0000.ANU@GMAIL.COM
1       1 2011-06-24 00:00:00     0          00000.POO@GMAIL.COM
0       1 2010-07-26 00:00:00     0           00000000@11111.COM
2       1 2013-01-01 00:00:00     0         0000650000@YAHOO.COM
3       1 2013-01-26 00:00:00     0       00009.GAURAV@GMAIL.COM
5       1 2011-10-29 00:00:00     0          0000MANNU@GMAIL.COM
6       1 2011-11-21 00:00:00     0    0000PRANNOY0000@GMAIL.COM
7       1 2011-06-26 00:00:00     0  0000PRANNOY0000@YAHOO.CO.IN
8       1 2012-10-25 00:00:00     0          0000RAHUL@GMAIL.COM
9       1 2011-05-10 00:00:00     0            0000SS0@GMAIL.COM
12      1 2010-12-09 00:00:00     0         0001HARISH@GMAIL.COM
11      2 2010-12-12 00:00:00     3         0001HARISH@GMAIL.COM
10      3 2010-12-22 00:00:00    13         0001HARISH@GMAIL.COM
14      1 2012-11-28 00:00:00     0           000AYUSH@GMAIL.COM
15      2 2012-11-29 00:00:00     1           000AYUSH@GMAIL.COM
17      3 2012-12-08 00:00:00    10           000AYUSH@GMAIL.COM
18      4 2012-12-12 00:00:00    14           000AYUSH@GMAIL.COM
13      5 2013-01-25 00:00:00    58           000AYUSH@GMAIL.COM
import pandas as pd

# path to your large CSV file (placeholder name)
data_file = 'input.txt'

# rows per chunk; about 1m is a reasonable size here
chunksize = 1000000

# read in and create the store
data_store_file = 'datastore.h5'
store = pd.HDFStore(data_store_file, 'w')

reader = pd.read_csv(data_file, names=['x1', 'email', 'x2', 'date', 'x3', 'x4'],
                     chunksize=chunksize)

for i, chunk in enumerate(reader):
    # parse dates such as 26JUL2010 (%b also matches upper-case month names)
    chunk['date'] = pd.to_datetime(chunk['date'], format='%d%b%Y')

    # create a global index that is unique across all chunks
    chunk.index = pd.RangeIndex(i * chunksize, i * chunksize + len(chunk), name='indexer')

    # reserve string sizes: 100 chars for email, 50 for the other text columns
    store.append('data', chunk, min_itemsize={'email': 100, 'values': 50})

# close the writer before the same file is reopened for reading
store.close()


# define the combiner function
def combiner(df):

    # given rows for one or more emails, return them with count and diff added

    # sort by email, then by date within each email
    y = df.sort_values(['email', 'date'])
    grouped = y.groupby('email')

    # diff in days since the first date of each email, and a 1-based count
    # (both stored as floats)
    diff = (y['date'] - grouped['date'].transform('min')).dt.days
    count = grouped.cumcount() + 1
    y['diff'] = diff.astype('float64')
    y['count'] = count.astype('float64')

    return y

# reduce the store (and create a new one by chunks)
in_store_file = data_store_file
in_store1 = pd.HDFStore(in_store_file, 'r')

# iter on store 1
for chunki, df1 in enumerate(in_store1.select('data', chunksize=2 * chunksize)):
    print("processing [%s] [%s]" % (chunki, in_store_file))

    out_store_file = 'datastore_%s.h5' % chunki
    out_store = pd.HDFStore(out_store_file, 'w')

    # iter on store 2 (the output of the previous pass)
    in_store2 = pd.HDFStore(in_store_file, 'r')
    for df2 in in_store2.select('data', chunksize=chunksize):

        # concat & drop dups
        df = pd.concat([df1, df2]).drop_duplicates(['email', 'date'])

        # group by email and compute count and diff
        result = combiner(df)

        # only store those rows which are in df2!
        result = result.reindex(index=df2.index).dropna()

        # store to the out_store
        out_store.append('data', result, min_itemsize={'email': 100, 'values': 50})

    # close both stores so the next pass can reopen the output file
    in_store2.close()
    out_store.close()
    in_store_file = out_store_file

in_store1.close()

# show the reduced store
print(pd.read_hdf(out_store_file, 'data').sort_values(['email', 'diff']))

