我正在实现一个Python脚本,从Google BigQuery数据库获取现有用户数据,然后使用多线程方法为每个用户执行一些web抓取功能,最后将结果存储在BigQuery上的另一个表中。现有用户记录约360万条,为每个用户执行刮取操作最多需要40秒。我的目标是每天能够处理100000个用户,因此我需要一种并发处理方法
我正在使用来自concurrent.futures
模块的ThreadPoolExecutor
。在给定数量的线程完成其工作后,执行器应该将相应的一批结果存储回BigQuery中。我看到线程继续执行它们的web抓取功能。但是经过一段时间后(或者使用大量线程),它们停止将记录存储回数据库
起初,我想我是在处理一些与清除批处理结果相关的竞争条件,但从那时起,我从threading
模块中实现了一个BoundedSemaphore
,以实现一种锁定方法,我认为它解决了最初的问题。但结果仍然无法可靠地存储回数据库。也许我错过了什么
我需要一些在Python中有大量并发处理经验的人的帮助。具体来说,我正在Heroku服务器上运行脚本,因此Heroku体验可能也会有所帮助。谢谢下面是我的代码片段:
service = BigQueryService() # a custom class defined elsewhere
users = service.fetch_remaining_users(min_id=MIN_ID, max_id=MAX_ID, limit=LIMIT) # gets users from BigQuery
print("FETCHED UNIVERSE OF", len(users), "USERS")
with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
batch = []
lock = BoundedSemaphore()
futures = [executor.submit(user_with_friends, row) for row in users]
print("FUTURE RESULTS", len(futures))
for index, future in enumerate(as_completed(futures)):
#print(index)
result = future.result()
# OK, so this locking business:
# ... prevents random threads from clearing the batch, which was causing results to almost never get stored, and
# ... restricts a thread's ability to acquire access to the batch until another one has released it
lock.acquire()
batch.append(result)
if (len(batch) >= BATCH_SIZE) or (index + 1 >= len(futures)): # when batch is full or is last
print("-------------------------")
print(f"SAVING BATCH OF {len(batch)}...")
print("-------------------------")
service.append_user_friends(batch) # stores the results in another table on BigQuery
batch = []
lock.release()
另见:
https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
https://docs.python.org/3.7/library/threading.html#threading.BoundedSemaphore
因此,我最终使用了另一种更可靠的方法(见下文)。旧的方法在线程之间协调以存储结果,而新的方法处理并存储每个线程的批
当我将线程数显著增加到2500这样的数字时,脚本几乎完全停止存储结果(我仍想进一步研究这一行为),但我能够以相对较低的线程数运行它,并且它正在完成这项工作
相关问题 更多 >
编程相关推荐