多处理数据与多线程

2024-09-28 16:21:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我反复编写并重写了我的小Python应用程序,现在已经超出了我目前的Python水平。我最初写的是一个单线程应用程序,用Beautiful Soup作为解析器,后来改用lxml。为了让脚本支持多线程,我发现了Twisted,但没能把这个小片段改写成Twisted版本。我把代码贴在这里,也许你们可以给我指点更好的方向,让它更快一点。目前抓取15万个页面需要1个小时。我对此已经比较满意,因为我第一次尝试写的时候速度要慢3倍。

#! /usr/bin/python
# coding: ISO-8859-1
import time, PySQLPool, Queue, threading
from urllib3 import connection_from_url
from lxml import etree
import cStringIO as StringIO

# Default HTTP request headers; passed as headers= when the urllib3
# connection pool is created further down.
headers = {
    'User-Agent': 'Mozilla/4.77 [en] (X11; I; IRIX;64 6.5 IP30)',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en-us;q=0.5,en;q=0.3',
    'Accept-Encoding': 'gzip, deflate',
    'Accept-Charset': 'utf-8;q=0.7,*;q=0.7',
}

# Wall-clock start time; read again by the final print to report the elapsed time.
t = time.time()
# Set maxActiveConnections to 60 on the pool object returned by getNewPool()
# (presumably the shared pool instance -- confirm against PySQLPool).
PySQLPool.getNewPool().maxActiveConnections = 60
# Connection handle for the 'fddb' database on 127.0.0.1; handed to
# PySQLPool.getNewQuery() wherever a query is issued.
db = PySQLPool.getNewConnection(username='user', password='pass', host='127.0.0.1', db='fddb')
# urllib3 connection pool for http://fddb.info/ (maxsize 60, timeout 150),
# sending the default `headers` with every request.
pool = connection_from_url('http://fddb.info/', maxsize=60, timeout=150, headers=headers)
# Counter incremented by the DatamineThread workers; printed at the end of the script.
detailCounter = 0
# NOTE(review): this module-level dict is never read in the visible code;
# main() binds its own local `urls`.
urls = {}
# Work queue of [foods_url, foods_id] pairs waiting to be downloaded.
queue = Queue.Queue()
# Queue of [page_data, foods_id] pairs waiting to be parsed and stored.
out_queue = Queue.Queue()

# Maps the label text found on a page to the matching column name of the
# `details` table.
clean_rows = {
    "Brennwert": "details_brennwert",
    "Kalorien": "details_kalorien",
    "Protein": "details_protein",
    "Kohlenhydrate": "details_kohlenhydrate",
    "davon Zucker": "details_zucker",
    "davon Polyole": "details_polyole",
    "Fett": "details_fett",
    "Ballaststoffe": "details_ballaststoffe",
    "Broteinheiten": "details_broteinheit",
    "Alkohol": "details_alkohol",
    "Cholesterin": "details_cholesterin",
    "Koffein": "details_koffein",
    "Wassergehalt": "details_wasser",
    "Vitamin C": "details_vitc",
    "Vitamin A": "details_vita",
    "Vitamin D": "details_vitd",
    "Vitamin E": "details_vite",
    "Vitamin B1": "details_vitb1",
    "Vitamin B2": "details_vitb2",
    "Vitamin B6": "details_vitb6",
    "Vitamin B12": "details_vitb12",
    "Natrium": "details_natrium",
    "Eisen": "details_eisen",
    "Zink": "details_zink",
    "Magnesium": "details_magnesium",
    "Chlor": "details_chlor",
    "Mangan": "details_mangan",
    "Schwefel": "details_schwefel",
    "Kalium": "details_kalium",
    "Kalzium": "details_kalzium",
    "Phosphor": "details_phosphor",
    "Kupfer": "details_kupfer",
    "Fluor": "details_fluor",
}

def rows_escape(text):
    """Return text with every clean_rows label replaced by its column name.

    Each key of clean_rows that occurs in text is substituted with the
    mapped value; trailing whitespace is then removed from the result.
    """
    for label, column in clean_rows.items():
        text = text.replace(label, column)
    return text.rstrip()

# Unit markers that are removed from a scraped value, plus the decimal comma
# that is rewritten to a decimal point.
clean_values = {
    "kJ": "",
    "kcal": "",
    "g": "",
    "mg": "",
    "%": "",
    ",": ".",
    u"\u03bc": "",
}

def values_escape(text):
    """Return text with unit markers removed and ',' turned into '.'.

    Every key of clean_values that occurs in text is replaced by its mapped
    value, and trailing whitespace is stripped from the result.

    Tokens are applied longest first. Dict iteration order would otherwise
    decide whether "g" is removed before "mg"; when it is, "12 mg" becomes
    "12 m" instead of "12".
    """
    for token in sorted(clean_values, key=len, reverse=True):
        text = text.replace(token, clean_values[token])
    return text.rstrip()

