咖啡管
os-scrapy-kafka-pipeline的Python项目详细描述
os-scrapy-kafka管道
这个项目提供了一个以JSON格式发送Scrapy Item到kafka的管道
特点:
- 中支持配置默认kafka代理和主题设置.py文件
- 支持kafka-pythonproducer初始化参数
- 支持使用item meta动态连接并发送到其他kafka集群和topic
- item将以JSON格式发送到kafka,如果无法进行utf-8编码,字节可以编码为base64字符串
安装
pip install os-scrapy-kafka-pipeline
您可以直接在项目根路径中运行示例spider。在
^{pr2}$使用
设置
- 在
在项目中启用管道设置.py文件
在ITEM_PIPELINES = { "os_scrapy_kafka_pipeline.KafkaPipeline": 300, }
- 在
配置默认kafka代理
KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]
- 项目meta中的代理将覆盖此默认值
- 当此设置无法启动kafka连接时,将不启用管道
- 当没有配置代理时,它将引发异常
- 在
配置默认kafkaproducer
KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}
- 这是全局配置,动态连接将使用此配置
- 当
KAFKA_PRRDUCER_BROKERS
已配置时,bootstrap_servers
将不起作用
- 在
配置默认主题
KAFKA_PRODUCER_TOPIC = "topic01"
- 中的配置项目.meta将覆盖此配置
- 当没有配置主题时,它将引发异常
- 在
Config kafka python loglevel(默认为“警告”)
在KAFKA_PRODUCER_LOGLEVEL = "DEBUG"
- 在
配置kafka producer关闭超时(默认值:无)
在KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
- 在
确保base64
item mumber的字节类型将被utf-8解码,如果解码失败,当您设置:
在KAFKA_VALUE_ENSURE_BASE64 = True
- 在
过滤字段
您可以过滤不导出并发送到kafka的项目字段
在KAFKA_EXPORT_FILTER = ["filtered_field"]
动态卡夫卡联系项目.meta
- 在
您可以使用item[“meta”]设置主题、键、分区
在 - 在
项目必须具有dict类型的meta-mumber
在 - 在
选项:
在meta = { "kafka.topic": "topic01", "kafka.key": "key01", "kafka.partition": 1, "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092" }
存储格式
项将以JSON格式发送到kafka,字节将编码为base64
单元测试
sh scripts/test.sh
许可证
麻省理工学院授权。在
- 项目
标签: