Nameko微服务框架的Kafka扩展

nameko-kafka的Python项目详细描述


卡夫卡名字

Build Statuscodecov.ioMIT licensed

Nameko微服务框架的Kafka扩展。在

简介

这是一个Nameko微服务框架extension来支持 卡夫卡切入点和依赖性。这个项目背后的动机是问题569Nameko kafka基于calumpeterwebb的方法提供了入口点的简单实现。 它还包括一个依赖提供者,用于从Nameko服务中发布Kafka消息。在

安装

软件包支持Python>;=3.5

$ pip install nameko-kafka

使用

扩展可以同时用于服务依赖项和入口点。中显示了这两种情况的用法示例 以下章节。在

依赖关系

这基本上是一个以Nameko依赖形式出现的python-kafka生产者。 Nameko使用依赖注入来实例化生产者。您只需在服务类中声明它,如下所示:

^{pr2}$

这里KafkaProducer接受对python-kafkaKafkaProducer有效的所有选项。在

入口点

您可以在服务中使用nameko_kafka.consume修饰符来处理Kafka消息:

fromnameko_kafkaimportconsumeclassMyService:"""        My microservice     """name="my-service"@consume("kafka-topic",group_id="my-group",bootstrap_servers='localhost:1234')defmethod(self,message):# Your message handlerhandle_message(message)

consume修饰符接受对python-kafkaKafkaConsumer有效的所有选项。在

除了默认的python-kafka的自动提交功能之外,入口点还支持三种不同的功能 偏移提交策略的类型:至少一次最多一次和{em1}$正好一次。这三种策略相对应 不同的消息传递语义。下面的小节中显示了每种方法的示例。在

至少一次

fromnameko_kafkaimportconsume,SemanticclassMyService:"""        My microservice     """name="my-service"# At least once semantic consumer@consume("kafka-topic",group_id="my-group",bootstrap_servers='localhost:1234',semantic=Semantic.AT_LEAST_ONCE)defmethod(self,message):# Your message handlerhandle_message(message)

最多一次

fromnameko_kafkaimportconsume,SemanticclassMyService:"""        My microservice     """name="my-service"# At most once semantic consumer@consume("kafka-topic",group_id="my-group",bootstrap_servers='localhost:1234',semantic=Semantic.AT_MOST_ONCE)defmethod(self,message):# Your message handlerhandle_message(message)

正好一次

精确一次语义需要一个持久存储来保存消息偏移量。这样的持久存储可以 使用Nameko kafka提供的OffsetStorage接口实现。可以有各种后端实现 如RDBMS、NoSQL数据库等。对某些数据库的支持是现成的:

MongoDB存储
fromnameko_kafkaimportconsume,Semanticfromnameko_kafka.storageimportMongoStoragefrompymongoimportMongoClientclassMyService:"""        My microservice     """name="my-service"# At most once semantic consumer@consume("kafka-topic",group_id="my-group",bootstrap_servers='localhost:1234',semantic=Semantic.EXACTLY_ONCE,storage=MongoStorage(# MongoDB backend clientclient=MongoClient('localhost',27017),# Database to use for storagedb_name="database-name",# Collection to use for storagecollection="collection-name"))defmethod(self,message):# Your message handlerhandle_message(message)

注意:如果没有指定db_namecollection参数,则默认值"nameko_kafka_offsets""offsets"将分别由存储器使用。在

SQL存储

v0.2.1的一部分

S3存储

v0.2.2的一部分

Azure块存储

v0.2.3的一部分

创建自定义存储

您可以通过实现OffsetStorage接口创建自己的偏移存储。它公开了以下方法:

fromnameko_kafka.storage.baseimportOffsetStorageclassMyStorage(OffsetStorage):"""        My custom offset storage.    """defsetup(self):"""            Method for setup of the storage.        """defstop(self):"""            Method to teardown the storage.        """defread(self,topic,partition):"""            Read last stored offset from storage for             given topic and partition.            :param topic: message topic            :param partition: partition number of the topic            :returns: last committed offset value        """defwrite(self,offsets):"""            Write offsets to storage.            :param offsets: mapping between topic-partition                tuples and corresponding latest offset value,                 e.g.                {                    ("topic-1", 0): 1,                    ("topic-1", 1): 3,                    ("topic-2", 1): 10,                    ...                }        """

配置

扩展配置可以在namekoconfig.yaml文件中设置,或者 环境变量。在

配置文件

# Config for entrypointKAFKA_CONSUMER:bootstrap_servers:'localhost:1234'retry_backoff_ms:100...# Config for dependencyKAFKA_PRODUCER:bootstrap_servers:'localhost:1234'retries:3...

环境变量

# Config for entrypoint
KAFKA_CONSUMER='{"bootstrap_servers": "localhost:1234", "retry_backoff_ms": 100}'

# Config for dependency
KAFKA_PRODUCER='{"bootstrap_servers": "localhost:1234", "retries": 3}'

里程碑

  • [x] 卡夫卡入口点
  • [x] 卡夫卡依赖
  • [x] 承诺策略:
    • 几乎一次交货
    • 至少一次交货
    • 正好是一次交货
  • [x] 为EXACT\u ONCE_DELIVERY策略提交存储

开发商

发展需要卡夫卡经纪人。您可以使用docker-compose.yml生成一个 tests文件夹中的文件:

$ cd tests
$ docker-compose up -d 

要安装所有程序包依赖项,请执行以下操作:

$ pip install -r .[dev]
or
$ make deps

其他有用的命令:

$ pytest --cov=nameko_kafka tests/			# to get coverage report
or
$ make coverage

$ pylint nameko_kafka       # to check code quality with PyLint
or
$ make lint

捐款

发布报告和拉取请求总是受欢迎的。谢谢!在

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java文件路径中的java UTF8字符   java如何可靠地写入OPC UA服务器?   for-my-if语句中的java循环不能产生我想要的结果   java我如何在1个XSL中加入2个XML   macos无法在“/Library/Java/JavaVirtualMachines/jdk15.0.1.jdk/Contents/Home/bin/apt”中找到可执行文件   Java代码简单数学   java如何避免selenium中的sendKeys方法清除之前填充的文本?   java如何将PlacesAPI自动完成小部件传递到自定义对话框?   带有文本块的java Intellij json片段问题   启动时Spring启动错误:java。木卫一。IOException:jsse。别名\u否\u键\u项   移动文件时发生java异常   http Java Web客户端远程连接