Flink上有状态函数的任务API
statefun-tasks的Python项目详细描述
逃避任务
Flink上有状态函数的任务API
这是什么?在
一个轻量级API,它借用了celry的概念,允许Python开发人员基于apacheflink有状态函数运行基于任务的工作流。工作流由接受参数的任务组成,这些任务可以作为延续链接到管道中。管道变成了燧石状态。在
@tasks.bind()
def greeting_workflow(first_name, last_name):
return say_hello.send(first_name, last_name).continue_with(say_goodbye)
@tasks.bind()
def say_hello(first_name, last_name):
return f'Hello {first_name} {last_name}'
@tasks.bind()
def say_goodbye(greeting):
return f'{greeting}. So now I will say goodbye'
动机
在我的职业生涯中,我使用celeri来编排分布式Python工作流。这些工作流通常是1-N-1基数,例如:
- 装载股票组合
- 对于每一个股票装载一个历史价格的时间序列
- 对于每个时间序列,计算标准差
- 返回标准差的平均值
在芹菜中使用简单的旧函数装饰为芹菜任务,这是相当微不足道的
^{pr2}$可以在芹菜上作为工作流运行
stocks = ['BT.L', 'DAIGn.DE', 'BP.L']
workflow =
chain(group([chain(
load_timeseries.s(stock),
compute_std_dev.s())
for stock in stocks]),
compute_average.s()
)
result = workflow.delay()
由于工作流是作为简单的函数实现的,所以它也可以测试和调试,而不必启动Celery
test_result = compute_average([compute_std_dev(load_timeseries('BT.L'))])
Flink对我来说很有趣,因为它不像Celery,它是polygot,远程函数可以用Python以外的语言编写。不过,我在Celery上运行的现有工作流需要移动。那会是什么样子?在
Flink状态函数
如果您已经阅读了这篇文章,并且还没有看到Flink有状态函数,那么就值得阅读apackeflink站点上的文档。Flink并没有真正定义工作流。函数可以调用其他函数,函数可以回复调用它们的函数,函数可以向某些出口发出结果,通常是Kakfa主题。这使得它不像芹菜一样受到限制,但同时也给开发人员带来了更多的工作。在
让我们从Flink的角度重新审视一下我们的股票示例。在
@functions.bind('examples/load_timeseries')
def load_timeseries(context, stock):
prices = _load_prices(stock)
context.send('examples/compute_std_dev', prices)
@functions.bind('examples/compute_std_dev')
def compute_std_dev(context, prices):
context.reply(np.std(prices))
这方面的一些问题:
- 在
load_timeseries()始终调用compute_std_dev()。它不再是一个可恢复的函数,所以我不能在其他工作流中使用它。在
在 - 在
compute_std_dev()响应load_timeseries()。这意味着load_timeseries()需要同时接受股票作为输入或股票价格列表。在
在 - 在
随着工作流变得更加复杂,load_timeseries()将变形为编排函数:
在
@functions.bind('examples/load_timeseries')
def load_timeseries(context, input):
if isinstance(input, str):
prices = _load_prices(stock)
context.send('examples/compute_std_dev', input)
# elif ... next stage
# elif ... next stage
# elif ... next stage etc
elif isinstance(input, double): # finally reply to original caller
context.pack_and_send_egress('topic', input)
- 这些函数不再可以通过在Flink之外将它们链接在一起进行测试
Flink任务
flinktasks将编排函数封装到一个管道中,这样开发人员就可以专注于编写简单的函数,这些函数可以使用直观的API组合到工作流中。由于工作流中的每个单独任务都作为单独的Flink状态函数调用运行,所以执行仍然是分布式的,并且可以根据需要进行扩展。在
tasks = FlinkTasks(
default_namespace="example",
default_worker_name="worker",
egress_type_name="example/kafka-generic-egress")
@tasks.bind()
def timeseries_workflow():
stocks = ['BT.L', 'DAIGn.DE', 'BP.L']
return in_parallel(
[load_timeseries.send(stock).continue_with(compute_std_dev) for stock in stocks]
).continue_with(compute_average)
@tasks.bind()
def load_timeseries(stock):
return _load_prices(stock)
@tasks.bind()
def compute_std_dev(prices):
return np.std(prices)
@tasks.bind()
def compute_average(std_devs):
return np.mean(std_devs)
@functions.bind("example/worker")
def worker(context, task_data: Union[TaskRequest, TaskResult, TaskException]):
try:
tasks.run(context, task_data)
except Exception as e:
print(f'Error - {e}')
traceback.print_exc()
- 项目
标签: