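<p>We can work around this problem by adding the following code to the Python container's startup script. It is an improved version of the code provided at <a href="https://dataflow.spring.io/docs/recipes/polyglot/processor/" rel="nofollow noreferrer">https://dataflow.spring.io/docs/recipes/polyglot/processor/</a>. It uses the arguments passed by the SCDF server to obtain the broker URL, the topic name, and the number of instances:</p>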
<pre><code>import logging
import sys

from kafka.admin import KafkaAdminClient, NewPartitions, NewTopic
from kafka.errors import InvalidPartitionsError, TopicAlreadyExistsError

# get_kafka_binder_brokers, get_cmd_arg and get_input_channel are the helpers from the linked recipe
admin_client = KafkaAdminClient(bootstrap_servers=[get_kafka_binder_brokers()], client_id=sys.argv[0])
# command-line values are normally strings, so convert the instance count to an int
partition_count = int(get_cmd_arg("spring.cloud.stream.instanceCount"))
# create the Kafka topic if it does not exist
new_topic = NewTopic(name=get_input_channel(), num_partitions=partition_count, replication_factor=1)
try:
    admin_client.create_topics(new_topics=[new_topic])
except TopicAlreadyExistsError:
    logging.info(f"Topic {get_input_channel()} was already created")
# add Kafka partitions to the existing topic
new_partitions = NewPartitions(total_count=partition_count)
try:
    admin_client.create_partitions(topic_partitions={get_input_channel(): new_partitions})
except InvalidPartitionsError:
    logging.info(f"No need to increase Kafka partitions for topic {get_input_channel()}")
</code></pre>
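<p>The snippet relies on the helpers <code>get_cmd_arg</code>, <code>get_kafka_binder_brokers</code> and <code>get_input_channel</code>, which are not shown here. A minimal sketch of how they might look follows, assuming SCDF passes its properties as <code>--key=value</code> command-line arguments. The property names for the brokers and the input destination, and the optional <code>argv</code> parameter (added only to make the functions easy to test), are assumptions and may differ from the recipe's actual utilities:</p>

```python
import sys


def parse_cmd_args(argv):
    """Collect --key=value arguments into a dict (first occurrence wins)."""
    args = {}
    for raw in argv:
        # split on the first "=" only, so values may themselves contain "="
        key, sep, value = raw.partition("=")
        if sep:
            args.setdefault(key.lstrip("-"), value)
    return args


def get_cmd_arg(name, argv=None):
    """Return the value of one argument as a string, or None if it is absent."""
    argv = sys.argv[1:] if argv is None else argv
    return parse_cmd_args(argv).get(name)


def get_kafka_binder_brokers(argv=None):
    # assumed property name for the Kafka binder's broker list
    return get_cmd_arg("spring.cloud.stream.kafka.binder.brokers", argv)


def get_input_channel(argv=None):
    # assumed property name for the input binding's destination topic
    return get_cmd_arg("spring.cloud.stream.bindings.input.destination", argv)
```

<p>Because every value comes back as a string, a numeric argument such as <code>spring.cloud.stream.instanceCount</code> needs an explicit <code>int(...)</code> conversion before it is used as a partition count.</p>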