如何在python中使用asyncio和wget下载多个文件?

2024-09-27 22:22:33 发布

您现在位置:Python中文网/ 问答频道 /正文

我想从 Dukascopy 下载很多文件。典型的 URL 如下所示:

url = 'http://datafeed.dukascopy.com/datafeed/AUDUSD/2014/01/02/00h_ticks.bi5'

我尝试了这个答案(原文链接:here)中的做法,但下载到的大多数文件大小都是 0。

但当我简单地使用wget循环时(见下文),我得到了完整的文件

import wget
from urllib.error import HTTPError

# Sequentially fetch every hourly tick file of one currency pair for 2014.
pair = 'AUDUSD'
for year in range(2014, 2015):
    for month in range(1, 13):
        # Both the URL and the local file name carry the month zero-based.
        month_tag = f'{month - 1:02d}'
        for day in range(1, 32):
            day_tag = f'{day:02d}'
            for hour in range(24):
                hour_tag = f'{hour:02d}'
                url = (
                    f'http://datafeed.dukascopy.com/datafeed/{pair}/{year}/'
                    f'{month_tag}/{day_tag}/{hour_tag}h_ticks.bi5'
                )
                filename = (
                    f'{pair}-{year}-{month_tag}-{day_tag}-{hour_tag}'
                    'h_ticks.bi5'
                )
                try:
                    x = wget.download(url, filename)
                except HTTPError as err:
                    # Anything other than "not found" is unexpected: re-raise.
                    if err.code != 404:
                        raise
                    # Report the slot that has no file (e.g. day 31 of a
                    # 30-day month) and carry on with the next one.
                    print((year, month, day, hour))

我以前用过下面这段代码来抓取网站,但没有用它下载过文件:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from aiohttp import ClientSession, client_exceptions
from asyncio import Semaphore, ensure_future, gather, run
from json import dumps, loads

# Size of the semaphore created in scrape(): the maximum number of requests
# that may be in flight at the same time.
limit = 10
# HTTP status codes that scrape_one() accepts as a successful response.
http_ok = [200]


async def scrape(url_list):
    """Fetch every URL in ``url_list`` concurrently and collect the results.

    A single ``ClientSession`` is shared by all requests, and a semaphore
    sized by ``limit`` caps how many of them run at once.

    Args:
        url_list: Iterable of URLs to request.

    Returns:
        A list with one entry per URL, in the order of ``url_list``
        (``gather`` preserves the order of the awaitables it is given).
    """
    throttle = Semaphore(limit)

    async with ClientSession() as session:
        # Schedule everything up front; the semaphore does the pacing.
        pending = [
            ensure_future(scrape_bounded(url, throttle, session))
            for url in url_list
        ]
        return await gather(*pending)


async def scrape_bounded(url, sem, session):
    """Run ``scrape_one`` for ``url`` while holding one slot of ``sem``.

    Args:
        url: Address to request.
        sem: Semaphore that limits how many requests run concurrently.
        session: HTTP session handed through to ``scrape_one``.

    Returns:
        Whatever ``scrape_one`` returns for this URL.
    """
    await sem.acquire()
    try:
        outcome = await scrape_one(url, session)
    finally:
        # Always free the slot, even when the request raised.
        sem.release()
    return outcome


async def scrape_one(url, session):
    """Request ``url`` and return its body parsed as JSON.

    Args:
        url: Address to request.
        session: Object whose ``get(url)`` returns an async context manager
            yielding the response (an aiohttp ``ClientSession`` in this
            script).

    Returns:
        The decoded JSON document, or ``False`` when the connection could not
        be established or the status code is not listed in ``http_ok``.
    """
    try:
        async with session.get(url) as response:
            content = await response.read()
    except client_exceptions.ClientConnectorError:
        # print() does not interpolate %-placeholders the way logging calls
        # do, so the message is formatted explicitly.
        print(f'Scraping {url} failed due to the connection problem')
        return False

    if response.status not in http_ok:
        print(f'Scraping {url} failed due to the return code {response.status}')
        return False

    # The body is expected to be UTF-8 encoded JSON.
    return loads(content.decode('UTF-8'))

if __name__ == '__main__':
    # Scrape two sample endpoints and pretty-print the collected results.
    targets = ['http://demin.co/echo1/', 'http://demin.co/echo2/']
    print(dumps(run(scrape(targets)), indent=4))

