Python多处理:如何使用列表的下一个元素从一组进程中再次运行一个进程?

2024-06-23 03:09:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个包含表名的列表,假设列表的大小为n。现在我有m个服务器,所以我打开了m个对应于每个服务器的游标,它们也在另一个列表中。现在,对于每个表,我想调用一个特定的函数,它将参数作为这两个列表

templst = [T1,T2,T3,T4,T5,T6, T7,T8,T9,T10,T11]
curlst = [cur1,cur2,cur3,cur4,cur5]

这些游标以cur=conn.cursor()的形式打开,因此它们是对象

def extract_single(tableName, cursorconn):
      qry2 = "Select * FROM %s"% (tableName)
      cursorconn.execute(qry2).fetchall()
      print " extraction done"
      return 

现在我打开了5个processess(因为我有5个游标),以便并行运行它们

processes = []

x = 0
for x in range(5):
   new_p = 'p%x'%x
   print "process :", new_p
   new_p = multiprocessing.Process(target=extract_single, args=(templst[x],cur[x]))
   new_p.start()
   processes.append(new_p)


for process in processes:
    process.join()

因此,这确保我为每个游标打开了5个进程,并使用了前5个表名。 现在我想,一旦5个进程中的任何一个进程完成,它应该立即从我的圣殿师那里拿走第6张桌子,同样的事情继续下去,直到所有圣殿师完成

如何为此行为修改此代码? 例如 举个简单的例子,我想做什么。让我们考虑一个TEPLPLST作为int,我想调用它的睡眠函数

templst = [1,2,5,7,4,3,6,8,9,10,11]
curlst = [cur1,cur2,cur3,cur4,cur5]

def extract_single(sec, cursorconn):
      print "Sleeping for second=%s done by cursor=%s"% (sec,cursorconn)
      time.sleep(sec)
      print " sleeping done"
      return

因此,当我启动5个游标时,可能是睡眠(1)或睡眠(2)先完成 所以,一旦它完成,我想用光标运行sleep(3)

我的实际查询将依赖于游标,因为它将是SQL查询

修改方法 考虑前面的睡眠例子。现在我想实现我假设有10个游标,我的睡眠队列按递增顺序或递减顺序排序。 按递增顺序考虑列表 现在,在10个游标中,前5个游标将从队列中获取前5个元素,我的另一组5个游标将获取最后5个元素。 所以基本上我的游标队列被分成两部分,一部分取最小值,另一部分取最大值。 现在,若上半部分的光标结束,则应取下一个可用的最低值,若下半部分的光标结束,则应取第(n-6)个值,即从结束处的第6个值

我需要从两边遍历队列,每个队列有两组5个游标

example: curlst1 = [cur1,cur2,cur3,cur4,cur5]
         curlst2 = [cur6,cur7,cur8,cur9,cur10 ]
        templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]

so cur1 -> 1
   cur2 ->2
... cur5 -> 5
cur6 -> 16
cur7 ->15
.... cur10->12

现在cur1首先完成,因此需要6(前面的第一个可用元素) cur2发现它需要7个,以此类推 如果cur10找到,则需要11(后面的下一个可用元素)

依此类推,直到圣堂的所有元素


Tags: 元素列表new队列extractprint游标single
1条回答
网友
1楼 · 发布于 2024-06-23 03:09:21

templst参数(不管是实际示例中的表名还是下面示例中的睡眠秒数)放在多处理队列上。然后每个进程循环读取队列中的下一项。当队列为空时,将不再执行任何工作,您的流程可以返回。实际上,您已经实现了自己的流程池,其中每个流程都有自己的专用游标连接。现在,函数extract_single将从中检索表名或秒参数的队列作为其第一个参数

import multiprocessing
import Queue
import time

def extract_single(q, cursorconn):
    while True:
        try:
            sec = q.get_nowait()
            print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
            time.sleep(sec)
            print " sleeping done"
        except Queue.Empty:
            return

def main():
    q = multiprocessing.Queue()
    templst = [1,2,5,7,4,3,6,8,9,10,11]
    for item in templst:
        q.put(item) # add items to queue
    curlst = [cur1,cur2,cur3,cur4,cur5]
    process = []
    for i in xrange(5):
        p = multiprocessing.Process(target=extract_single, args=(q, curlst[i]))
        process.append(p)
        p.start()
    for p in process:
        p.join()

if __name__ == '__main__':
    main()

注意

如果处理器少于5个,则可以尝试使用5个(或更多)线程运行此程序,在这种情况下,应使用常规的Queue对象

更新问题的更新答案

允许您从队列前端和末端删除项目的数据结构称为deque(双端队列)。不幸的是,没有支持多处理的deque版本。但我认为您的表处理可能与线程处理一样好,而且您的计算机不太可能有10个处理器来支持10个并发进程

import threading
from collections import deque
import time
import sys

templst = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
q = deque(templst)
curlst1 = [cur1,cur2,cur3,cur4,cur5]
curlst2 = [cur6,cur7,cur8,cur9,cur10]

def extract_single(cursorconn, from_front):
    while True:
        try:
            sec = q.popleft() if from_front else q.pop()
            #print "Sleeping for second=%s done by cursor=%s" % (sec,cursorconn)
            sys.stdout.write("Sleeping for second=%s done by cursor=%s\n" % (sec,cursorconn))
            sys.stdout.flush() # flush output
            time.sleep(sec)
            #print " sleeping done"
            sys.stdout.write("sleeping done by %s\n" % cursorconn)
            sys.stdout.flush() # flush output
        except IndexError:
            return

def main():
    threads = []
    for cur in curlst1:
        t = threading.Thread(target=extract_single, args=(cur, True))
        threads.append(t)
        t.start()
    for cur in curlst2:
        t = threading.Thread(target=extract_single, args=(cur, False))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

if __name__ == '__main__':
    main()

相关问题 更多 >

    热门问题