Spring云数据流主题创建

2024-09-30 02:25:51 发布

您现在位置:Python中文网/ 问答频道 /正文

我有自己的Spring云数据流处理器,里面有Python,我使用这个示例作为指导:https://dataflow.spring.io/docs/recipes/polyglot/processor/。 然后我想扩展并创建其中的三个处理器,因此使用spring.cloud.deployer.myApp.count=3我创建了3个包含Python的pod。 我稍微修改了示例中的一个代码:当我创建一个Kafka消费者时,我还传递了一个组id,所以消息应该是负载平衡的。你知道吗

consumer = KafkaConsumer(get_input_channel(), group_id=get_consumer_group(), bootstrap_servers=[get_kafka_binder_brokers()])

问题是SCDF创建了一个只有一个分区的Kafka主题,所以消息只能到达一个pod。 所以我想知道:

  • 我是否应该配置SCDF来创建一个带有3个分区的Kafka主题?你知道吗
  • 或者我不应该依赖SCDF并用Python自己创建主题?我想这是多余的,因为SCDF也创建了这个主题。你知道吗
  • SCDF中的哪个组件实际上负责创建卡夫卡主题?我如何影响分区的数量呢?你知道吗
  • 如果我停止这个流并用4个处理器步骤再次启动,这个主题应该用第4个分区扩展吗?因为当前没有创建新分区。你知道吗

Tags: kafkaid消息示例主题getconsumergroup
2条回答

请花点时间回顾一下Spring云数据流的responsibilities。如果不清楚,SCDF既不会与Kafka之类的支持消息传递中间件交互,也不会在运行时使用它。换句话说,SCDF不会创建与之相关的主题或分区—它只是自动配置SpringCloudStream(SCSt)属性。你知道吗

但是,如果您在自定义处理器中使用SCSt,那么该框架会自动将所需通道绑定到中间件中的底层主题。该框架还具有更改分区行为的功能。也可以使用过度分区主题部署处理器。有several other configuration options来构建所需的流数据处理行为。你知道吗

您正在查看的Python示例没有SCSt提供的所有特性。这个方法是一个示例演练,演示了如何用Python构建本机处理器风格的应用程序,其中生产者和使用者配置是在Python代码本身中手动创建的。SCDF和SCSt均不影响此配方中的应用行为。你知道吗

Should I somehow configure SCDF to create a Kafka topic with 3 partitions?

如前所述,SCDF不与卡夫卡互动。你知道吗

Or should I not rely on SCDF and create topics on my own in Python? I suppose this will be redundant, since SCDF also creates this topic.

如果您的定制处理器不是springcloudstream应用程序,那么您有责任在代码中显式地定义主题+分区。你知道吗

What component in SCDF is actually responsible for Kafka topics creation? And how can I influence it regarding number of partitions?

春云溪。参见上面的解释。你知道吗

If I stop this stream and launch again with 4 processor steps, should the topic be extended with the 4th partition? Because currently no new partitions get created.

您不一定需要重新启动流数据管道。如果您的主题是预先过度分区的,那么运行时的任何其他使用者都应该能够自动参与竞争的使用者关系。请注意spring-io/dataflow.spring.io#156-我们正在添加一个配方来演示使用SCSt+SCDF+Kafka手动和自动缩放的可能性。你知道吗

通过在Python容器启动脚本中引入以下代码,https://dataflow.spring.io/docs/recipes/polyglot/processor/中提供了改进的代码,我们可以绕过这个问题。使用SCDF服务器传递的参数获取代理URL、主题名称、实例数:

admin_client = KafkaAdminClient(bootstrap_servers=[get_kafka_binder_brokers()], client_id=sys.argv[0])

partition_count = get_cmd_arg("spring.cloud.stream.instanceCount")

# create Kafka topic if does not exist
new_topic = NewTopic(name=get_input_channel(), num_partitions=partition_count, replication_factor=1)
try:
    admin_client.create_topics(new_topics=[new_topic])
except TopicAlreadyExistsError:
    logging.info(f"Topic {get_input_channel()} was already created")

# add Kafka partitions to existing topic
new_partitions = NewPartitions(total_count=partition_count)
try:
    admin_client.create_partitions(topic_partitions={get_input_channel(): new_partitions})
except InvalidPartitionsError as exp:
    logging.info(f"No need to increase Kafka partitions for topic {get_input_channel()}")

相关问题 更多 >

    热门问题