如何使用kafka python订阅多个kafka通配符模式的列表?

2024-09-30 12:18:28 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用带有通配符的模式订阅Kafka,如下所示。通配符表示动态客户id

consumer.subscribe(pattern='customer.*.validations')

这很有效,因为我可以从主题字符串中提取客户Id。但现在我需要扩展这个功能来收听一个类似的主题,目的略有不同。我们称之为customer.*.additional-validations。代码需要存在于同一个项目中,因为有很多功能是共享的,但是我需要能够根据队列的类型采取不同的路径。

Kafka documentation中,我可以看到订阅一个主题数组是可能的。但是这些是硬编码字符串。不是允许灵活性的模式。

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)

所以我想知道是否可以把两者结合起来?有点像这样(不工作):

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])

Tags: kafka字符串功能主题客户consumervalue模式
1条回答
网友
1楼 · 发布于 2024-09-30 12:18:28

在KafkaConsumer代码中,它支持主题列表或模式

https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py#L717

   def subscribe(self, topics=(), pattern=None, listener=None):
        """Subscribe to a list of topics, or a topic regex pattern
        Partitions will be dynamically assigned via a group coordinator.
        Topic subscriptions are not incremental: this list will replace the
        current assignment (if there is one).

因此,您可以使用|创建一个带有或条件的regex,该regex应该作为订阅多个动态主题regex工作,因为它在内部使用re模块进行匹配。

(customer.*.validations)|(customer.*.additional-validations)

相关问题 更多 >

    热门问题