Flink上有状态函数的任务API

statefun-tasks的Python项目详细描述


逃避任务

Flink上有状态函数的任务API

这是什么?在

一个轻量级API,它借用了celry的概念,允许Python开发人员基于apacheflink有状态函数运行基于任务的工作流。工作流由接受参数的任务组成,这些任务可以作为延续链接到管道中。管道变成了燧石状态。在

@tasks.bind()
def greeting_workflow(first_name, last_name):
    return say_hello.send(first_name, last_name).continue_with(say_goodbye)


@tasks.bind()
def say_hello(first_name, last_name):
    return f'Hello {first_name} {last_name}'


@tasks.bind()
def say_goodbye(greeting):
    return f'{greeting}.  So now I will say goodbye'

动机

在我的职业生涯中,我使用celeri来编排分布式Python工作流。这些工作流通常是1-N-1基数,例如:

  • 装载股票组合
  • 对于每一个股票装载一个历史价格的时间序列
  • 对于每个时间序列,计算标准差
  • 返回标准差的平均值

在芹菜中使用简单的旧函数装饰为芹菜任务,这是相当微不足道的

^{pr2}$

可以在芹菜上作为工作流运行

stocks = ['BT.L', 'DAIGn.DE', 'BP.L']

workflow = 
    chain(group([chain(
        load_timeseries.s(stock),
        compute_std_dev.s())
    for stock in stocks]),
        compute_average.s()
    )

result = workflow.delay()

由于工作流是作为简单的函数实现的,所以它也可以测试和调试,而不必启动Celery

test_result = compute_average([compute_std_dev(load_timeseries('BT.L'))])

Flink对我来说很有趣,因为它不像Celery,它是polygot,远程函数可以用Python以外的语言编写。不过,我在Celery上运行的现有工作流需要移动。那会是什么样子?在

Flink状态函数

如果您已经阅读了这篇文章,并且还没有看到Flink有状态函数,那么就值得阅读apackeflink站点上的文档。Flink并没有真正定义工作流。函数可以调用其他函数,函数可以回复调用它们的函数,函数可以向某些出口发出结果,通常是Kakfa主题。这使得它不像芹菜一样受到限制,但同时也给开发人员带来了更多的工作。在

让我们从Flink的角度重新审视一下我们的股票示例。在

@functions.bind('examples/load_timeseries')
def load_timeseries(context, stock):
    prices = _load_prices(stock)
    context.send('examples/compute_std_dev', prices)


@functions.bind('examples/compute_std_dev')
def compute_std_dev(context, prices):
    context.reply(np.std(prices))

这方面的一些问题:

  1. load_timeseries()始终调用compute_std_dev()。它不再是一个可恢复的函数,所以我不能在其他工作流中使用它。在

  2. compute_std_dev()响应load_timeseries()。这意味着load_timeseries()需要同时接受股票作为输入或股票价格列表。在

  3. 随着工作流变得更加复杂,load_timeseries()将变形为编排函数:

@functions.bind('examples/load_timeseries')
def load_timeseries(context, input):

    if isinstance(input, str):
        prices = _load_prices(stock)
        context.send('examples/compute_std_dev', input)
    # elif ... next stage
    # elif ... next stage
    # elif ... next stage etc
    elif isinstance(input, double):  # finally reply to original caller
        context.pack_and_send_egress('topic', input)
  1. 这些函数不再可以通过在Flink之外将它们链接在一起进行测试

Flink任务

flinktasks将编排函数封装到一个管道中,这样开发人员就可以专注于编写简单的函数,这些函数可以使用直观的API组合到工作流中。由于工作流中的每个单独任务都作为单独的Flink状态函数调用运行,所以执行仍然是分布式的,并且可以根据需要进行扩展。在

tasks = FlinkTasks(
    default_namespace="example", 
    default_worker_name="worker", 
    egress_type_name="example/kafka-generic-egress")


@tasks.bind()
def timeseries_workflow():
    stocks = ['BT.L', 'DAIGn.DE', 'BP.L']

    return in_parallel(
        [load_timeseries.send(stock).continue_with(compute_std_dev) for stock in stocks]
    ).continue_with(compute_average)


@tasks.bind()
def load_timeseries(stock):
    return _load_prices(stock)


@tasks.bind()
def compute_std_dev(prices):
    return np.std(prices)


@tasks.bind()
def compute_average(std_devs):
    return np.mean(std_devs) 


@functions.bind("example/worker")
def worker(context, task_data: Union[TaskRequest, TaskResult, TaskException]):
    try:
        tasks.run(context, task_data)
    except Exception as e:
        print(f'Error - {e}')
        traceback.print_exc()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java从Dropwizard中的Minio检索文件时,GET请求中的超时是如何处理的?   带Hibernate的java Jackson用于序列化以避免枚举   Raspberry Pi上的java Jave分段错误   java在屏幕旋转时不保存当前片段和数据   java War文件未在Heroku上正确部署   如何使用Java处理Selenium webdriver中的促销广告或cookie   java处理“用法:PApplet[options]<classname>[sketch args]”   java文本文件错误扫描程序   运行第一个JavaFX模块化程序时出现java异常   java将fileoutputstream转换为字符串   如何调试gstreamerjava?   java Spring RestTemplate ResponseBody类是什么样的   如何将JSON数组转换为Java列表。我在用斯文森   javascript在显示div按钮后进入新页面