def insertDetails(container, foods_id):
    """Insert one `details` row for foods_id and mark the food as checked.

    container is an iterable of dicts with a 'row' entry (column name) and a
    'value' entry (column value). One INSERT is issued with those columns
    plus details_id (NULL) and foods_id, followed by an UPDATE that sets
    foods_check to '1' for the same foods_id.

    Raises:
        ValueError: if a 'row' entry is not a plain identifier (letters,
            digits and underscores only).
    """
    columns = []
    values = []
    for item in container:
        column = item['row']
        # Column names cannot be bound as query parameters, and they come
        # from scraped page text, so refuse anything that is not a plain
        # identifier instead of splicing it into the statement.
        if not column.replace('_', '').isalnum():
            raise ValueError("unsafe column name: %r" % (column,))
        columns.append(column)
        values.append(item['value'])

    column_sql = ''.join(name + ',' for name in columns)
    placeholder_sql = '%s,' * len(values)
    insert_sql = ("INSERT INTO details (" + column_sql + "details_id,foods_id) "
                  "VALUES (" + placeholder_sql + "NULL,%s)")

    c = PySQLPool.getNewQuery(db)
    # Values and foods_id are passed as parameters so the driver escapes them.
    c.Query(insert_sql, tuple(values) + (foods_id,))
    c.Query("UPDATE foods SET foods_check = '1' WHERE foods_id=%s", (foods_id,))

def getHP(url):
    """Issue a GET for '/' + url on the module-level pool and return the response's data."""
    path = '/' + url
    return pool.request('GET', path).data

class ThreadUrl(threading.Thread):
    """Worker thread that downloads pages and forwards them for parsing.

    Items taken from `queue` are indexable pairs: element 0 is the URL handed
    to getHP(), element 1 is passed through untouched. For every successful
    download, [data, item[1]] is put on `out_queue`.
    """

    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        """Process queue items forever, one download per item.

        A failed download is reported on stderr and skipped; nothing is put
        on out_queue for it. task_done() is always called, because a missing
        call would leave queue.join() blocked forever.
        """
        import traceback

        while True:
            host = self.queue.get()
            try:
                data = getHP(host[0])
                self.out_queue.put([data, host[1]])
            except Exception:
                # Keep the worker alive: one bad page must not stop the run.
                traceback.print_exc()
            finally:
                self.queue.task_done()

class DatamineThread(threading.Thread):
    """Worker thread that parses downloaded pages and stores their details.

    Items taken from `out_queue` are indexable pairs: element 0 is the raw
    page data, element 1 is the foods_id the page belongs to.
    """

    # Guards the module-level detailCounter, which several DatamineThread
    # instances update at the same time.
    _counter_lock = threading.Lock()

    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def _parse_details(self, data):
        """Return a tuple of {'row': ..., 'value': ...} dicts parsed from data."""
        container = []
        parser = etree.HTMLParser(encoding='cp1252')
        tree = etree.parse(StringIO.StringIO(data), parser)
        divx = tree.xpath('//div[@style="background-color:#f0f5f9;padding:2px 4px;" or @style="padding:2px 4px;"]')

        for xdiv in divx:
            x = etree.ElementTree(element=xdiv, parser=parser)

            value = x.xpath('string(//div/text())')
            label = x.xpath('string(//*[self::a or self::span]/text())')

            label = rows_escape(label)

            # Entries whose value contains "[nodata]" are left out entirely.
            if "[nodata]" in value:
                continue

            if u"\u03bc" in value:
                # Values carrying the micro sign are divided by 1000 once the
                # unit text is stripped (presumably micro-units to
                # milli-units -- confirm against the table schema).
                scaled = float(values_escape(value)) / 1000
                container.append({'row': label, 'value': str(scaled)})
            else:
                container.append({'row': label, 'value': values_escape(value)})

        return tuple(container)

    def run(self):
        """Process out_queue items forever: parse, insert, count.

        detailCounter is incremented only after insertDetails() returned
        without raising. A failing page is reported on stderr and skipped.
        task_done() is always called, because a missing call would leave
        out_queue.join() blocked forever.
        """
        global detailCounter
        import traceback

        while True:
            qData = self.out_queue.get()
            try:
                data = qData[0]
                foods_id = qData[1]
                insertDetails(self._parse_details(data), foods_id)
                with DatamineThread._counter_lock:
                    detailCounter += 1
            except Exception:
                # Keep the worker alive: one bad page must not stop the run.
                traceback.print_exc()
            finally:
                self.out_queue.task_done()

def main():
    """Download and process every food whose foods_check is 0.

    Selects the pending rows, starts six ThreadUrl workers, queues one
    [foods_url, foods_id] pair per row, starts six DatamineThread workers and
    then blocks until both queues have been fully processed.
    """
    query = PySQLPool.getNewQuery(db)
    query.Query("SELECT foods_id, foods_url FROM foods WHERE foods_check = 0")
    pending = query.record

    # Download workers; daemon threads so they never keep the process alive.
    for _ in range(6):
        fetcher = ThreadUrl(queue, out_queue)
        fetcher.daemon = True
        fetcher.start()

    for row in pending:
        queue.put([row['foods_url'], row['foods_id']])

    # Parsing/storing workers, also daemon threads.
    for _ in range(6):
        miner = DatamineThread(out_queue)
        miner.daemon = True
        miner.start()

    queue.join()
    out_queue.join()

# Run the scrape; main() returns once both work queues have been joined.
main()
# NOTE(review): `db.close` only references the attribute and never calls it,
# so this statement closes nothing -- confirm the intended PySQLPool cleanup call.
db.close
# Report the seconds elapsed since `t` was taken and the final detailCounter.
print "Zeit: %.2f New Details: %d" % (time.time()-t, detailCounter)

Tags: textinselfidqueuevaluecontainerdef
1条回答
网友
1楼 · 发布于 2024-09-28 16:21:15

如果您有多个CPU,并且您的程序似乎非常占用CPU,我建议您使用multiprocessing(多进程)模块。Python在多线程处理方面是出了名的差劲,因为全局解释器锁(GIL)基本上保证了在任何给定的时间内,单个进程中只能有一个Python线程执行。

相关问题 更多 >