如何从一个大的压缩csv文件将数百万条记录插入mongodb?

2024-09-25 08:26:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图在Mongo中插入大约800万条记录,它似乎以每秒1000条的速度插入它们,这是非常慢的。在

代码是用python编写的,所以这可能是python的问题,但我对此表示怀疑。代码如下:

def str2datetime(str):
  return None if (not str or str == r'\N') else datetime.strptime(str, '%Y-%m-%d %H:%M:%S')
def str2bool(str):
  return None if (not str or str == r'\N') else (False if str == '0' else True)
def str2int(str):
  return None if (not str or str == r'\N') else int(str)
def str2float(str):
  return None if (not str or str == r'\N') else float(str)
def str2float2int(str):
  return None if (not str or str == r'\N') else int(float(str) + 0.5)
def str2latin1(str):
  return unicode(str, 'latin-1')

_ = lambda x: x

converters_map = {
  'test_id': str2int,
  'android_device_id': str2int,
  'android_fingerprint': _,
  'test_date': str2datetime,
  'client_ip_address': _,
  'download_kbps': str2int,
  'upload_kbps': str2int,
  'latency': str2int,
  'server_name': _,
  'server_country': _,
  'server_country_code': _,
  'server_latitude': str2float,
  'server_longitude': str2float,
  'client_country': _,
  'client_country_code': _,
  'client_region_name': str2latin1,
  'client_region_code': _,
  'client_city': str2latin1,
  'client_latitude': str2float,
  'client_longitude': str2float,
  'miles_between': str2float2int,
  'connection_type': str2int,
  'isp_name': _,
  'is_isp': str2bool,
  'network_operator_name': _,
  'network_operator': _,
  'brand': _,
  'device': _,
  'hardware': _,
  'build_id': _,
  'manufacturer': _,
  'model': str2latin1,
  'product': _,
  'cdma_cell_id': str2int,
  'gsm_cell_id': str2int,
  'client_ip_id': str2int,
  'user_agent': _,
  'client_net_speed': str2int,
  'iphone_device_id': str2int,
  'carrier_name': _,
  'iso_country_code': _,
  'mobile_country_code': str2int,
  'mobile_network_code': str2int,
  'model': str2latin1,
  'version': _,
  'server_sponsor_name': _,
}

def read_csv_zip(path):
  with ZipFile(path) as z:
    with z.open(z.namelist()[0]) as input:
      r = csv.reader(input)
      header = r.next()
      converters = tuple((title if title != 'test_id' else '_id', converters_map[title]) for title in header)
      for row in r:
        row = {converter[0]:converter[1](value) for converter, value in zip(converters, row)}
        yield row

argv = [x for x in argv if not x == '']
if len(argv) == 1:
  print("Usage: " + argv[0] + " zip-file")
  exit(1)

zip_file = argv[1]
collection_name = zip_file[:zip_file.index('_')]

print("Populating " + collection_name + " with the data from " + zip_file)
with Connection() as connection:
  db = connection.db
  collection = db.__getattr__(collection_name)
  i = 0;
  try:
    start = time()
    for item in read_csv_zip(zip_file):
      i += 1
      if (i % 1000) == 0:
        stdout.write("\r%d " % i)
        stdout.flush()
      try:
        collection.insert(item)
      except Exception as exc:
        print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
        print exc
    print("Elapsed time = {0} seconds, {1} records.".format(time() - start, i))
    raw_input("Press ENTER to exit")
  except Exception as exc:
    print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
    print exc
    exit(1)

插入262796条记录(一个csv文件)需要350秒。在

mongo服务器运行在同一台机器上,没有人使用它。所以,如果有办法的话,我可以直接写入数据库文件。在

我对切分不感兴趣,因为800万条记录不需要切分,不是吗?在

我的问题是我做错了什么?也许我对数据库的选择是错误的?典型的流程是每月刷新一次记录,然后只对数据库进行查询。在

谢谢。在

编辑

事实证明,瓶颈不是mongo,而是读取zip文件。我更改了代码,以1000行为单位读取zip文件,然后在一次调用Collection.insert中将它们提供给mongo。它是zip文件,它需要所有的时间。以下是修改后的代码:

^{pr2}$

结果发现大部分时间都花在items = list(itertools.islice(source, batch_size))上。在

有什么改进的办法吗?在


Tags: nameclientidreturnifserverdefnot
2条回答

如果每一张唱片略多于1ms,我不会说它“非常慢”,但无论如何,下面是一些关于下一步该怎么做的想法:

  • 使用探查器查看程序在哪里花费时间。可能不是你想的那样。在
  • 考虑使用ChrisP注释中链接的mongoimport实用程序,因为它是为此目的而设计的。在

尽管您在评论中指出您不能使用mongoimport,但您可以而且应该使用。日期可以完全导入,以及您的str2拉丁语转换。只需预处理您的csv,使其与mongoimport兼容,您就成了黄金。在

将日期转换为{myDate:{$date: msSinceEpoch}},mongoimport将理解它。因此,只需进行一个预处理步骤,就可以使用mongoimport了,鉴于您的用例,我不明白为什么这会是一个问题。在

也就是说,mongoimport不应该比批插入快一个数量级,尽管1000/秒并不慢,但它肯定不符合我在一个简单的dev机器上获得的性能类型。如果我使用批插入而不是mono插入,我可以轻松地达到30k/s,甚至可能更高,尤其是safe=false writes(在这种情况下,这应该很好,因为您可以在导入之后的第二步进行验证)。你的瓶颈是什么?(用mongostat和top检查)

相关问题 更多 >