围绕PostgreSQL编写的消息队列。
psycopg2-mq的Python项目详细描述
psycopg2_mq是在 PostgreSQL, SQLAlchemy,和 psycopg2。
目前,库只提供可以使用的低级构造 构建多线程工作系统。它分为两部分:
- psycopg2_mq.MQWorker-一个可重用的工作对象,它管理 可以接受并执行作业的单线程工作线程。申请书 应该为每个线程创建工作线程。它支持一个线程安全优雅的api 关闭。
- psycopg2_mq.MQSource-为 调用和查询作业状态。
数据模型
队列
工人运行队列中定义的作业。当前每个队列将运行作业 同时,将来的版本可能支持在 每个队列。每个注册的队列都应该包含一个execute_job(job) 方法。
工作
队列的execute_job方法被传递给一个Job对象,该对象包含 以下属性:
- id
- queue
- method
- args
为了方便起见,有一个extend(**kw)方法可以用来 向对象添加额外属性。这在单个队列中很有用 定义队列及其方法之间的协定。
示例工人
frompsycopg2_mqimport(MQWorker,make_default_model,)fromsqlalchemyimport(MetaData,create_engine,)importsysclassEchoQueue:defexecute_job(self,job):returnf'hello, {job.args["name"]} from method="{job.method}"'if__name__=='__main__':engine=create_engine(sys.argv[1])metadata=MetaData()model=make_default_model(metadata)worker=MQWorker(engine=engine,queues={'echo':EchoQueue(),},model=model,)worker.run()
示例源
engine=create_engine()metadata=MetaData()model=make_default_model(metadata)session_factory=sessionmaker()session_factory.configure(bind=engine)dbsession=session_factory()withdbsession.begin():mq=MQSource(dbsession=dbsession,model=model,)job=mq.call('echo','hello',{'name':'Andy'})print(f'queued job={job.id}')
0.1.5(2019-05-17)
- 用字符串或循环序列化错误时修复回归。
0.1.4(2019-05-09)
- 作业失败时更安全地序列化异常对象。
0.1.3(2018-09-04)
- 重命名线程以包含处理作业时的作业ID。
0.1.2(2018-09-04)
- 将Job.params重命名为Job.args。
0.1.1(2018-09-04)
- 使psycopg2成为可选依赖项,以便允许应用程序依赖 如果他们愿意的话。
0.1(2018-09-04)
- 初次发布。