我有一个非常大的数据集(71M行-~7GB csv文件),我已经加载到pandas的数据框中。在
我需要在一个post请求调用中将数据帧中的每一行发送到一个API,然后将响应存储到另一个数据帧中,该数据帧稍后将被导出并进行分析
目前我的代码是这样的
##read data from csv
for row in data.itertuples(index=True, name='Pandas'):
##There is an if-else ladder to create a header depending on the type of values in a row
##Code to create a json payload
r = requests.post(url, data=json.dumps(payload), headers=headers)
t = json.loads(r.text)
## A try and except block to add the data sent via header and payload + the response from API call back into a new data-frame. (exception is in case there is no response from the API)
##write the data back to csv
API可以处理大约50000多个QPS(每秒查询数),但是这种执行方法只需要大约11个QPS。在我之前的测试中,我将一组较小的数据(大约700万行)拆分4次,并使用相同的代码运行4个不同的Jupyter笔记本,因此有效地达到了大约44-50的QPS,并运行了24小时的代码。在
注意:我不想用那么多的QPS来攻击它,因为它是一个生产API,我已经获得了大约10k QPS的津贴
既然我现在需要在一个更大的集合上运行它,那么有没有一种方法可以在Python中实现呢?将数据分成4块并一起运行是否等同于并行处理代码?在
也许我做错了,还有别的办法吗?-我使用python的经验主要是用于分析和数据科学工作(numpy、pandas等),所以这是我唯一想到的方法。在
我使用的是Intel xenon e5-2690 v2处理器(20核)和128 GB RAM的系统,因此我认为它应该能够处理这个问题,因为在我之前的执行中,它在资源利用率方面几乎不费吹灰之力。在
如果能帮我指出正确的方向,我将不胜感激。在
编辑: 所有的建议都指向了aiohttp,但是因为我的时间不多了,而且我已经在多处理池方面取得了进展,所以我继续这样做。增加了几行代码
^{pr2}$“apicall”函数基本上与上面的代码位相同(for循环部分)
对于大约10000个数据集,它运行良好。但是,如果我把它扩展到100000或更多,我会遇到一个错误,我会再次陷入困境
OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted
编辑2: 明白我为什么会犯上述错误。看来异步是唯一的方法。在
aiohttp是你的工具。在
你应该试试Dask
它有两个好处:
如果您的数据集太大而无法装入RAM,Dask将为您处理此问题
Dask将帮助您并行处理您的请求。
相关问题 更多 >
编程相关推荐