How do I perform a multi-part FTP download with ftplib in Python 2.7?

Posted 2024-10-01 02:29:14


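I'm a complete Python noob, but I have managed simple FTP downloads and uploads that write chunks to disk as they arrive, rather than filling RAM before writing the whole file.

My question is: how do I download a single file in x parts at once (multiple threads, each downloading a different part of the file) while writing it to disk immediately instead of filling RAM first?

I have looked everywhere for examples of this, but they all fill RAM first and then write the file out.

I would also like to know whether an upload can be done the same way.

Thanks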


1 answer
User
#1 · Posted 2024-10-01 02:29:14

So I figured it out myself :)

from ftplib import FTP
from threading import Thread
from shutil import copyfileobj
import os

num_parts = 20
FTP_server = 'ftp.example.com'
FTP_user = 'mark'
FTP_password = 'password'

FTP_directory = '/foo/bar'
FTP_file = 'foo.bar'


class Done(Exception):
    pass


def open_ftp():
    ftp = FTP(FTP_server, FTP_user, FTP_password)
    ftp.cwd(FTP_directory)
    return ftp


def go():
    ftp = open_ftp()
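    # Some servers refuse SIZE in ASCII mode, so switch to binary first.
    ftp.voidcmd('TYPE I')
    filesize = ftp.size(FTP_file)
    print('filesize: ' + str(filesize))
    ftp.quit()

    # Integer division; the last part also takes whatever remainder is left.
    chunk_size = filesize // num_parts
    last_chunk_size = filesize - (chunk_size * (num_parts - 1))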

    downloaders = []
    for i in range(num_parts):
        if i == (num_parts - 1):
            this_chunk_size = last_chunk_size
        else:
            this_chunk_size = chunk_size
        downloaders.append(Downloader(i, chunk_size * i, this_chunk_size))
    for downloader in downloaders:
        downloader.thread.join()

    # Join the parts in order; copyfileobj copies in blocks, not all at once.
    with open(FTP_file, 'w+b') as f:
        for downloader in downloaders:
            with open(downloader.part_name, 'rb') as part:
                copyfileobj(part, f)


class Downloader:

    thread_number = 0

    def __init__(self, part_number, part_start, part_size):
        self.filename = FTP_file
        self.part_number = part_number
        self.part_name = 'part' + str(self.part_number)
        self.part_start = part_start
        self.part_size = part_size
        Downloader.thread_number += 1
        self.thread_number = Downloader.thread_number
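        # Start from an empty part file: on_data appends, so a stale part left
        # over from an earlier run would otherwise end up in the result.
        open(self.part_name, 'wb').close()
        self.ftp = open_ftp()
        self.thread = Thread(target=self.receive_thread)
        self.thread.start()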

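    def receive_thread(self):
        try:
            # The fourth argument is the restart offset: the server begins
            # sending at byte part_start instead of at the start of the file.
            self.ftp.retrbinary('RETR ' + self.filename, self.on_data, 100000, self.part_start)
        except Done:
            pass
        finally:
            # The transfer is normally cut off mid-stream, so drop the connection.
            self.ftp.close()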

    def on_data(self, data):
        with open(self.part_name, 'a+b') as f:
            f.write(data)
        if os.path.getsize(self.part_name) >= self.part_size:
            with open(self.part_name, 'r+b') as f:
                f.truncate(self.part_size)
            raise Done

go()

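So I learned that the callback passed to retrbinary receives the actual binary data. For each thread I therefore create a file and append the data from the callback to it until the file reaches or exceeds the expected part size, then truncate the excess. When all threads have finished, the part files are concatenated into a single file with the original filename. I checked the file size and SHA-256 of the result and confirmed it is valid. :)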
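
As a possible variation on the callback described above, the part file could be kept open and each block trimmed to the bytes still needed, which avoids reopening the file, calling os.path.getsize, and truncating on every block. This is only a sketch: `PartWriter` is a hypothetical helper that is not part of the original answer, and the demonstration feeds it blocks by hand instead of talking to a real FTP server.

```python
import os
import tempfile


class Done(Exception):
    """Raised from the callback to stop the transfer once the part is complete."""


class PartWriter(object):
    """Hypothetical helper: writes at most part_size bytes to one open file."""

    def __init__(self, path, part_size):
        self.remaining = part_size
        self.f = open(path, 'wb')  # also clears any stale part from an earlier run

    def __call__(self, data):
        # Keep only the bytes that belong to this part, so no truncate is needed.
        self.f.write(data[:self.remaining])
        self.remaining -= min(len(data), self.remaining)
        if self.remaining == 0:
            self.f.close()
            raise Done


# Offline demonstration: feed blocks the way retrbinary would call the callback.
path = os.path.join(tempfile.mkdtemp(), 'part0')
writer = PartWriter(path, 10)
try:
    for block in (b'abcd', b'efgh', b'ijkl'):
        writer(block)
except Done:
    pass
```

With a real connection the writer would take the place of on_data, for example `ftp.retrbinary('RETR ' + name, PartWriter(part_name, part_size), 100000, part_start)`, with the Done exception still caught around the call.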

Code adapted from RichieHindle.
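
The answer does not show how the file size and SHA-256 were checked. One way to do it without loading the joined file into RAM, in keeping with the rest of the approach, is to hash it block by block. The sketch below assumes a hypothetical helper name `sha256_of` and uses a small temporary file as a stand-in for the downloaded file.

```python
import hashlib
import os
import tempfile


def sha256_of(path, block_size=100000):
    """Hash a file block by block so it never has to fit in RAM."""
    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        # iter() with a sentinel keeps calling f.read until it returns b''.
        for block in iter(lambda: f.read(block_size), b''):
            digest.update(block)
    return digest.hexdigest()


# Offline demonstration: a tiny temporary file stands in for the download.
demo_path = os.path.join(tempfile.mkdtemp(), 'foo.bar')
with open(demo_path, 'wb') as f:
    f.write(b'abc')

demo_size = os.path.getsize(demo_path)
demo_hash = sha256_of(demo_path)
```

The size can then be compared with the value returned by ftp.size, and the digest with a checksum obtained from a trusted copy of the file.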
