如何在python中并行化包含请求后API调用的for循环?

2024-09-29 22:03:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个非常大的数据集(71M行-~7GB csv文件),我已经加载到pandas的数据框中。在

我需要在一个post请求调用中将数据帧中的每一行发送到一个API,然后将响应存储到另一个数据帧中,该数据帧稍后将被导出并进行分析

目前我的代码是这样的

##read data from csv

for row in data.itertuples(index=True, name='Pandas'):
    ##There is an if-else ladder to create a header depending on the type of values in a row

    ##Code to create a json payload

    r = requests.post(url, data=json.dumps(payload), headers=headers)
    t = json.loads(r.text)

    ## A try and except block to add the data sent via header and payload + the response from API call back into a new data-frame. (exception is in case there is no response from the API)

##write the data back to csv

API可以处理大约50000多个QPS(每秒查询数),但是这种执行方法只需要大约11个QPS。在我之前的测试中,我将一组较小的数据(大约700万行)拆分4次,并使用相同的代码运行4个不同的Jupyter笔记本,因此有效地达到了大约44-50的QPS,并运行了24小时的代码。在

注意:我不想用那么多的QPS来攻击它,因为它是一个生产API,我已经获得了大约10k QPS的津贴

既然我现在需要在一个更大的集合上运行它,那么有没有一种方法可以在Python中实现呢?将数据分成4块并一起运行是否等同于并行处理代码?在

也许我做错了,还有别的办法吗?-我使用python的经验主要是用于分析和数据科学工作(numpy、pandas等),所以这是我唯一想到的方法。在

我使用的是Intel xenon e5-2690 v2处理器(20核)和128 GB RAM的系统,因此我认为它应该能够处理这个问题,因为在我之前的执行中,它在资源利用率方面几乎不费吹灰之力。在

如果能帮我指出正确的方向,我将不胜感激。在

编辑: 所有的建议都指向了aiohttp,但是因为我的时间不多了,而且我已经在多处理池方面取得了进展,所以我继续这样做。增加了几行代码

^{pr2}$

apicall”函数基本上与上面的代码位相同(for循环部分)

对于大约10000个数据集,它运行良好。但是,如果我把它扩展到100000或更多,我会遇到一个错误,我会再次陷入困境

OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted

编辑2: 明白我为什么会犯上述错误。看来异步是唯一的方法。在


Tags: csvtheto数据方法代码infrom
2条回答

aiohttp是你的工具。在

async with aiohttp.ClientSession() as session:
    async with session.post(url, data=json.dumps(payload), headers=headers) as resp:
        resp = await resp.text()

你应该试试Dask

它有两个好处:

  • 如果您的数据集太大而无法装入RAM,Dask将为您处理此问题

  • Dask将帮助您并行处理您的请求。

相关问题 更多 >

    热门问题