我有自己的Spring云数据流处理器,里面有Python,我使用这个示例作为指导:https://dataflow.spring.io/docs/recipes/polyglot/processor/。
然后我想扩展并创建其中的三个处理器,因此使用spring.cloud.deployer.myApp.count=3
我创建了3个包含Python的pod。
我稍微修改了示例中的一个代码:当我创建一个Kafka消费者时,我还传递了一个组id,所以消息应该是负载平衡的。你知道吗
consumer = KafkaConsumer(get_input_channel(), group_id=get_consumer_group(), bootstrap_servers=[get_kafka_binder_brokers()])
问题是SCDF创建了一个只有一个分区的Kafka主题,所以消息只能到达一个pod。 所以我想知道:
请花点时间回顾一下Spring云数据流的responsibilities。如果不清楚,SCDF既不会与Kafka之类的支持消息传递中间件交互,也不会在运行时使用它。换句话说,SCDF不会创建与之相关的主题或分区—它只是自动配置SpringCloudStream(SCSt)属性。你知道吗
但是,如果您在自定义处理器中使用SCSt,那么该框架会自动将所需通道绑定到中间件中的底层主题。该框架还具有更改分区行为的功能。也可以使用过度分区主题部署处理器。有several other configuration options来构建所需的流数据处理行为。你知道吗
您正在查看的Python示例没有SCSt提供的所有特性。这个方法是一个示例演练,演示了如何用Python构建本机处理器风格的应用程序,其中生产者和使用者配置是在Python代码本身中手动创建的。SCDF和SCSt均不影响此配方中的应用行为。你知道吗
如前所述,SCDF不与卡夫卡互动。你知道吗
如果您的定制处理器不是springcloudstream应用程序,那么您有责任在代码中显式地定义主题+分区。你知道吗
春云溪。参见上面的解释。你知道吗
您不一定需要重新启动流数据管道。如果您的主题是预先过度分区的,那么运行时的任何其他使用者都应该能够自动参与竞争的使用者关系。请注意spring-io/dataflow.spring.io#156-我们正在添加一个配方来演示使用SCSt+SCDF+Kafka手动和自动缩放的可能性。你知道吗
通过在Python容器启动脚本中引入以下代码,https://dataflow.spring.io/docs/recipes/polyglot/processor/中提供了改进的代码,我们可以绕过这个问题。使用SCDF服务器传递的参数获取代理URL、主题名称、实例数:
相关问题 更多 >
编程相关推荐