Python管
canal-py2的Python项目详细描述
canal-py2
First, canal-py2 is fork by https://github.com/haozi3156666/canal-python thx.
一.canal-py2 简介
canal-py2 是阿里巴巴开源项目 Canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 python 客户端。为 python 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql数据库binlog的增量订阅&消费组件。
基于日志增量订阅&消费支持的业务:
- 数据库镜像
- 数据库实时备份
- 多级索引 (卖家和买家各自分库索引)
- search build
- 业务cache刷新
- 价格变化等重要业务消息
关于 Canal 的更多信息请访问 https://github.com/alibaba/canal/wiki
二.工作原理
canal-py2 是 Canal 的 python 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。
三.工作流程
1.Canal连接到mysql数据库,模拟slave
2.canal-py2 与 Canal 建立连接
2.数据库发生变更写入到binlog
5.Canal向数据库发送dump请求,获取binlog并解析
4.canal-py2 向 Canal 请求数据库变更
4.Canal 发送解析后的数据给canal-py2
5.canal-py2 收到数据,消费成功,发送回执。(可选)
6.Canal记录消费位置。
四.快速启动
安装Canal
Canal 的安装以及配置使用请查看 https://github.com/alibaba/canal/wiki/QuickStart
环境要求
python 2.7 (canal-python支持python3)
构建canal python客户端
pip install canal-py2
建立与Canal的连接
importtimefromcanal.clientimportClientfromcanal.protocolimportEntryProtocol_pb2fromcanal.protocolimportCanalProtocol_pb2client=Client()client.connect(host='127.0.0.1',port=11111)client.check_valid(username='canal',password='canal')client.subscribe(client_id='1001',destination='example',filter='.*\\..*')whileTrue:message=client.get(100)entries=message['entries']forentryinentries:entry_type=entry.entryTypeifentry_typein[EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,EntryProtocol_pb2.EntryType.TRANSACTIONEND]:continuerow_change=EntryProtocol_pb2.RowChange()row_change.MergeFromString(entry.storeValue)event_type=row_change.eventTypeheader=entry.headerdatabase=header.schemaNametable=header.tableNameevent_type=header.eventTypeforrowinrow_change.rowDatas:format_data=dict()ifevent_type==EntryProtocol_pb2.EventType.DELETE:forcolumninrow.beforeColumns:format_data={column.name:column.value}elifevent_type==EntryProtocol_pb2.EventType.INSERT:forcolumninrow.afterColumns:format_data={column.name:column.value}else:format_data['before']=format_data['after']=dict()forcolumninrow.beforeColumns:format_data['before'][column.name]=column.valueforcolumninrow.afterColumns:format_data['after'][column.name]=column.valuedata=dict(db=database,table=table,event_type=event_type,data=format_data,)print(data)time.sleep(1)client.disconnect()
- 项目
标签: