如何运行x数量的线程并等待线程完成

2024-09-30 12:16:25 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直在试图弄清楚Python线程越来越多,但当涉及到队列时,我被卡住了

我的想法是有一个可以读取的CSV文件(比如一行1000行CSV行)。我想做的是读取CSV中每一行的信息,但我希望它以线程方式执行。通过这一点,我希望有大量的x线程同时运行,这意味着如果我想同时运行5个线程。应该只运行5个线程

一旦5个线程中的一个线程完成,它应该立即从csv运行一个新行(如果没有更多的可读取内容,则停止)

到目前为止,我所做的是:

import sys
import csv
import threading
import queue


totalThreadAtTime = 5

def threadingTest(row):
    print(row.get('Sales Start Date'))


def main():
    with open('test.csv') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:

            threading.Thread(
                target=threadingTest,
                args=(row,)
            ).start()


if __name__ == '__main__':
    main()

现在csv中的每一行都有一个线程,我想“限制”它在同一时间只运行5个线程。一旦一个完成,然后开始一个新的

我该怎么做

如果我错过了什么,请告诉我。请让我知道!:)

编辑:

CSV:

Home Furnishing Business No.,Product Range Area No.,Product Area No.,No.,Description,Unit Price Including VAT,045 Sellable Stock,022 Sellable Stock,Sales Method,Range Code,Sales Start Date,End Date Sales,Range Status,Replenishment Code
07,071,0711,10290396,ME rnfrcd vent top rl 60 galvanised AP CN,8.00,"1,000.",949.,F,K,6/1/2015,,Released,10
07,073,0731,379172,FO N drwr low 80x60 white AP,38.00,"1,000.",963.,F,K,2/1/2019,,Released,10
07,073,0731,80379173,FO N drwr med 40x60 white AP,30.00,"1,000.",964.,F,K,2/1/2019,,Released,10
07,073,0731,40379170,FO N drwr low 40x60 white AP,26.00,"1,000.",966.,F,K,2/1/2019,,Released,10
07,073,0731,20379171,FO N drwr low 60x60 white AP,32.00,"1,000.",967.,F,K,2/1/2019,,Released,10
07,073,0731,60379174,FO N drwr med 60x60 white AP,36.00,"1,000.",967.,F,K,2/1/2019,,Released,10
10,101,1015,70420173,SUNNEBY cord set 1.8 m dark yellow textile,9.90,"1,665.",983.,M,K,8/1/2019,,Released,10
02,021,0211,10444351,GLASSVIK gls dr 60x64 drk rbr/clear glass AP,25.00,663.,996.,S,K,4/1/2020,,Released,10
02,021,0211,50444387,SELSVIKEN door/drawer front 60x38 hi-gl drk rbr AP,10.00,666.,999.,S,K,4/1/2020,,Released,10
09,093,0935,90311229,KURA NN  bed tent pink AP,30.00,666.,999.,S,K,8/1/2015,,Released,10
12,121,1211,80459221,GUNRID air purify crtn 1 pair 145x250 lgrey AP,49.90,666.,999.,M,K,4/1/2020,,Released,10
16,163,1633,451832,VANLIGEN vase 18 grey AP,14.90,666.,999.,M,K,4/1/2020,,Released,10
18,181,1813,70261230,BRADA laptop support 42x31 pink AP CN,9.90,666.,999.,M,K,10/1/2013,,Released,10
07,075,0752,10247181,HALLVIKEN in sin 1 bwl 56x50 blk quartz comp AP CN,350.00,"1,000.",999.,F,K,2/1/2014,,Released,10
10,102,1023,10390701,FOTO NN pend lmp 38 aluminium,29.90,"1,666.",999.,M,K,4/1/2018,,Released,10
10,104,1042,50426166,LILLHULT USB type C t USB crd 1.5 m AP,7.90,"1,666.",999.,M,K,10/1/2018,,Released,10
06,061,0611,20392276,GO high cabinet 40x32x192 Kasjon light grey AP,295.00,"1,000.","1,000.",F,K,2/1/2018,,Released,10
06,062,0621,60381285,TISKEN soap dish w suction cup white AP,6.90,"1,000.","1,000.",M,K,2/1/2019,,Released,10
11,113,1131,20432574,OTTSJON hand towel 40x70 white/blue AP,5.90,"1,665.","1,000.",M,K,4/1/2019,,Released,10
11,111,1112,10412595,VARBRACKA qc/2pwc 150x200/50x80 beige/white AP,29.90,"1,666.","1,000.",M,K,10/1/2018,,Released,10
11,111,1112,60412606,VARBRACKA qc/4pwc 200x200/50x80 beige/white AP,39.90,"1,666.","1,000.",M,K,10/1/2018,,Released,10
06,061,0611,30387646,GO wash-stnd w 2 drws 80x47x58 Kasjon lgrey AP,325.00,"2,000.","1,000.",F,K,2/1/2018,,Released,10
02,021,0211,30363990,SINDVIK gls dr 60x38 light grey/clear glass AP,25.00,"1,666.","1,001.",S,K,4/1/2017,,Released,10
11,111,1112,40412607,VARBRACKA qc/4pwc 240x220/50x80 beige/white AP,49.90,"1,666.","1,002.",M,K,10/1/2018,,Released,10
12,121,1211,343404,SPARVORT sheer crtn 1 pair 145x250 white AP,39.90,"1,666.","1,002.",M,K,2/1/2017,,Released,10

def main():

    pool = ThreadPool(processes=5)  # argument name is inherited from process pool, a bit confusing

    def process_row(row):
        print(row)
        # pass  # do something

    # file handler can be directly iterated instead
    # then, you'll get a line instead of a parsed CSV row
    reader = csv.reader(open('test.csv'))

    # pool.map is faster but doesn't guarantee order of results
    pool.imap(process_row, reader)

if __name__ == '__main__':
    main()

Tags: csvnoimportmaindef线程readerrow
2条回答

multiprocessing中包含了线程池的现有实现。 下面是一个如何使用它的示例:

import csv
from multiprocessing.pool import ThreadPool

# argument name is inherited from process pool, and is a bit confusing
# will use <number of CPUs> if omitted
pool = ThreadPool(processes=max_threads)

def process_row(row):
    pass  # do something

# file handler can be directly iterated instead
# then, you'll get a line instead of a parsed CSV row
reader = csv.reader(open(filename))

# pool.map is faster but doesn't guarantee order of results
pool.imap(process_row, reader)

UPD:pool.imap是一个迭代器。它将在控制台中自动求值,但在独立脚本中必须显式求值。修正:

result = list(pool.imap(process_row, reader))

join方法是等待线程结束,只需在启动的每个线程上调用它

def main():
    threads = []
    with open('test.csv') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:    
            th = threading.Thread(target=threadingTest, args=(row,))
            threads.append(th)
            th.start()

    for th in threads:
        th.join()

相关问题 更多 >

    热门问题