bspump是python 3.5的实时流处理器+
bspump的Python项目详细描述
原理
- 写一次,多次使用
- 一切都是一条小溪
- 无模式
- 卡帕建筑
- 实时
- 高性能
- 简单易用且文档齐全,因此任何人都可以编写自己的流处理器
- 通过python 3.5+async/await和asyncio 异步
- Event driven Architecture/Reactor pattern
- 单线程内核,但与线程兼容
- 与pypy兼容,能够将python代码性能提高5倍以上的即时编译器
- python生态系统的好公民
- 模块化
流处理器示例
#!/usr/bin/env python3importbspumpimportbspump.socketimportbspump.commonimportbspump.elasticsearchclassMyPipeline(bspump.Pipeline):def__init__(self,app):super().__init__(app)self.build(bspump.socket.TCPStreamSource(app,self),bspump.common.JSONParserProcessor(app,self),bspump.elasticsearch.ElasticSearchSink(app,self,"ESConnection"))if__name__=='__main__':app=bspump.BSPumpApplication()svc=app.get_service("bspump.PumpService")svc.add_connection(bspump.elasticsearch.ElasticSearchConnection(app,"ESConnection"))svc.add_pipeline(MyPipeline(app))app.run()
空白应用程序设置
您可以从it’s own repository克隆空白应用程序。
可用技术
- bspump.amqpamqp/rabbitmq连接,源和汇
- bspump.avroapache avro文件源和汇
- bspump.common通用处理器和分析器
- bspump.elasticsearchelasticsearch连接、源和汇
- bspump.file文件源和接收器(普通文件、json、csv)
- bspump.filter内容、属性和时间漂移过滤器
- bspump.http.clienthttp客户端源,websocket客户端接收器
- bspump.http.webhttp服务器源和接收器,websocket服务器源
- bspump.influxdbinfloxdb连接和接收器
- bspump.kafkakafka连接,源和汇
- bspump.mailsmtp连接和接收器
- bspump.mongodbmongodb连接和查找
- bspump.mysqlmysql连接,源和汇
- bspump.parquetapache parquet文件接收器
- bspump.postgresqlpostgresql连接和接收器
- bspump.slack松弛连接和接收器
- bspump.sockettcp源,udp源
- bspump.trigger机会触发器、pubsub触发器和周期触发器
- bspump.crypto加密
- 散列:sha224、sha256、sha384、sha512、sha1、md5、blake2b、blake2s
- 对称加密:AES 128、AES 192、AES 256
- bspump.analyzer
- 时间窗口分析器
- 会话分析器
- 地理分析仪
- 时间漂移分析仪
- bspump.lookup
- 地理IP查找
- bspump.unittest
- 用于测试处理器/管道的接口
- bspump.oob带外接收器和引擎
- bspump.webpump管道、查找等的api端点。
谷歌技术兼容矩阵表: https://docs.google.com/spreadsheets/d/1L1DvSuHuhKUyZ3FEFxqEKNpSoamPH2Z1ZaFuHyageoI/edit?usp=sharing
高级体系结构
单元测试
fromunittest.mockimportMagicMockfrombspump.unittestimportProcessorTestCaseclassMyProcessorTestCase(ProcessorTestCase):deftest_my_processor(self):# setup processor for testself.set_up_processor(my_project.processor.MyProcessor)# mock methods to suit your needs on pipeline ..self.Pipeline.method=MagicMock()# .. or instance of processormy_processor=self.Pipeline.locate_processor("MyProcessor")my_processor.method=MagicMock()output=self.execute([(None,{'foo':'bar'})]# Context, event)# assert outputself.assertEqual([eventforcontext,eventinoutput],[{'FOO':'BAR'}])# asssert expected calls on `self.Pipeline.method` or `my_processor.method`my_processor.method.assert_called_with(**expected)
单元测试的运行
python3 -m unittest test
可以用单元测试模块的位置替换test。
许可证
bspump是一个开源软件,根据bsd 3条款许可证提供。