我有一个场景,其中我得到了一个要处理的作业列表,例如一个要从互联网爬网的网页列表)。每个作业都是独立的,而且作业可以以任何顺序处理。单个作业可能失败或成功,并且可能必须进行相应的处理(例如,失败的爬网任务的临时数据可能必须在下一轮中删除并重新爬网)
我试图用python中基于线程的处理来实现它。为了模拟实际的任务,假设我有一个庞大的整数数组列表,单个任务是计算每个数组的Sum
或Product
。我试图做的是使用JobsProcessor
类对象来实例化JobWorker
类对象的线程,这些对象通过为其他类创建对象来执行实际处理(这里是Sum
和Product
)。代码如下所述。将显示一个代码段
from queue import Queue, Empty
from threading import Thread
import time
class Product:
def __init__(self,data):
self.data = data
def doOperation(self):
try:
product =self.data[0]
for d in self.data[1:]:
if d>100000:
raise Exception( "Forcefully throwing exception")
product*=d
time.sleep(1)
return product
except:
return "product computation failed"
class Sum:
def __init__(self,data):
self.data = data
def doOperation(self):
try:
sum =0
for d in self.data:
sum+=d
time.sleep(1)
return sum
except:
return "sum computation failed"
class JobWorker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
try:
jobitem = self.queue.get_nowait()
if jobitem is None:
break
jobdata, optype = jobitem
if optype =='sum':
opobj = Sum(jobdata)
jobresult = opobj.doOperation()
elif optype =='product':
opobj = Product(jobdata)
jobresult = opobj.doOperation()
else:
print ("Invalid op type")
jobresult = 'Failed'
print(" job result", jobresult)
self.queue.task_done()
except Empty:
break
except:
print ("Some exception occured")
#How to pass it to up to the main jobs processor#
class JobsProcessor(object):
def __init__(self, joblist):
self.joblist = joblist
self.job_queue = Queue()
def process_resources(self):
try:
for job in self.joblist:
self.job_queue.put(job)
for i in range(2):
jobthread = JobWorker(self.job_queue)
jobthread.start()
'''
Write code here to monitor current status for all running jobs
'''
self.job_queue.join()
'''I want to write code here to track progress status for all jobs
Some jobs may have failed, not completed and based on that I may
want to take further action such as retry or flag them'''
print("Finished Jobs")
except:
pass
orgjobList = [ ([1,5,9,4],'sum'),
([5,4,5,8],'product'),
([100,45,678,999],'product'),
([3743,34,44324,543],'sum'),
([100001, 100002, 9876, 83989], 'product')]
mainprocessor = JobsProcessor(orgjobList)
mainprocessor.process_resources()
我想在这个过程中添加两个功能。你知道吗
JobWorker
对象的状态(例如,它们是否成功完成/完成但失败)。故障/异常可能发生在JobWorker对象中,甚至可能是Sum或Product对象。失败/成功状态应该传播回JobsProcessor,在那里我想根据返回的状态执行其他操作,如重新处理/删除/发送到别处等Monitor
功能,它可以连续检查当前正在运行/已完成作业的状态,并立即执行必要的操作,如删除,而不是等到整合结束时才执行请告知我如何添加上述功能,以及它们中是否只有一个可以满足爬网页面等情况。我们也欢迎任何其他建议。你知道吗
您可以通过以下两种方式之一在代码中添加这两种功能-
getProgress
和getStatus
方法(优雅的方法)您可以创建两个线程,一个线程执行实际工作并更新进度变量。你知道吗
对于第二种方法,可以在
__init__
类中设置两个变量,如下所示。你知道吗然后你可以在你的代码中包含如下逻辑-
相关问题 更多 >
编程相关推荐