Why is there a large gap between Elasticsearch and Snowflake?

Posted 2024-10-01 09:28:18


I was tasked with building a process in Python that pulls data from Elasticsearch, drops it into Azure Blob storage, and from there Snowflake ingests it. I run the process on an Azure Function; it pulls an index group (such as game_name.*) and creates a scroller thread for every index in the group. I save the last date seen in each result and pass it into the range query on the next run. The process runs every five minutes, and I offset the end of the range by 5 minutes (we run a refresh every 2 minutes).

I let the process run for a while and then do a gap analysis by comparing count(*) per hour (or per day) between Elasticsearch and Snowflake, expecting a maximum gap of 1%. For one index pattern, however, which groups roughly 127 indices, a catch-up job (covering a day or more) produces the expected gap, but as soon as I leave it running on the cron schedule (every 5 minutes), after a while I end up with a 6-10% gap, and only for this index group.

The scroller function appears to pick up a certain number of documents within the query range, but for some reason documents are added (put) later with an earlier date. Or maybe I am wrong and my code is doing something odd. I checked with our team: they do not cache any documents on the client, and the data is synced to a network clock (not the client's clock) and sent in UTC.
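For context, the gap analysis boils down to comparing hourly (or daily) counts on both sides. A minimal sketch of the Elasticsearch half is below; the host is a placeholder, the baseCtx.date field and date format are taken from the code further down, and the Snowflake side is just a COUNT(*) grouped by DATE_TRUNC('HOUR', ...).

# Simplified sketch of the hourly count check on the Elasticsearch side.
# "https://<es-host>:9200" is a placeholder; the real esUrl is defined elsewhere.
import datetime
from elasticsearch import Elasticsearch

es = Elasticsearch("https://<es-host>:9200", timeout=300)

def hourly_count(index_pattern, hour_start):
    hour_end = hour_start + datetime.timedelta(hours=1)
    body = {
        "query": {
            "range": {
                "baseCtx.date": {
                    "gte": f"{hour_start:%Y/%m/%d %H:%M:%S}",
                    "lt": f"{hour_end:%Y/%m/%d %H:%M:%S}"
                }
            }
        }
    }
    return es.count(index=index_pattern, body=body)["count"]

# e.g. hourly_count("game_name.*", datetime.datetime(2024, 9, 30, 12)),
# compared against the matching hourly COUNT(*) in Snowflake.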

See below the code I use to page through Elasticsearch:

import concurrent.futures
import datetime
import gzip
import json
import os
import re
import time

from elasticsearch import Elasticsearch

# esUrl, rowsPerMB, maxSizeMB, maxThreads, maxProcessTimeSeconds, keep_alive,
# searchSize and the blob helpers (getCursors, postCursors, uploadJsonGzipBlobBytes)
# are defined elsewhere in the module.

def query(searchSize, lastRowDateOffset, endDate, pit, keep_alive):
    body = {
        "size": searchSize,
        "query": {
            "bool": {
                "must": [
                    {
                        "exists": {
                            "field": "baseCtx.date"
                        }
                    },
                    {
                        "range": {
                            "baseCtx.date": {
                                "gt": lastRowDateOffset,
                                "lte": endDate
                            }
                        }
                    }
                ]
            }
        },
        "pit": {
            "id": pit,
            "keep_alive": keep_alive
        },
        "sort": [
            {
                "baseCtx.date": {"order": "asc", "unmapped_type": "long"}
            },
            {
                "_shard_doc": "asc"
            }
        ],
        "track_total_hits": False
    }
    
    return body

def scroller(pit,
             threadNr,
             index,
             lastRowDateOffset,
             endDate,
             maxThreads,
             es,
             lastRowCount,
             keep_alive="1m",
             searchSize=10000):
    
    cumulativeResultCount = 0
    iterationResultCount = 0
    data = []
    dataBytes = b''
    lastIndexDate = ''
    startScroll = time.perf_counter()
    while 1:
        if lastRowCount == 0: break
        #if lastRowDateOffset == endDate: lastRowCount = 0; break
        try:
            page = es.search(body=body)
        except Exception:
            # First iteration (body is not defined yet) or the point in time has
            # expired/been closed: open a fresh PIT and rebuild the query body.
            pit = es.open_point_in_time(index=index, keep_alive=keep_alive)['id']
            body = query(searchSize, lastRowDateOffset, endDate, pit, keep_alive)
            page = es.search(body=body)
        pit = page['pit_id']
        data += page['hits']['hits']
        body['pit']['id'] = pit
        if len(data) > 0: body['search_after'] = [x['sort'] for x in page['hits']['hits']][-1]
        cumulativeResultCount += len(page['hits']['hits'])
        iterationResultCount = len(page['hits']['hits'])

        #print(f"This Iteration Result Count: {iterationResultCount} -- Cumulative Results Count: {cumulativeResultCount} -- {time.perf_counter() - startScroll} seconds")

        # Stop paging when the page was not full, the in-memory batch has hit the
        # per-thread size budget, or the time budget is exceeded (rowsPerMB,
        # maxSizeMB and maxProcessTimeSeconds are module-level settings).
        if iterationResultCount < searchSize: break
        if len(data) > rowsPerMB * maxSizeMB / maxThreads: break
        if time.perf_counter() - startScroll > maxProcessTimeSeconds: break

    if len(data) != 0:
        dataBytes = gzip.compress(bytes(json.dumps(data)[1:-1], encoding='utf-8'))
        lastIndexDate = max([x['_source']['baseCtx']['date'] for x in data])

    response = {
        "pit": pit,
        "index": index,
        "threadNr": threadNr,
        "dataBytes": dataBytes,
        "lastIndexDate": lastIndexDate,
        "cumulativeResultCount": cumulativeResultCount
    }
    
    return response
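For what it's worth, the contract between scroller and its caller is: pass an empty PIT and a non-zero lastRowCount on the first call, then feed the returned pit and cumulativeResultCount back in on later calls. A stripped-down, single-index sketch of that handshake follows (no thread pool; the index pattern and cursor values are made up, and the module-level rowsPerMB / maxSizeMB / maxProcessTimeSeconds settings must already exist).

# Hypothetical single-threaded driver for scroller(), mirroring what batch() does.
pit, last_count = '', ''                 # empty PIT; '' != 0, so the first call runs
offset = "2024/09/29 00:00:00"           # example cursor, same format as the cursors file
end = "2024/09/29 01:00:00"

while last_count != 0:
    r = scroller(pit, 0, "game_name.events*", offset, end, 1, es, last_count)
    pit, last_count = r["pit"], r["cumulativeResultCount"]
    if r["lastIndexDate"] != '':
        offset = r["lastIndexDate"]      # advance the cursor past the last document seen
    # r["dataBytes"] holds the gzip chunk that batch() concatenates and uploads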

def batch(game_name, env='prod', startDate='auto', endDate='auto', writeDate=True, minutesOffset=5):
    
    es = Elasticsearch(
        esUrl,
        port=9200,
        timeout=300)
    
    lowerFormat = game_name.lower().replace(" ","_")
    indexGroup = lowerFormat + "*"
    if env == 'dev': lowerFormat, indexGroup = 'dev_' + lowerFormat, 'dev.' + indexGroup
    azFormat = re.sub(r'[^0-9a-zA-Z]+', '-', game_name).lower()
    storageContainerName = azFormat
    curFileName = f"{lowerFormat}_cursors.json"
    curBlobFilePath = f"cursors/{curFileName}"    

    # Pre-compressed '[', ',' and ']' gzip members: concatenated gzip members form a
    # valid gzip stream, so the per-thread chunks can be joined into one JSON array
    # without decompressing anything.
    compressedTools = [gzip.compress(bytes('[', encoding='utf-8')), gzip.compress(bytes(',', encoding='utf-8')), gzip.compress(bytes(']', encoding='utf-8'))]
    pits = []
    lastRowCounts = []

    # Parameter and state settings
    if os.getenv(f"{lowerFormat}_maxSizeMB") is not None: maxSizeMB = int(os.getenv(f"{lowerFormat}_maxSizeMB"))
    if os.getenv(f"{lowerFormat}_maxThreads") is not None: maxThreads = int(os.getenv(f"{lowerFormat}_maxThreads"))
    if os.getenv(f"{lowerFormat}_maxProcessTimeSeconds") is not None: maxProcessTimeSeconds = int(os.getenv(f"{lowerFormat}_maxProcessTimeSeconds"))

    # Get all indices for the indexGroup
    indicesEs = list(set([(re.findall(r"^.*-", x)[0][:-1] if '-' in x else x) + '*' for x in list(es.indices.get(indexGroup).keys())]))
    indices = [{"indexName": x, "lastOffsetDate": (datetime.datetime.utcnow()-datetime.timedelta(days=5)).strftime("%Y/%m/%d 00:00:00")} for x in indicesEs]

    # Load Cursors
    cursors = getCursors(curBlobFilePath, indices)


    # Offset the current time by -5 minutes to account for the 2-3 min delay in Elasticsearch
    initTime = datetime.datetime.utcnow()
    if endDate == 'auto': endDate = f"{initTime-datetime.timedelta(minutes=minutesOffset):%Y/%m/%d %H:%M:%S}"
    print(f"Less than or Equal to: {endDate}, {keep_alive}")

    # Start Multi-Threading
    while 1:
        dataBytes = []
        dataSize = 0
        start = time.perf_counter()

        if len(pits) == 0: pits = ['' for x in range(len(cursors))]
        if len(lastRowCounts) == 0: lastRowCounts = ['' for x in range(len(cursors))]

        with concurrent.futures.ThreadPoolExecutor(max_workers=len(cursors)) as executor:
            results = [
                executor.submit(
                    scroller,
                    pit,
                    threadNr,
                    x['indexName'],
                    x['lastOffsetDate'] if startDate == 'auto' else startDate,
                    endDate,
                    len(cursors),
                    es,
                    lastRowCount,
                    keep_alive,
                    searchSize) for x, pit, threadNr, lastRowCount in (zip(cursors, pits, list(range(len(cursors))), lastRowCounts))
            ]

            for f in concurrent.futures.as_completed(results):
                if f.result()['lastIndexDate'] != '': cursors[f.result()['threadNr']]['lastOffsetDate'] = f.result()['lastIndexDate']
                pits[f.result()['threadNr']] = f.result()['pit']
                lastRowCounts[f.result()['threadNr']] = f.result()['cumulativeResultCount']

                dataSize += f.result()['cumulativeResultCount']
                if len(f.result()['dataBytes']) > 0: dataBytes.append(f.result()['dataBytes'])

                print(f"Thread {f.result()['threadNr']+1}/{len(cursors)} -- Index {f.result()['index']} -- Results pulled {f.result()['cumulativeResultCount']} -- Cumulative Results: {dataSize} -- Process Time: {round(time.perf_counter()-start, 2)} sec")

        if dataSize == 0: break
        lastRowDateOffsetDT = datetime.datetime.strptime(max([x['lastOffsetDate'] for x in cursors]), '%Y/%m/%d %H:%M:%S')
        outFile = f"elasticsearch/live/{lastRowDateOffsetDT:%Y/%m/%d/%H}/{lowerFormat}_live_{lastRowDateOffsetDT:%Y%m%d%H%M%S}_{datetime.datetime.utcnow():%Y%m%d%H%M%S}.json.gz"

        print(f"Starting compression of {dataSize} rows -- {round(time.perf_counter()-start, 2)} sec")
        dataBytes = compressedTools[0] + compressedTools[1].join(dataBytes) + compressedTools[2]

        # Upload to Blob
        print(f"Comencing to upload data to blob -- {round(time.perf_counter()-start, 2)} sec")
        uploadJsonGzipBlobBytes(outFile, dataBytes, storageContainerName, len(dataBytes))
        print(f"File compiled: {outFile} -- {dataSize} rows -- Process Time: {round(time.perf_counter()-start, 2)} sec\n")

        # Update cursors
        if writeDate: postCursors(curBlobFilePath, cursors)

    # Clean Up
    print("Closing PITs")
    for pit in pits:
        # A PIT may already have expired or been closed; ignore failures here.
        try: es.close_point_in_time({"id": pit})
        except: pass
    print(f"Closing Connection to {esUrl}")
    es.close()
    return

# Start the process
while 1:
    batch("My App")

I think I just need a second pair of eyes to point out what might be going wrong in the code. I have tried increasing the minutesOffset argument to 60 (so every 5 minutes it pulls data from the last run up to now() - 60 minutes), but I see the same issue. Please help.


1 Answer

So, baseCtx.date is set by the client when the event fires, and in some cases there appears to be a delay between the event firing and the document becoming searchable. We fixed this by using an ingest pipeline, as follows:

PUT _ingest/pipeline/indexDate
{
  "description": "Creates a timestamp when a document is initially indexed",
  "version": 1,
  "processors": [
    {
      "set": {
        "field": "indexDate",
        "value": "{{{_ingest.timestamp}}}",
        "tag": "indexDate"
      }
    }
  ]
}

and setting index.default_pipeline to "indexDate" in the template settings. The index names change every month (we append the year and month), so this approach creates a server-side date to scroll on.
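For completeness, the same wiring can be done from the Python client; the sketch below is an illustration only (the template name and index pattern are placeholders), with the pipeline body copied from above. The paging code would then range and sort on indexDate instead of baseCtx.date.

# Create the ingest pipeline and attach it to future monthly indices via a template.
es.ingest.put_pipeline(
    id="indexDate",
    body={
        "description": "Creates a timestamp when a document is initially indexed",
        "version": 1,
        "processors": [
            {"set": {"field": "indexDate", "value": "{{{_ingest.timestamp}}}", "tag": "indexDate"}}
        ]
    }
)

es.indices.put_index_template(
    name="game-name-template",              # placeholder template name
    body={
        "index_patterns": ["game_name.*"],  # placeholder pattern covering the monthly indices
        "template": {"settings": {"index.default_pipeline": "indexDate"}}
    }
)

# query() would then use {"range": {"indexDate": {...}}} and sort on "indexDate",
# so documents that become searchable late can no longer fall behind the cursor.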
