<p>我终于想出了一个解决办法</p>
<pre><code>import multiprocessing
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue, state):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.state = state
def run(self):
while True:
next_task = self.task_queue.get()
if next_task is None:
self.task_queue.task_done()
break
# answer = next_task() is where the Task object is being called.
# Python runs on a line per line basis so it stops here until assigned.
# Put if-else on the same line so it quits calling Task if state.is.set()
answer = next_task() if self.state.is_set() is False else 0
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
for i in range(self.b):
if self.a % i == 0:
return 0
return 1
def initialize(n_list, tasks, results, states):
sum_list = []
for i in range(cpu_cnt):
tasks.put(Task(n_list[i], number))
for _ in range(cpu_cnt):
sum_list.append(int(results.get()))
if 0 in sum_list:
states.set()
if 0 in sum_list:
states.clear()
return None
else:
states.clear()
return number
if __name__ == '__main__':
states = multiprocessing.Event() # ADD THIS BOOLEAN FLAG EVENT!
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
cpu_cnt = multiprocessing.cpu_count()
# Add states.Event() to Consumer argument list:
consumers = [ Consumer(tasks, results, states) for i in range(cpu_cnt) ]
for w in consumers:
w.start()
n_list = [x for x in range(1000)]
iter_list = []
for _ in range(1000):
iter_list.append(initialize(n_list, tasks, results, states)
for _ in range(num_jobs):
result = results.get()
for i in range(num_consumers):
tasks.put(None)
</code></pre>
<p>如果打开的使用者对象使用同一行上的If else语句为下一个_task()函数调用分配答案,则当设置state.Event()标志时,它将退出,因为它锁定到该行,直到排队的任务对象分配变量“answer”。这是一个伟大的解决办法!它使任务对象在消费者的while循环中可中断,消费者通过变量“answer”赋值运行任务对象。两个多星期后,我找到了一个可以加快速度的解决方案!我在代码的工作版本上测试了它,速度更快!使用这样的方法,可以无限期地打开多个进程,并通过Consumer object joinable Queue循环传递许多不同的任务对象,以极快的速度并行处理大量数据!这段代码使所有内核都像一个“超级内核”一样工作,其中所有进程都对每个内核保持开放,并为任何所需的迭代i/o流协同工作</p>
<p>以下是我的一个python多处理程序在8个超线程内核上实现此方法的示例输出:</p>
<pre><code>Enter prime number FUNCTION:n+n-1
Enter the number for 'n' START:1
Enter the number of ITERATIONS:100000
Progress: ########## 100%
Primes:
ƒ(2) = 3
ƒ(3) = 5
ƒ(4) = 7
ƒ(6) = 11
ƒ(7) = 13
ƒ(9) = 17
etc etc...
ƒ(99966) = 199931
ƒ(99967) = 199933
ƒ(99981) = 199961
ƒ(99984) = 199967
ƒ(100000) = 199999
Primes found: 17983
Prime at end of list has 6 digits.
Overall process took 1 minute and 2.5 seconds.
</code></pre>
<p>1至200000(除#2外)的所有17983个素数在~1分钟内达到全模数</p>
<p>在3990x128线程AMD Threadripper上,需要约8秒</p>
<p>以下是8个超线程内核上的另一个输出:</p>
<pre><code>Enter prime number FUNCTION:((n*2)*(n**2)**2)+1
Enter the number for 'n' START:1
Enter the number of ITERATIONS:1000
Progress: ########## 100%
Primes:
ƒ(1) = 3
ƒ(3) = 487
ƒ(8) = 65537
etc... etc...
ƒ(800) = 655360000000001
ƒ(839) = 831457011176399
ƒ(840) = 836423884800001
ƒ(858) = 929964638281537
ƒ(861) = 946336852720603
ƒ(884) = 1079670712526849
ƒ(891) = 1123100229130903
ƒ(921) = 1325342566697203
ƒ(953) = 1572151878119987
ƒ(959) = 1622269605897599
ƒ(983) = 1835682572370287
Primes found: 76
Prime at end of list has 16 digits.
Overall process took 1 minute and 10.6 seconds.
</code></pre>