Problem storing data to BigQuery from multiple threads in Python



I'm implementing a Python script that fetches existing user data from a Google BigQuery database, performs some web scraping for each user using a multithreaded approach, and finally stores the results in another table on BigQuery. There are roughly 3.6 million existing user records, and the scraping can take up to 40 seconds per user. My goal is to be able to process 100,000 users per day, so I need some form of concurrent processing.
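As a rough back-of-the-envelope check on why, using the figures above: at 40 seconds per user, 100,000 users per day means about 46 scrapes need to be in flight around the clock.

seconds_per_user = 40
users_per_day = 100_000
seconds_per_day = 24 * 60 * 60  # 86,400

# average number of scrapes that must run concurrently, around the clock
print(seconds_per_user * users_per_day / seconds_per_day)  # ~46.3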

I'm using ThreadPoolExecutor from the concurrent.futures module. After a given number of threads complete their work, the executor is supposed to store the corresponding batch of results back into BigQuery. I can see the threads continuing to perform their web-scraping work, but after a while (or with a large number of threads) they stop storing records back to the database.

At first I suspected a race condition around clearing the batch of results, so I added a BoundedSemaphore from the threading module as a locking mechanism, and I believe that fixed the original issue. But results are still not reliably stored back to the database. Maybe I'm missing something.

I could use help from someone with substantial experience with concurrent processing in Python. Specifically, I'm running the script on a Heroku server, so Heroku experience may also be relevant. Thanks. Here is a snippet of my code:

from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import BoundedSemaphore

# MIN_ID, MAX_ID, LIMIT, MAX_THREADS, and BATCH_SIZE are config constants defined elsewhere

service = BigQueryService() # a custom class defined elsewhere

users = service.fetch_remaining_users(min_id=MIN_ID, max_id=MAX_ID, limit=LIMIT) # gets users from BigQuery
print("FETCHED UNIVERSE OF", len(users), "USERS")

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
    batch = []
    lock = BoundedSemaphore()
    futures = [executor.submit(user_with_friends, row) for row in users]
    print("FUTURE RESULTS", len(futures))
    for index, future in enumerate(as_completed(futures)):
        #print(index)
        result = future.result()

        # OK, so this locking business:
        # ... prevents random threads from clearing the batch, which was causing results to almost never get stored, and
        # ... restricts a thread's ability to acquire access to the batch until another one has released it
        lock.acquire()
        batch.append(result)
        if (len(batch) >= BATCH_SIZE) or (index + 1 >= len(futures)): # when batch is full or is last
            print("-------------------------")
            print(f"SAVING BATCH OF {len(batch)}...")
            print("-------------------------")
            service.append_user_friends(batch) # stores the results in another table on BigQuery
            batch = []
        lock.release()
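For what it's worth, one fragile spot I've noticed in the loop above: if append_user_friends raises, the semaphore is never released, and every later iteration would block at lock.acquire(). The same loop body could use the semaphore as a context manager so it is always released (a sketch reusing the names above):

with lock:  # the semaphore is released automatically, even if the save below raises
    batch.append(result)
    if (len(batch) >= BATCH_SIZE) or (index + 1 >= len(futures)):  # when batch is full or is last
        service.append_user_friends(batch)
        batch = []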

See also:

https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

https://docs.python.org/3.7/library/threading.html#threading.BoundedSemaphore


1 Answer

I ended up going with a different, more reliable approach (see below). Where the old approach coordinated between threads to store results in a shared batch, the new approach has each thread process and store its own batch.

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread

# generate_timestamp and user_with_friends are helper functions defined elsewhere

def split_into_batches(all_users, batch_size=BATCH_SIZE):
    """h/t: https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks"""
    for i in range(0, len(all_users), batch_size):
        yield all_users[i : i + batch_size]

def process_and_save_batch(user_rows, bq):
    """Scrapes each user in the batch, then appends the whole batch to BigQuery."""
    print(generate_timestamp(), "|", current_thread().name, "|", "PROCESSING...")
    bq.append_user_friends([user_with_friends(user_row) for user_row in user_rows])
    print(generate_timestamp(), "|", current_thread().name, "|", "PROCESSED BATCH OF", len(user_rows))
    return True

service = BigQueryService() # a custom class defined elsewhere

users = service.fetch_remaining_users(min_id=MIN_ID, max_id=MAX_ID, limit=LIMIT)
print("FETCHED UNIVERSE OF", len(users), "USERS")

batches = list(split_into_batches(users))
print(f"ASSEMBLED {len(batches)} BATCHES OF {BATCH_SIZE}")

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:

    for batch in batches:
        executor.submit(process_and_save_batch, batch, service)

When I increased the thread count significantly, to a number like 2,500, the script almost completely stopped storing results (a behavior I'd still like to investigate further), but I was able to run it with a relatively low thread count, and it gets the job done.
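One likely culprit for that behavior (a guess I haven't verified): executor.submit returns a Future, and if the submitted call raises, the exception is stored on the future and silently discarded unless something inspects it, so failed batches vanish without a trace. Keeping the futures and iterating as_completed would at least surface those errors:

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
    futures = [executor.submit(process_and_save_batch, batch, service) for batch in batches]
    for future in as_completed(futures):
        try:
            future.result()  # re-raises any exception from the worker thread here
        except Exception as err:
            print("BATCH FAILED:", err)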
