任务队列调度程序框架。
pytq的Python项目详细描述
欢迎使用pytq文档
pytq(python任务队列)是任务计划程序库。
我们解决的问题:
- 你有任务要做。
- 每个任务都有input_data,经过处理后,我们得到output_data。
pytq提供这些开箱即用的功能(而且都是可定制的)。
- 将输出数据保存到数据持久化系统。
- 过滤掉重复的输入数据。
- 内置多线程处理器提高了速度。
- 很好的内置日志系统。
- 而且很容易定义您将如何:
- 处理输入的数据
- 与数据持久性系统集成
- 过滤重复的输入数据
- 检索输出数据
示例
假设你有一些url要抓取,而你不想抓取那些url 已成功爬网,并且还希望将已爬网的数据保存到数据库中。
#!/usr/bin/env python# -*- coding: utf-8 -*-""" This script implement multi-thread safe, a sqlite backed task queue scheduler. """frompytqimportSqliteDictScheduler# Define your input_data modelclassUrlRequest(object):def__init__(self,url,context_data=None):self.url=url# your have url to crawlself.context_data=context_data# and maybe some context data to useclassScheduler(SqliteDictScheduler):# (Required) define how you gonna process your datadefuser_process(self,input_data):# you need to implement get_html_from_url yourselfhtml=get_html_from_url(input_data.url)# you need to implement parse_html yourselfoutput_data=parse_html(html)returnoutput_datas=Scheduler(user_db_path="example.sqlite")# let's have some urlsinput_data_queue=[UrlRequest(url="https://pypi.python.org/pypi/pytq"),UrlRequest(url="https://pypi.python.org/pypi/crawlib"),UrlRequest(url="https://pypi.python.org/pypi/loggerFactory"),]# execute multi thread processs.do(input_data_queue,multiprocess=True)# print outputforid,outpupt_datains.items():...
自定义:
classScheduler(SqliteDictScheduler):# (Optional) define the identifier of input_data (for duplicate)defuser_hash_input(self,input_data):returninput_data.url# (Optional) define how do you save output_data to database# Here we just use the default onedefuser_post_process(self,task):self._default_post_process(task)# (Optional) define how do you skip crawled url# Here we just use the default onedefuser_is_duplicate(self,task):returnself._default_is_duplicate(task)
托多:更多的例子来了。
安装
pytq在pypi上发布,所以您只需要:
$ pip install pytq
要升级到最新版本:
$ pip install --upgrade pytq