正在尝试将文件下载缓冲区拆分为单独的线程

2024-10-02 08:21:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图把一个文件的下载拆分到 5 个线程中分段进行,但下载得到的内容似乎变得乱七八糟。

from numpy import arange
import requests
from threading import Thread
import urllib2

# Page to download in several pieces.
url = 'http://pymotw.com/2/urllib/index.html'
# Read the total size from the Content-Length header of a HEAD request.
# The value is bound to both `sizeInBytes` and `r`; `r` is not used again
# in this script.
sizeInBytes = r = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers['content-length']

# Number of chunks; one worker thread is started per chunk.
splitBy = 5

# splitBy + 1 evenly spaced float boundaries from 0 up to the total size;
# chunk idx is requested as splits[idx]..splits[idx + 1].
splits = arange(splitBy + 1) * (float(sizeInBytes)/splitBy)

# Shared list that every worker thread appends its downloaded chunk to.
dataLst = []

def bufferSplit(url, idx, splits):
    """Download chunk `idx` of `url` and append it to the global `dataLst`.

    The chunk is requested with a Range header built from splits[idx] and
    splits[idx + 1], each formatted with %d.

    NOTE(review): this chunk's last offset and the next chunk's first offset
    are the same value (splits[idx + 1]); HTTP byte ranges are presumably
    inclusive at both ends, so that byte would be fetched twice -- confirm.
    NOTE(review): chunks are appended in whatever order the threads finish,
    not in idx order.
    """
    req = urllib2.Request(url,  headers={'Range': 'bytes=%d-%d' % (splits[idx], splits[idx+1])})
    # Debug output: prints a one-element set containing the Range value.
    print {'bytes=%d-%d' % (splits[idx], splits[idx+1])}
    dataLst.append(urllib2.urlopen(req).read())


# Start one worker thread per chunk.
# NOTE(review): the threads are never join()ed, so the code below can run
# before some (or all) of the downloads have finished.
for idx in range(splitBy):
    dlth = Thread(target=bufferSplit, args=(url, idx, splits))
    dlth.start()


print dataLst

# Concatenate whatever chunks are in dataLst at this point, in list order,
# and save them to page.html.
with open('page.html', 'w') as fh:
    fh.write(''.join(dataLst))

更新:我反复尝试之后取得了一些进展,但是如果下载的是一个 jpg 文件,得到的图片似乎是损坏的。

^{pr2}$

下面是下载后得到的图片。

corrupted image


Tags: from, import, url, html, urllib2, requests, thread, headers
2条回答

下面是我让它正常工作的办法。如果有人有任何建议或可能的改进,非常欢迎提出。

import os
import requests
import threading
import urllib2
import time

# Sample image that main() downloads when this file is run as a script.
url = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"

def buildRange(value, numsplits):
    """Split ``value`` bytes into ``numsplits`` contiguous byte ranges.

    Args:
        value: Total size of the resource in bytes (expected to be an
            integer that is at least ``numsplits``).
        numsplits: Number of ranges to produce.

    Returns:
        A list of ``numsplits`` strings of the form ``'first-last'`` that can
        be placed after ``bytes=`` in an HTTP Range header.  Offsets are
        zero-based and inclusive; the ranges do not overlap and together
        cover exactly bytes ``0`` .. ``value - 1``.  An empty list is
        returned when ``numsplits`` is not positive.
    """
    if numsplits <= 0:
        return []
    # Integer boundaries avoid float rounding: chunk i covers the half-open
    # interval [bounds[i], bounds[i + 1]).
    bounds = [i * value // numsplits for i in range(numsplits + 1)]
    # Range offsets are inclusive, hence the "- 1" on the upper bound.
    return ['%d-%d' % (bounds[i], bounds[i + 1] - 1) for i in range(numsplits)]

class SplitBufferThreads(threading.Thread):
    """Thread that downloads one byte range of a URL.

    Creating several of these for different ranges and starting them all
    before joining lets the pieces download concurrently.

    Attributes:
        req: The ``urllib2.Request`` built by run(), or None before it runs.
    """
    def __init__(self, url, byteRange):
        """Remember the target ``url`` and the ``'first-last'`` byte range."""
        super(SplitBufferThreads, self).__init__()
        self.__url = url
        self.__byteRange = byteRange
        self.req = None
        self.__data = None
        self.__error = None

    def run(self):
        """Build the ranged request and download it on this thread."""
        self.__error = None
        try:
            self.req = urllib2.Request(self.__url,  headers={'Range': 'bytes=%s' % self.__byteRange})
            response = urllib2.urlopen(self.req)
            try:
                self.__data = response.read()
            finally:
                response.close()
        except Exception as exc:
            # Kept so getFileData() can re-raise it in the caller's thread
            # instead of the failure being lost inside the worker.
            self.__error = exc

    def getFileData(self):
        """Return the downloaded bytes for this thread's range.

        Waits for the thread if it is still running.  If the thread was never
        started, the download is performed synchronously.

        Raises:
            Exception: whatever the download raised, re-raised here.
        """
        if self.is_alive():
            self.join()
        if self.req is None:
            # run() has not produced a request yet: download in this thread.
            self.run()
        if self.__error is not None:
            raise self.__error
        return self.__data


def main(url=None, splitBy=3):
    """Download ``url`` in ``splitBy`` ranged pieces and save it to disk.

    The file is written to the current directory under the last path
    component of ``url``.

    Args:
        url: Address of the file to download.  Nothing is done if empty.
        splitBy: Number of byte ranges (and threads) to use.

    Returns:
        None.  Progress and errors are reported with print.
    """
    start_time = time.time()
    if not url:
        print("Please Enter some url to begin download.")
        return

    fileName = url.split('/')[-1]
    sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)
    print("%s bytes to download." % sizeInBytes)
    if not sizeInBytes:
        print("Size cannot be determined.")
        return

    # Compute the ranges once rather than once per chunk.
    byteRanges = buildRange(int(sizeInBytes), splitBy)
    workers = [SplitBufferThreads(url, byteRange) for byteRange in byteRanges]

    # Start every thread before joining any of them; joining inside the
    # start loop would run the downloads one after another.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    # Collect in range order so the pieces are reassembled correctly.
    dataLst = [worker.getFileData() for worker in workers]

    if dataLst:
        print(" - %s seconds  -" % str(time.time() - start_time))
        # Binary mode: text mode can alter line-ending bytes and corrupt
        # images.  Opening with 'wb' also truncates any existing file.
        with open(fileName, 'wb') as fh:
            fh.write(b''.join(dataLst))
        print("Finished Writing file %s" % fileName)

