Python: IOError 110 Connection timed out when reading from dis

2024-09-21 05:44:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在sungridengine超级计算机集群上运行一个Python脚本,该脚本读取文件id的列表,将每个id发送到工作进程进行分析,并将每个输入文件的一个输出写入磁盘。在

问题是我在worker函数中的某个地方得到IOError(110,“Connection timed out”),但我不确定原因。我以前在发出严重延迟的网络请求时收到过这个错误,但在本例中,worker只尝试从磁盘读取数据。在

我的问题是:从磁盘读取时,什么会导致连接超时错误,如何解决这个错误?如果其他人能提供任何帮助,我们将不胜感激。在

完整脚本(IOError出现在minhash_text()):

from datasketch import MinHash
from multiprocessing import Pool
from collections import defaultdict
from nltk import ngrams
import json
import sys
import codecs
import config

cores = 24
window_len = 12
step = 4
worker_files = 50
permutations = 256
hashband_len = 4

def minhash_text(args):
  '''Return a list of hashband strings for an input doc'''
  try:
    file_id, path = args
    with codecs.open(path, 'r', 'utf8') as f:
      f = f.read()
    all_hashbands = []
    for window_idx, window in enumerate(ngrams(f.split(), window_len)):
      window_hashbands = []
      if window_idx % step != 0:
        continue
      minhash = MinHash(num_perm=permutations, seed=1)
      for ngram in set(ngrams(' '.join(window), 3)):
        minhash.update( ''.join(ngram).encode('utf8') )
      hashband_vals = []
      for i in minhash.hashvalues:
        hashband_vals.append(i)
        if len(hashband_vals) == hashband_len:
          window_hashbands.append( '.'.join([str(j) for j in hashband_vals]) )
          hashband_vals = []
      all_hashbands.append(window_hashbands)
    return {'file_id': file_id, 'hashbands': all_hashbands}
  except Exception as exc:
    print(' ! error occurred while processing', file_id, exc)
    return {'file_id': file_id, 'hashbands': []}

if __name__ == '__main__':

  file_ids = json.load(open('file_ids.json'))
  file_id_path_tuples = [(file_id, path) for file_id, path in file_ids.items()]

  worker_id = int(sys.argv[1])
  worker_ids = list(ngrams(file_id_path_tuples, worker_files))[worker_id]

  hashband_to_ids = defaultdict(list)
  pool = Pool(cores)

  for idx, result in enumerate(pool.imap(minhash_text, worker_ids)):
    print(' * processed', idx, 'results')
    file_id = result['file_id']
    hashbands = result['hashbands']
    for window_idx, window_hashbands in enumerate(hashbands):
      for hashband in window_hashbands:
        hashband_to_ids[hashband].append(file_id + '.' + str(window_idx))

  with open(config.out_dir + 'minhashes-' + str(worker_id) + '.json', 'w') as out:
    json.dump(dict(hashband_to_ids), out)

Tags: pathinimportidjsonidsforlen
1条回答
网友
1楼 · 发布于 2024-09-21 05:44:00

结果发现我对文件系统的冲击太大了,对同一台服务器上的文件进行了太多的并发读取请求。该服务器在给定的时间段内只能允许固定数量的读取,因此任何超过该限制的请求都会收到连接超时响应。在

解决方案是在while循环中包装每个文件读取请求。在while循环中,尝试从磁盘读取适当的文件。如果出现连接超时错误,请睡眠一秒钟,然后重试。只有当文件被读取后,while循环才能被中断。在

相关问题 更多 >

    热门问题