我的任务是用python构建一个进程,从Elasticsearch中提取数据,将数据放入Azure Blob中,之后Snowflake将接收数据。我在Azure函数上运行该进程,该进程提取一个索引组(如game_name.*),并为索引组中的每个索引创建一个滚动线程。我保存每个结果的最后日期,并在下次运行时在范围查询中解析它。我每五分钟运行一次进程,并将范围的末尾偏移了5分钟(我们每2分钟运行一次刷新)。我让这个过程运行一段时间,然后通过在Elasticsearch和Snowflake中按小时(或按天)计算计数(*)来进行差距分析,预计最大差距为1%。然而,对于一种索引模式,它将大约127个索引分组,当我运行一个catchup作业(一天或更长时间)时,产生的间隔与预期的一样,但是,只要我让它在cron作业上运行(每5分钟一次),一段时间后,我就会得到6-10%的间隔,并且只针对这个索引组。 滚动条功能似乎在查询范围内拾取了一定数量的文档,但由于某些原因,文档后来添加(放置)的日期较早。或者我可能错了,我的代码正在做一些有趣的事情。我和我们的团队谈过,他们不会在客户端缓存任何文档,数据会同步到网络时钟(而不是客户端的时钟)并发送UTC
请参见下面我用于通过elasticsearch分页的查询:
def query(searchSize, lastRowDateOffset, endDate, pit, keep_alive):
body = {
"size": searchSize,
"query": {
"bool": {
"must": [
{
"exists": {
"field": "baseCtx.date"
}
},
{
"range": {
"baseCtx.date": {
"gt": lastRowDateOffset,
"lte": endDate
}
}
}
]
}
},
"pit": {
"id": pit,
"keep_alive": keep_alive
},
"sort": [
{
"baseCtx.date": {"order": "asc", "unmapped_type": "long"}
},
{
"_shard_doc": "asc"
}
],
"track_total_hits": False
}
return body
def scroller(pit,
threadNr,
index,
lastRowDateOffset,
endDate,
maxThreads,
es,
lastRowCount,
keep_alive="1m",
searchSize=10000):
cumulativeResultCount = 0
iterationResultCount = 0
data = []
dataBytes = b''
lastIndexDate = ''
startScroll = time.perf_counter()
while 1:
if lastRowCount == 0: break
#if lastRowDateOffset == endDate: lastRowCount = 0; break
try:
page = es.search(body=body)
except: # It is believed that the point in time is getting closed, hence the below opens a new one
pit = es.open_point_in_time(index=index, keep_alive=keep_alive)['id']
body = query(searchSize, lastRowDateOffset, endDate, pit, keep_alive)
page = es.search(body=body)
pit = page['pit_id']
data += page['hits']['hits']
body['pit']['id'] = pit
if len(data) > 0: body['search_after'] = [x['sort'] for x in page['hits']['hits']][-1]
cumulativeResultCount += len(page['hits']['hits'])
iterationResultCount = len(page['hits']['hits'])
#print(f"This Iteration Result Count: {iterationResultCount} -- Cumulative Results Count: {cumulativeResultCount} -- {time.perf_counter() - startScroll} seconds")
if iterationResultCount < searchSize: break
if len(data) > rowsPerMB * maxSizeMB / maxThreads: break
if time.perf_counter() - startScroll > maxProcessTimeSeconds: break
if len(data) != 0:
dataBytes = gzip.compress(bytes(json.dumps(data)[1:-1], encoding='utf-8'))
lastIndexDate = max([x['_source']['baseCtx']['date'] for x in data])
response = {
"pit": pit,
"index": index,
"threadNr": threadNr,
"dataBytes": dataBytes,
"lastIndexDate": lastIndexDate,
"cumulativeResultCount": cumulativeResultCount
}
return response
def batch(game_name, env='prod', startDate='auto', endDate='auto', writeDate=True, minutesOffset=5):
es = Elasticsearch(
esUrl,
port=9200,
timeout=300)
lowerFormat = game_name.lower().replace(" ","_")
indexGroup = lowerFormat + "*"
if env == 'dev': lowerFormat, indexGroup = 'dev_' + lowerFormat, 'dev.' + indexGroup
azFormat = re.sub(r'[^0-9a-zA-Z]+', '-', game_name).lower()
storageContainerName = azFormat
curFileName = f"{lowerFormat}_cursors.json"
curBlobFilePath = f"cursors/{curFileName}"
compressedTools = [gzip.compress(bytes('[', encoding='utf-8')), gzip.compress(bytes(',', encoding='utf-8')), gzip.compress(bytes(']', encoding='utf-8'))]
pits = []
lastRowCounts = []
# Parameter and state settings
if os.getenv(f"{lowerFormat}_maxSizeMB") is not None: maxSizeMB = int(os.getenv(f"{lowerFormat}_maxSizeMB"))
if os.getenv(f"{lowerFormat}_maxThreads") is not None: maxThreads = int(os.getenv(f"{lowerFormat}_maxThreads"))
if os.getenv(f"{lowerFormat}_maxProcessTimeSeconds") is not None: maxProcessTimeSeconds = int(os.getenv(f"{lowerFormat}_maxProcessTimeSeconds"))
# Get all indices for the indexGroup
indicesEs = list(set([(re.findall(r"^.*-", x)[0][:-1] if '-' in x else x) + '*' for x in list(es.indices.get(indexGroup).keys())]))
indices = [{"indexName": x, "lastOffsetDate": (datetime.datetime.utcnow()-datetime.timedelta(days=5)).strftime("%Y/%m/%d 00:00:00")} for x in indicesEs]
# Load Cursors
cursors = getCursors(curBlobFilePath, indices)
# Offset the current time by -5 minutes to account for the 2-3 min delay in Elasticsearch
initTime = datetime.datetime.utcnow()
if endDate == 'auto': endDate = f"{initTime-datetime.timedelta(minutes=minutesOffset):%Y/%m/%d %H:%M:%S}"
print(f"Less than or Equal to: {endDate}, {keep_alive}")
# Start Multi-Threading
while 1:
dataBytes = []
dataSize = 0
start = time.perf_counter()
if len(pits) == 0: pits = ['' for x in range(len(cursors))]
if len(lastRowCounts) == 0: lastRowCounts = ['' for x in range(len(cursors))]
with concurrent.futures.ThreadPoolExecutor(max_workers=len(cursors)) as executor:
results = [
executor.submit(
scroller,
pit,
threadNr,
x['indexName'],
x['lastOffsetDate'] if startDate == 'auto' else startDate,
endDate,
len(cursors),
es,
lastRowCount,
keep_alive,
searchSize) for x, pit, threadNr, lastRowCount in (zip(cursors, pits, list(range(len(cursors))), lastRowCounts))
]
for f in concurrent.futures.as_completed(results):
if f.result()['lastIndexDate'] != '': cursors[f.result()['threadNr']]['lastOffsetDate'] = f.result()['lastIndexDate']
pits[f.result()['threadNr']] = f.result()['pit']
lastRowCounts[f.result()['threadNr']] = f.result()['cumulativeResultCount']
dataSize += f.result()['cumulativeResultCount']
if len(f.result()['dataBytes']) > 0: dataBytes.append(f.result()['dataBytes'])
print(f"Thread {f.result()['threadNr']+1}/{len(cursors)} -- Index {f.result()['index']} -- Results pulled {f.result()['cumulativeResultCount']} -- Cumulative Results: {dataSize} -- Process Time: {round(time.perf_counter()-start, 2)} sec")
if dataSize == 0: break
lastRowDateOffsetDT = datetime.datetime.strptime(max([x['lastOffsetDate'] for x in cursors]), '%Y/%m/%d %H:%M:%S')
outFile = f"elasticsearch/live/{lastRowDateOffsetDT:%Y/%m/%d/%H}/{lowerFormat}_live_{lastRowDateOffsetDT:%Y%m%d%H%M%S}_{datetime.datetime.utcnow():%Y%m%d%H%M%S}.json.gz"
print(f"Starting compression of {dataSize} rows -- {round(time.perf_counter()-start, 2)} sec")
dataBytes = compressedTools[0] + compressedTools[1].join(dataBytes) + compressedTools[2]
# Upload to Blob
print(f"Comencing to upload data to blob -- {round(time.perf_counter()-start, 2)} sec")
uploadJsonGzipBlobBytes(outFile, dataBytes, storageContainerName, len(dataBytes))
print(f"File compiled: {outFile} -- {dataSize} rows -- Process Time: {round(time.perf_counter()-start, 2)} sec\n")
# Update cursors
if writeDate: postCursors(curBlobFilePath, cursors)
# Clean Up
print("Closing PITs")
for pit in pits:
try: es.close_point_in_time({"id": pit})
except: pass
print(f"Closing Connection to {esUrl}")
es.close()
return
# Start the process
while 1:
batch("My App")
我想我只需要第二双眼睛来指出代码中可能存在的问题。我曾尝试将分钟偏移量argv增加到60(因此每5分钟它就会从上次运行到现在的数据中提取一次()-60分钟),但也有同样的问题。请帮忙
因此,“baseCtx.date”是由客户端触发的,在某些情况下,在触发事件和可搜索事件之间似乎存在延迟。我们通过使用摄入管道修复了此问题,如下所示:
并在模板设置中将index.default_管道设置为“indexDate”。每个月索引名称都会发生变化(我们附加年份和月份),这种方法会创建一个用于滚动的服务器日期
相关问题 更多 >
编程相关推荐