Nameko微服务框架的Kafka扩展
nameko-kafka的Python项目详细描述
卡夫卡名字
Nameko微服务框架的Kafka扩展。在
简介
这是一个Nameko微服务框架extension来支持 卡夫卡切入点和依赖性。这个项目背后的动机是问题569。 Nameko kafka基于calumpeterwebb的方法提供了入口点的简单实现。 它还包括一个依赖提供者,用于从Nameko服务中发布Kafka消息。在
安装
软件包支持Python>;=3.5
$ pip install nameko-kafka
使用
扩展可以同时用于服务依赖项和入口点。中显示了这两种情况的用法示例 以下章节。在
依赖关系
这基本上是一个以Nameko依赖形式出现的python-kafka生产者。 Nameko使用依赖注入来实例化生产者。您只需在服务类中声明它,如下所示:
^{pr2}$这里KafkaProducer
接受对python-kafka
的KafkaProducer有效的所有选项。在
入口点
您可以在服务中使用nameko_kafka.consume
修饰符来处理Kafka消息:
fromnameko_kafkaimportconsumeclassMyService:""" My microservice """name="my-service"@consume("kafka-topic",group_id="my-group",bootstrap_servers='localhost:1234')defmethod(self,message):# Your message handlerhandle_message(message)
consume
修饰符接受对python-kafka
KafkaConsumer有效的所有选项。在
除了默认的python-kafka
的自动提交功能之外,入口点还支持三种不同的功能
偏移提交策略的类型:至少一次,最多一次和{em1}$正好一次。这三种策略相对应
不同的消息传递语义。下面的小节中显示了每种方法的示例。在
至少一次
fromnameko_kafkaimportconsume,SemanticclassMyService:""" My microservice """name="my-service"# At least once semantic consumer@consume("kafka-topic",group_id="my-group",bootstrap_servers='localhost:1234',semantic=Semantic.AT_LEAST_ONCE)defmethod(self,message):# Your message handlerhandle_message(message)
最多一次
fromnameko_kafkaimportconsume,SemanticclassMyService:""" My microservice """name="my-service"# At most once semantic consumer@consume("kafka-topic",group_id="my-group",bootstrap_servers='localhost:1234',semantic=Semantic.AT_MOST_ONCE)defmethod(self,message):# Your message handlerhandle_message(message)
正好一次
精确一次语义需要一个持久存储来保存消息偏移量。这样的持久存储可以
使用Nameko kafka提供的OffsetStorage
接口实现。可以有各种后端实现
如RDBMS、NoSQL数据库等。对某些数据库的支持是现成的:
MongoDB存储
fromnameko_kafkaimportconsume,Semanticfromnameko_kafka.storageimportMongoStoragefrompymongoimportMongoClientclassMyService:""" My microservice """name="my-service"# At most once semantic consumer@consume("kafka-topic",group_id="my-group",bootstrap_servers='localhost:1234',semantic=Semantic.EXACTLY_ONCE,storage=MongoStorage(# MongoDB backend clientclient=MongoClient('localhost',27017),# Database to use for storagedb_name="database-name",# Collection to use for storagecollection="collection-name"))defmethod(self,message):# Your message handlerhandle_message(message)
注意:如果没有指定db_name
和collection
参数,则默认值"nameko_kafka_offsets"
和
"offsets"
将分别由存储器使用。在
SQL存储
v0.2.1的一部分
S3存储
v0.2.2的一部分
Azure块存储
v0.2.3的一部分
创建自定义存储
您可以通过实现OffsetStorage
接口创建自己的偏移存储。它公开了以下方法:
fromnameko_kafka.storage.baseimportOffsetStorageclassMyStorage(OffsetStorage):""" My custom offset storage. """defsetup(self):""" Method for setup of the storage. """defstop(self):""" Method to teardown the storage. """defread(self,topic,partition):""" Read last stored offset from storage for given topic and partition. :param topic: message topic :param partition: partition number of the topic :returns: last committed offset value """defwrite(self,offsets):""" Write offsets to storage. :param offsets: mapping between topic-partition tuples and corresponding latest offset value, e.g. { ("topic-1", 0): 1, ("topic-1", 1): 3, ("topic-2", 1): 10, ... } """
配置
扩展配置可以在namekoconfig.yaml文件中设置,或者 环境变量。在
配置文件
# Config for entrypointKAFKA_CONSUMER:bootstrap_servers:'localhost:1234'retry_backoff_ms:100...# Config for dependencyKAFKA_PRODUCER:bootstrap_servers:'localhost:1234'retries:3...
环境变量
# Config for entrypoint KAFKA_CONSUMER='{"bootstrap_servers": "localhost:1234", "retry_backoff_ms": 100}' # Config for dependency KAFKA_PRODUCER='{"bootstrap_servers": "localhost:1234", "retries": 3}'
里程碑
- [x] 卡夫卡入口点
- [x] 卡夫卡依赖
- [x] 承诺策略:
- 几乎一次交货
- 至少一次交货
- 正好是一次交货
- [x] 为EXACT\u ONCE_DELIVERY策略提交存储
开发商
发展需要卡夫卡经纪人。您可以使用docker-compose.yml生成一个
tests
文件夹中的文件:
$ cd tests
$ docker-compose up -d
要安装所有程序包依赖项,请执行以下操作:
$ pip install -r .[dev] or $ make deps
其他有用的命令:
$ pytest --cov=nameko_kafka tests/ # to get coverage report or $ make coverage $ pylint nameko_kafka # to check code quality with PyLint or $ make lint
捐款
发布报告和拉取请求总是受欢迎的。谢谢!在
- 项目
标签: