Main loop does not wait for the Python multiprocessing pool to finish & freezes

Posted 2024-09-27 00:21:46


This is my first Python project in 10 years and my first contact with Python multiprocessing, so there may be some very basic mistakes I'm just not seeing.

I'm stuck with Python and a multiprocessing web crawler. My crawler checks a main page for changes, then iterates over the subcategories in parallel and adds items to a list. Those items are then checked in parallel and extracted via Selenium (I don't know how else to do it, because the content is loaded into the page dynamically when the items are clicked).

The main loop:

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import time
from bs4 import BeautifulSoup
import pickledb
import random
import multiprocessing
import itertools

import config


requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


def getAllSubCategories(pageNumber, items):
    # check website and look for subcategories that are "worth" extracting
    url = 'https://www.google.com' + str(pageNumber)
    response = requests.get(url, verify=False, headers=config.headers, cookies=config.cookies)
    pageSoup = BeautifulSoup(response.content, features='html.parser')
    elements = pageSoup.find(...)
    if not elements: # website not loading properly, retry
        return getAllSubCategories(pageNumber, items)

    for element in elements:
        items.append(element)


def checkAndExtract(item, ignoredItems, itemsToIgnore):
    # check if items are already extracted; if not, extract them if they contain a keyword
    import checker
    import extractor

    if item not in ignoredItems:
        if checker.check(item):
            extractor.extract(item, itemsToIgnore)
        else: itemsToIgnore.append(item)


if __name__ == '__main__':
    multiprocessing.freeze_support()

    itemsToIgnore = multiprocessing.Manager().list()

    crawlUrl = 'https://www.google.com/'
    db = pickledb.load('myDB.db', False)

    while True:
        try:
            # check main website for changes
            response = requests.get(crawlUrl, verify=False, headers=config.headers, cookies=config.cookies)
            soup = BeautifulSoup(response.content, features='html.parser')
            mainCondition = soup.find(...)

            if mainCondition:
                numberOfPages = soup.find(...)

                ignoredItems = db.get('ignoredItems')
                if not ignoredItems:
                    db.lcreate('ignoredItems')
                    ignoredItems = db.get('ignoredItems')

                items = multiprocessing.Manager().list()
                # get all items from subcategories
                with multiprocessing.Pool(30) as pool:
                    pool.starmap(getAllSubCategories, zip(range(numberOfPages, 0, -1), itertools.repeat(items)))

                itemsToIgnore[:] = []
                # loop through all items
                with multiprocessing.Pool(30) as pool:
                    pool.starmap(checkAndExtract, zip(items, itertools.repeat(ignoredItems), itertools.repeat(itemsToIgnore)))

                for item in itemsToIgnore:
                    if item not in db.get('ignoredItems'): db.ladd('ignoredItems', item)
                db.dump()

            time.sleep(random.randint(10, 20))
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(e)
            continue

The checker:

import config

def check(item):
    title = item...
    try:
        for keyword in config.keywords: # just a string array
            if keyword.lower() in title.lower():
                return True
    except Exception as e:
        print(e)

    return False

The extractor:

from selenium import webdriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
import time

import config

def extract(item, itemsToIgnore):
    driver = webdriver.Chrome('./chromedriver')
    driver.implicitly_wait(3)
    driver.get('https://www.google.com')
    for key in config.cookies:
        driver.add_cookie({'name': key, 'value': config.cookies[key], 'domain': '.google.com'})
    try:
        driver.get('https://www.google.com')

        wait = WebDriverWait(driver, 10)
        if driver.title == 'Page Not Found':
            extract(item, itemsToIgnore)
            return

        driver.find_element_by_xpath('...').click()
        time.sleep(1)
        button = wait.until(EC.element_to_be_clickable((By.XPATH, '...')))
        button.click()
        # and some extraction magic
    except:
        extract(item, itemsToIgnore) # try again

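A note on the extractor: in this version the driver is never quit, neither after a successful run nor in the retry path, so each call to extract() leaves a Chrome instance behind. I don't know yet whether that matters, but the cleanup I would eventually add looks roughly like this (the XPaths and the actual extraction stay elided):

from selenium import webdriver

def extract(item, itemsToIgnore):
    driver = webdriver.Chrome('./chromedriver')
    driver.implicitly_wait(3)
    try:
        pass  # same cookie setup, navigation and clicking as above
    finally:
        driver.quit()  # terminate the Chrome process even if the extraction fails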
Everything basically works, and a few test runs were successful. But sometimes the loop starts over again before the pool has finished its work. In the logs I can see the item checker returning true, yet the extractor does not even start and the main process begins the next iteration:

2019-12-23 00:21:16,614 [SpawnPoolWorker-6220] [INFO ] check returns true
2019-12-23 00:21:18,142 [MainProcess         ] [DEBUG] starting next iteration
2019-12-23 00:21:39,630 [SpawnPoolWorker-6247] [INFO ] checking subcategory
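As far as I understand it, pool.starmap() is supposed to block until every submitted task has returned, so the next iteration should only begin after all checks and extractions are done. That is the behaviour I'm relying on; a minimal standalone sketch of just that pattern (nothing from my project):

import multiprocessing
import time

def work(n, tag):
    time.sleep(1)
    return '{}-{}'.format(tag, n)

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = pool.starmap(work, [(i, 'job') for i in range(8)])
    # starmap only returns once all 8 tasks have finished
    print(results)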

Also, I guess the pools somehow don't get cleaned up, because I doubt the SpawnPoolWorker-XXXX numbers should be that high. It also freezes after about an hour, which might be related to this problem.
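On the worker numbers: as far as I know, every multiprocessing.Pool(30) spawns 30 fresh worker processes, and I create two new pools per iteration, which would explain why the SpawnPoolWorker counter keeps climbing. What I mean by reusing a single pool instead is roughly this (a simplified, self-contained sketch with stand-in functions, not my real scraping code):

import multiprocessing
import itertools
import random
import time

def getAllSubCategories(pageNumber, items):
    items.append(pageNumber)                     # stand-in for the real subcategory scraping

def checkAndExtract(item, ignoredItems, itemsToIgnore):
    if item not in ignoredItems:                 # stand-in for the real check/extract
        itemsToIgnore.append(item)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    manager = multiprocessing.Manager()
    itemsToIgnore = manager.list()

    with multiprocessing.Pool(30) as pool:       # created once, reused on every iteration
        while True:
            items = manager.list()
            pool.starmap(getAllSubCategories,
                         zip(range(5, 0, -1), itertools.repeat(items)))

            itemsToIgnore[:] = []
            pool.starmap(checkAndExtract,
                         zip(items, itertools.repeat([]), itertools.repeat(itemsToIgnore)))

            time.sleep(random.randint(10, 20))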


