Multiprocessing a large dataset in Python (finding duplicates)


I have a JSON file from which I want to remove duplicate lines, but it is too big to fit in memory. I found a way to get it done, but my guess is that it is not the best way.

My problem is that it runs in just 8 minutes for a 12 GB dataset, but the requirement is to scale the code so that it can run on a 100 GB dataset. Any suggestions? Should I use multithreading or multiprocessing in Python to achieve this, or some other approach?

Here is the code:

import json
import time


class BusinessService:
    """ This class contains the business logic for identifying the duplicates and creating an output file for further processing """

    @staticmethod
    def service(ipPath, opPath):
        """ The method identifies the duplicates """
        start_time = time.time()  # start the timer to see how much time the method takes
        uniqueHandleSet = set()   # set to store the unique handles seen so far
        try:
            # open/create an output file to collect the duplicate handles
            duplicateHandles = open(opPath, 'w+', encoding='utf-8')
            # read the JSON file line by line with a 200 MB buffer, as it is too big to read at once
            with open(ipPath, buffering=200000000, encoding='utf-8') as infile:
                for line in infile:
                    tweetJsonObject = json.loads(line)

                    if tweetJsonObject["name"] not in uniqueHandleSet:
                        uniqueHandleSet.add(tweetJsonObject["name"])
                    else:
                        duplicateHandles.write(line)

            # print the total time required to execute
            print("--- %s seconds --- memory 200mb while buffering" % (time.time() - start_time))
        except Exception as e:
            print("Error:", e)
        finally:
            duplicateHandles.close()
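
For reference, the method would be invoked along these lines (the file paths below are placeholders for illustration only, not from the original post):

BusinessService.service('tweets.json', 'duplicate_handles.json')  # hypothetical input/output paths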

1 Answer

To scale this, you need a queue to feed multiple processes and two shared lists to keep track of your results. The main idea is to feed the file line by line into a queue that is then consumed by several worker processes. These processes share two lists in which they store the intermediate results, and a Manager is responsible for the synchronization between the processes.

The following code is just a rough guideline and has not really been tested:

from multiprocessing import Process, Manager, Queue

def findDuplicate(inputQueue, uniqueValues, duplicates):
    for line in iter(inputQueue.get, 'STOP'):  # get a line from the queue, stop once 'STOP' is received
        if line not in uniqueValues:  # check if it is a duplicate
            uniqueValues.append(line)
        else:
            duplicates.append(line)  # store it

if __name__ == '__main__':
    manager = Manager()  # get a new SyncManager
    uniqueValues = manager.list()  # handle for a shared list
    duplicates = manager.list()  # a 2nd handle for a shared list
    inputQueue = Queue()  # a queue to provide tasks to the processes

    # set up the workers, providing the shared lists and the task queue
    numProc = 4
    process = [Process(target=findDuplicate,
                       args=(inputQueue, uniqueValues, duplicates)) for x in range(numProc)]

    # start the processes, they will idle if nothing is in the queue
    for p in process:
        p.start()

    # ipPath is the input file path from the question
    with open(ipPath) as f:
        for line in f:
            inputQueue.put(line, block=True)  # put the line in the queue, only if a free slot is available

    for p in process:
        inputQueue.put('STOP')  # signal the workers to stop, as there is no further input

    # wait for the processes to finish
    for p in process:
        p.join()
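
One detail worth noting: the queue above is created without a maxsize, so put(line, block=True) never actually blocks, and the queue itself can grow large if the workers fall behind the reader. If that is a concern, a bounded queue gives real backpressure. A minimal sketch, with the limit of 10000 being an arbitrary assumption:

from multiprocessing import Queue

# A bounded queue: put() blocks once 10000 lines are pending,
# so the reading loop cannot run far ahead of the workers.
# The limit of 10000 is an arbitrary choice, not tuned.
inputQueue = Queue(maxsize=10000)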
