从作业处理器启动线程时,如何跟踪作业辅助线程的进度?

2024-09-26 17:41:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个场景,其中我得到了一个要处理的作业列表,例如一个要从互联网爬网的网页列表)。每个作业都是独立的,而且作业可以以任何顺序处理。单个作业可能失败成功,并且可能必须进行相应的处理(例如,失败的爬网任务的临时数据可能必须在下一轮中删除并重新爬网

我试图用python中基于线程的处理来实现它。为了模拟实际的任务,假设我有一个庞大的整数数组列表,单个任务是计算每个数组的SumProduct。我试图做的是使用JobsProcessor类对象来实例化JobWorker类对象的线程,这些对象通过为其他类创建对象来执行实际处理(这里是SumProduct)。代码如下所述。将显示一个代码段

from queue import Queue, Empty
from threading import  Thread
import time

class Product:
    def __init__(self,data):
        self.data = data

    def doOperation(self):
        try:
            product =self.data[0]
            for d in self.data[1:]:
                if d>100000:
                    raise Exception( "Forcefully throwing exception")
                product*=d
                time.sleep(1)
            return product
        except:
            return "product computation failed"

class Sum:
    def __init__(self,data):
        self.data = data

    def doOperation(self):
        try:
            sum =0
            for d in self.data:
                sum+=d
                time.sleep(1)
            return sum
        except:
            return  "sum computation failed"


class JobWorker(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
         while True:
            try:
                jobitem = self.queue.get_nowait()
                if jobitem is None:
                    break
                jobdata, optype = jobitem
                if optype =='sum':
                    opobj = Sum(jobdata)
                    jobresult = opobj.doOperation()
                elif optype =='product':
                    opobj = Product(jobdata)
                    jobresult = opobj.doOperation()
                else:
                    print ("Invalid op type")
                    jobresult = 'Failed'
                print(" job result", jobresult)
                self.queue.task_done()
            except Empty:
                break
            except:
                print ("Some exception occured")
                #How to pass it to up to the main jobs processor#



class JobsProcessor(object):

    def __init__(self, joblist):
        self.joblist = joblist
        self.job_queue = Queue()

    def process_resources(self):
        try:
            for job in self.joblist:
                self.job_queue.put(job)

            for i in range(2):
                jobthread = JobWorker(self.job_queue)
                jobthread.start()           
            '''
                Write code here to monitor current status for all running jobs
            '''

            self.job_queue.join()

            '''I want to write code here to track progress status for all jobs 
                Some jobs may have failed, not completed and based on that I may 
                want to take further action such as retry or flag them'''
            print("Finished Jobs")
        except:
            pass



orgjobList = [ ([1,5,9,4],'sum'), 
               ([5,4,5,8],'product'), 
               ([100,45,678,999],'product'), 
               ([3743,34,44324,543],'sum'),
               ([100001, 100002, 9876, 83989], 'product')]
mainprocessor = JobsProcessor(orgjobList)
mainprocessor.process_resources()

我想在这个过程中添加两个功能。你知道吗

  • 合并:当所有作业线程完成时,我想知道所有JobWorker对象的状态(例如,它们是否成功完成/完成但失败)。故障/异常可能发生在JobWorker对象中,甚至可能是SumProduct对象。失败/成功状态应该传播回JobsProcessor,在那里我想根据返回的状态执行其他操作,如重新处理/删除/发送到别处等
  • 监控-我还想拥有Monitor功能,它可以连续检查当前正在运行/已完成作业的状态,并立即执行必要的操作,如删除,而不是等到整合结束时才执行

请告知我如何添加上述功能,以及它们中是否只有一个可以满足爬网页面等情况。我们也欢迎任何其他建议。你知道吗


Tags: to对象selffordataqueueinitdef
1条回答
网友
1楼 · 发布于 2024-09-26 17:41:58

您可以通过以下两种方式之一在代码中添加这两种功能-

  • 使用全局变量(最简单的方法)
  • 在类中使用getProgressgetStatus方法(优雅的方法)

您可以创建两个线程,一个线程执行实际工作并更新进度变量。你知道吗

对于第二种方法,可以在__init__类中设置两个变量,如下所示。你知道吗

def __init__(self):
    self.progress = 0
    self.success = True
    self.isDone = False
    self.error = "No Error Occurred"

然后你可以在你的代码中包含如下逻辑-

def actualWork(self):
    self.isDone = 0
    try:
        for i in range(1000):
            self.progress = i
            time.sleep(0.01)
        self.isDone = True
    except Exception as e:
        self.success = False
        self.error = str(e)

def getProgress(self):
    return self.progress

def getError(self):
    return self.error

相关问题 更多 >

    热门问题