创建Zeebe Workers的简易工人包装
zeebe-worker的Python项目详细描述
Zeebe工人
一个简单的Zeebe worker包装器,让开发人员专注于重要的事情。
安装
pip install zeebe-worker
或使用您首选的包管理器。
有关可用版本,请参见https://pypi.org/project/zeebe-worker/#history。
使用
fromzeebe_workerimportZeebeWorkerfromextensionsimportzeebe_stubfromconfigimportworker_nameclassMyWorker(ZeebeWorker):defmy_task_type_handler(self,job):"""Handling my_task_type """variables=json.loads(job.variables)ifsomething_fails:# This will trigger a FailJobRequest with the exceptionraiseExceptionreturnvariablesdefanother_task_type_handler(self,job):"""Handles another task """# This will always succeed as an exception will never be raisedpass# Create your own class instance with your own configurationmy_worker=MyWorker(zeebe_stub,worker_name)# Subscribe to a task type (uses threading.Thread for concurrency)my_worker.subscribe('my_task_type','my_task_type_handler')my_worker.subscribe('my-task-typo','my_task_type_handler')my_worker.subscribe('another_task_type','another_task_type_handler')
美国石油学会
ZeebeWorker.__init__
使用设置的默认值初始化worker类。
arg | desc | default |
---|---|---|
stub | The grpc stub to connect to Zeebe with | - |
worker_name | The worker_name to send to along to Zeebe (mainly for debugging purposes) | - |
timeout | Number of milliseconds for a job to timeout | 5*60*1000 (5 minutes) |
request_timeout | Long polling: number of milliseconds for an ActivateJobs request to timeout | 1*60*1000 (1 minute) |
max_jobs_to_activate | Maximum amount of jobs to activate in one request | 1 |
backoff_interval | Number of milliseconds to backoff when unable to reach Zeebe | 5*1000 (5 seconds) |
ZeebeWorker.subscribe
同时将目标订阅到任务类型。
^{tb2}$目标函数
您自己的目标函数必须接受一个参数,最好称为job
。这将提供
就像Zeebe的ActivatedJob
(ref)。
使用variables = json.loads(job.variables)
提取变量。
工作失败
在函数中引发any exception将向zeebe发送一个FailJobRequest,其中包含引发的异常。
完成作业
如果函数在未引发异常的情况下执行异常,则将为该作业发送CompleteJobRequest。
设置变量
当函数返回dict时,它将把dict作为变量与CompleteJobRequest一起发送。
相容性
^{3}$- 项目
标签: