bspump是python 3.5的实时流处理器+

bspump的Python项目详细描述


Join the chat at https://gitter.im/TeskaLabs/bspump

原理

  • 写一次,多次使用
  • 一切都是一条小溪
  • 无模式
  • 卡帕建筑
  • 实时
  • 高性能
  • 简单易用且文档齐全,因此任何人都可以编写自己的流处理器
  • 通过python 3.5+async/awaitasyncio
  • 异步
  • Event driven Architecture/Reactor pattern
  • 单线程内核,但与线程兼容
  • pypy兼容,能够将python代码性能提高5倍以上的即时编译器
  • python生态系统的好公民
  • 模块化

流处理器示例

#!/usr/bin/env python3importbspumpimportbspump.socketimportbspump.commonimportbspump.elasticsearchclassMyPipeline(bspump.Pipeline):def__init__(self,app):super().__init__(app)self.build(bspump.socket.TCPStreamSource(app,self),bspump.common.JSONParserProcessor(app,self),bspump.elasticsearch.ElasticSearchSink(app,self,"ESConnection"))if__name__=='__main__':app=bspump.BSPumpApplication()svc=app.get_service("bspump.PumpService")svc.add_connection(bspump.elasticsearch.ElasticSearchConnection(app,"ESConnection"))svc.add_pipeline(MyPipeline(app))app.run()

视频教程

http://img.youtube.com/vi/QvjiPxO4w6w/0.jpg

空白应用程序设置

您可以从it’s own repository克隆空白应用程序。

可用技术

  • bspump.amqpamqp/rabbitmq连接,源和汇
  • bspump.avroapache avro文件源和汇
  • bspump.common通用处理器和分析器
  • bspump.elasticsearchelasticsearch连接、源和汇
  • bspump.file文件源和接收器(普通文件、json、csv)
  • bspump.filter内容、属性和时间漂移过滤器
  • bspump.http.clienthttp客户端源,websocket客户端接收器
  • bspump.http.webhttp服务器源和接收器,websocket服务器源
  • bspump.influxdbinfloxdb连接和接收器
  • bspump.kafkakafka连接,源和汇
  • bspump.mailsmtp连接和接收器
  • bspump.mongodbmongodb连接和查找
  • bspump.mysqlmysql连接,源和汇
  • bspump.parquetapache parquet文件接收器
  • bspump.postgresqlpostgresql连接和接收器
  • bspump.slack松弛连接和接收器
  • bspump.sockettcp源,udp源
  • bspump.trigger机会触发器、pubsub触发器和周期触发器
  • bspump.crypto加密
    • 散列:sha224、sha256、sha384、sha512、sha1、md5、blake2b、blake2s
    • 对称加密:AES 128、AES 192、AES 256
  • bspump.analyzer
    • 时间窗口分析器
    • 会话分析器
    • 地理分析仪
    • 时间漂移分析仪
  • bspump.lookup
    • 地理IP查找
  • bspump.unittest
    • 用于测试处理器/管道的接口
  • bspump.oob带外接收器和引擎
  • bspump.webpump管道、查找等的api端点。

谷歌技术兼容矩阵表: https://docs.google.com/spreadsheets/d/1L1DvSuHuhKUyZ3FEFxqEKNpSoamPH2Z1ZaFuHyageoI/edit?usp=sharing

高级体系结构

Schema of BSPump high-level achitecture

单元测试

fromunittest.mockimportMagicMockfrombspump.unittestimportProcessorTestCaseclassMyProcessorTestCase(ProcessorTestCase):deftest_my_processor(self):# setup processor for testself.set_up_processor(my_project.processor.MyProcessor)# mock methods to suit your needs on pipeline ..self.Pipeline.method=MagicMock()# .. or instance of processormy_processor=self.Pipeline.locate_processor("MyProcessor")my_processor.method=MagicMock()output=self.execute([(None,{'foo':'bar'})]# Context, event)# assert outputself.assertEqual([eventforcontext,eventinoutput],[{'FOO':'BAR'}])# asssert expected calls on `self.Pipeline.method` or `my_processor.method`my_processor.method.assert_called_with(**expected)

单元测试的运行

python3 -m unittest test

可以用单元测试模块的位置替换test

许可证

bspump是一个开源软件,根据bsd 3条款许可证提供。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
eclipse java btc交易应用程序编译时错误(HTTP组件)   java客户端状态保存,我能看到发送到客户端的状态数据吗?   spring中的java内部字段注入工作以及为什么不建议使用它   在iReports、Jasper和JavaBean数据源中使用表功能   如何在Java树中查找节点   用于日志分析的java模式匹配   java如何在Spring中处理json列表?   java SQL注入和可能的攻击   java如果我将实体转换为DTO,那么转换代码应该存在于何处?   java素数方法不起作用   通过NFC启动java Android Q隐私更改活动   java在Spring引导时为Spring AMQP和RabbitMQ动态设置主机   swing Java Paste From Clipboard不适用于Linux上的所有应用程序   java从不同对象访问对象的内部类   java Ektorp CouchDB测试连接   十进制前8位、十进制后2位和整个字符串的java正则表达式不应计算为零   在Java中以长格式存储两个整数之和   ConfigurationProperties中嵌套属性的java验证