大数据集上的弹性搜索聚合

2024-09-28 16:23:30 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用Python ElasticSearch API。 我的数据集太大,无法使用search()检索。 我可以用helpers.scan()检索,但数据太大,无法用pandas快速处理

因此,我学会了如何使用ElasticSearch进行聚合以压缩数据,但仍然使用search(),我无法检索所有数据。我知道聚合是按照“通常”的搜索大小进行的,即使聚合会给出一行

最后,我尝试了aggregations+scan或scroll,但我知道scan()或scroll()不能用于进行聚合,因为这些请求在数据集的子集上工作,而在子集上的聚合是无意义的

在非常大的数据集上进行聚合的好方法是什么? 我在网上找不到任何相关的解决方案

更明确地说,我的情况是: 我有X千个移动传感器,每小时发送最后一个停止位置,新的停止位置。从最后一站移动到新站可能需要几天的时间,所以在这几天里,我没有每小时采集的相关信息。 作为ElasticSearch输出,我只需要格式的每一行: 传感器id/最后一次停止/新停止


Tags: 数据apipandassearchscan传感器elasticsearch子集
1条回答
网友
1楼 · 发布于 2024-09-28 16:23:30

如果您将elastic用于pandas,您可以尝试eland一个新的官方elastic库,该库旨在更好地集成它们。尝试:

es = Elasticsearch() 

body = {
  "size": 0,
  "aggs": {
    "getAllSensorId": {
      "terms": {
        "field": "sensor_id",
        "size": 10000
      },
      "aggs": {
        "getAllTheLastStop": {
          "terms": {
            "field": "last_stop",
            "size": 10000
          },
      "aggs": {
        "getAllTheNewStop": {
          "terms": {
            "field": "new_stop",
            "size": 10000
          }
        }
      }
        }
      }
    }
  }
}
list_of_results = []
result = es.search(index="my_index", body=body)
for sensor in result["aggregations"]["getAllTheSensorId"]["buckets"]:
    for last in sensor["getAllTheLastStop"]["buckets"]:
        for new in last["getAllTheNewStop"]["buckets"]:
            record = {"sensor": sensor['key'], "last_stop": last['key'], "new_stop": new['key']}
            list_of_results.append(record)
            

相关问题 更多 >