也可以用多进程(multiprocessing,原文链接:here)来下载多个文件,但我认为 asyncio 会更快。

当返回大小为0的文件时,可能是服务器限制了请求的数量,但我仍然想探讨是否有可能使用wget和asyncio下载多个文件


Tags: 文件 in import http url for async return
1条回答
网友
1楼 · 发布于 2024-09-27 22:22:33

这里有一个例子。其中的解码/编码以及写入操作应根据目标数据类型进行调整。

   #!/usr/bin/env python3
# -*- coding: utf-8 -*-

from aiofile import AIOFile
from aiohttp import ClientSession
from asyncio import ensure_future, gather, run, Semaphore
from calendar import monthlen
from lzma import open as lzma_open
from struct import calcsize, unpack
from io import BytesIO
from json import dumps

# HTTP status codes that download_one() accepts as a successful response.
http_ok = [200]
# Size of the semaphore created in download(): maximum concurrent downloads.
limit = 5
# URL template; the placeholders are filled, in order, with
# pair, year, month, day and hour (see download_one()).
base_url = 'http://datafeed.dukascopy.com/datafeed/{}/{}/{}/{}/{}h_ticks.bi5'
# struct layout of one record: big-endian, three 4-byte ints then two 4-byte floats.
fmt = '>3i2f'
# Number of bytes occupied by one record of layout `fmt`.
chunk_size = calcsize(fmt)


async def download():
    """Schedule and await the download of every hourly tick file.

    One ``download_one`` task is created for each hour of each day of each
    month of 2014 and 2015 for the pair ``AUDUSD``.  All tasks share one
    ``ClientSession`` and a semaphore sized by ``limit``.

    Returns:
        The list produced by ``gather``: one entry per scheduled task, each
        being whatever ``download_one`` returned.
    """
    # ``monthrange`` is the documented calendar API for the number of days in
    # a month.  It is imported locally so this function does not depend on the
    # undocumented ``calendar.monthlen`` helper.
    from calendar import monthrange

    tasks = []
    sem = Semaphore(limit)

    async with ClientSession() as session:
        for pair in ['AUDUSD']:
            for year in [2014, 2015]:
                # range() excludes its stop value, so 13 is needed to reach
                # December.
                for month in range(1, 13):
                    days_in_month = monthrange(year, month)[1]
                    # "+ 1" so the last day of the month is included.
                    for day in range(1, days_in_month + 1):
                        # Hours 0..23 inclusive.
                        for hour in range(24):
                            # NOTE(review): the month is passed one-based; the
                            # sequential wget loop earlier on this page sends
                            # ``month - 1`` instead. Confirm which numbering
                            # the server expects.
                            tasks.append(ensure_future(download_one(
                                pair=pair,
                                year=str(year).zfill(2),
                                month=str(month).zfill(2),
                                day=str(day).zfill(2),
                                hour=str(hour).zfill(2),
                                session=session,
                                sem=sem)))
        return await gather(*tasks)


async def download_one(pair, year, month, day, hour, session, sem):
    """Download one hourly tick file, decode it and store it as JSON text.

    The whole operation (request, decompression, file write) runs while
    holding one slot of ``sem``.

    Args:
        pair: Currency pair used in the URL and in the output file name.
        year, month, day, hour: Already formatted strings that are inserted
            into the URL and the output file name.
        session: HTTP session used for the request.
        sem: Semaphore limiting the number of concurrent downloads.

    Returns:
        ``None`` in every case; failures are only reported with ``print``.
    """
    url = base_url.format(pair, year, month, day, hour)

    async with sem:
        async with session.get(url) as response:
            content = await response.read()

        if response.status not in http_ok:
            print(f'Scraping {url} failed due to the return code {response.status}')
            return None

        if content == b'':
            print(f'Scraping {url} failed due to the empty content')
            return None

        # The payload is LZMA-compressed; read it back record by record until
        # read() returns an empty bytes object at end of stream.
        with lzma_open(BytesIO(content)) as stream:
            records = [
                unpack(fmt, raw)
                for raw in iter(lambda: stream.read(chunk_size), b'')
            ]

        async with AIOFile(f'{pair}-{year}-{month}-{day}-{hour}.bi5', 'w') as out:
            await out.write(dumps(records, indent=4))

        return None


if __name__ == '__main__':
    # Script entry point: run the download() coroutine to completion.
    run(download())

源代码可在此处获取(原文链接:here)。

相关问题 更多 >

    热门问题