# Script entry point: download the module-level `url` with the default
# number of splits.
if __name__ == '__main__':
    main(url)

这是我写出的第一个能够正常运行的基本版本。我发现,如果把 bufTh 缓冲线程的 daemon 设置为 False,整个过程需要更多的时间才能完成。

这是这个项目的另一个版本。差异:

  • 线程代码是一个单一的小函数

  • 每个线程下载一个块,然后将其存储在全局线程安全字典中

  • 先启动所有线程,然后再对它们调用 join(),因此这些线程是同时运行的

  • 所有操作完成后,数据按正确的顺序重新组合,然后写入磁盘

  • 额外打印,以验证所有内容是否正确

  • 计算输出文件大小,以便进行额外比较

来源

import os, requests
import threading
import urllib2
import time

# Sample image that main() downloads when this file is run as a script.
URL = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"

def buildRange(value, numsplits):
    """Divide ``value`` bytes into ``numsplits`` back-to-back byte ranges.

    Args:
        value: Total number of bytes (expected to be an integer that is at
            least ``numsplits``).
        numsplits: How many ranges to return.

    Returns:
        A list of ``'first-last'`` strings suitable for an HTTP Range header
        (``bytes=first-last``).  Offsets are zero-based and inclusive, the
        ranges do not overlap, and together they cover bytes ``0`` through
        ``value - 1``.  The list is empty when ``numsplits`` is not positive.
    """
    ranges = []
    start = 0
    for i in range(1, numsplits + 1):
        # Integer arithmetic keeps the boundaries exact; `stop` is the first
        # byte of the *next* chunk.
        stop = i * value // numsplits
        # Range offsets are inclusive, so this chunk ends at stop - 1.
        ranges.append('%d-%d' % (start, stop - 1))
        start = stop
    return ranges

def main(url=None, splitBy=3):
    """Download ``url`` concurrently in ``splitBy`` byte ranges and save it.

    One thread is started per range; each stores its chunk in a dict keyed by
    chunk index.  After all threads finish, the chunks are written in index
    order to a file named after the last path component of ``url``.

    Args:
        url: Address of the file to download.  Nothing is done if empty.
        splitBy: Number of byte ranges (and threads) to use.

    Returns:
        None.  Progress and errors are reported with print.
    """
    start_time = time.time()
    if not url:
        print("Please Enter some url to begin download.")
        return

    fileName = url.split('/')[-1]
    sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)
    print("%s bytes to download." % sizeInBytes)
    if not sizeInBytes:
        print("Size cannot be determined.")
        return

    dataDict = {}

    # split total num bytes into ranges
    ranges = buildRange(int(sizeInBytes), splitBy)

    def downloadChunk(idx, irange):
        """Fetch one byte range and store it under its chunk index."""
        req = urllib2.Request(url)
        req.headers['Range'] = 'bytes={}'.format(irange)
        response = urllib2.urlopen(req)
        try:
            dataDict[idx] = response.read()
        finally:
            response.close()

    # create one downloading thread per chunk
    downloaders = [
        threading.Thread(
            target=downloadChunk,
            args=(idx, irange),
        )
        for idx, irange in enumerate(ranges)
    ]

    # start threads, let run in parallel, wait for all to finish
    for th in downloaders:
        th.start()
    for th in downloaders:
        th.join()

    print('done: got {} chunks, total {} bytes'.format(
        len(dataDict), sum(len(chunk) for chunk in dataDict.values())
    ))

    print(" - %s seconds  -" % str(time.time() - start_time))

    # A thread that raised never stored its chunk; writing the remaining
    # pieces would silently produce a truncated, corrupt file.
    missing = [idx for idx in range(len(ranges)) if idx not in dataDict]
    if missing:
        print("Download failed for chunk(s) %s; %s was not written" % (missing, fileName))
        return

    # reassemble file in correct order; binary mode keeps the bytes intact
    # and truncates any existing file of the same name
    with open(fileName, 'wb') as fh:
        for idx in sorted(dataDict):
            fh.write(dataDict[idx])

    print("Finished Writing file %s" % fileName)
    print('file size {} bytes'.format(os.path.getsize(fileName)))

# Script entry point: download the module-level `URL` with the default
# number of splits.
if __name__ == '__main__':
    main(URL)

输出

^{pr2}$

相关问题 更多 >

    热门问题