咖啡管

os-scrapy-kafka-pipeline的Python项目详细描述


os-scrapy-kafka管道

Build StatuscodecovPyPI - Python VersionPyPI

这个项目提供了一个以JSON格式发送Scrapy Item到kafka的管道

特点:

  • 中支持配置默认kafka代理和主题设置.py文件
  • 支持kafka-pythonproducer初始化参数
  • 支持使用item meta动态连接并发送到其他kafka集群和topic
  • item将以JSON格式发送到kafka,如果无法进行utf-8编码,字节可以编码为base64字符串

安装

pip install os-scrapy-kafka-pipeline

您可以直接在项目根路径中运行示例spider。在

^{pr2}$

使用

设置

  • 在项目中启用管道设置.py文件

    ITEM_PIPELINES = {
        "os_scrapy_kafka_pipeline.KafkaPipeline": 300,
    }
    
  • 配置默认kafka代理

    KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]
    
    • 项目meta中的代理将覆盖此默认值
    • 当此设置无法启动kafka连接时,将不启用管道
    • 当没有配置代理时,它将引发异常
  • 配置默认kafkaproducer

    KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}
    
    • 这是全局配置,动态连接将使用此配置
    • KAFKA_PRRDUCER_BROKERS已配置时,bootstrap_servers将不起作用
  • 配置默认主题

    KAFKA_PRODUCER_TOPIC = "topic01"
    
    • 中的配置项目.meta将覆盖此配置
    • 当没有配置主题时,它将引发异常
  • Config kafka python loglevel(默认为“警告”)

    KAFKA_PRODUCER_LOGLEVEL = "DEBUG"
    
  • 配置kafka producer关闭超时(默认值:无)

    KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
    
  • 确保base64

    item mumber的字节类型将被utf-8解码,如果解码失败,当您设置:

    KAFKA_VALUE_ENSURE_BASE64 = True
    
  • 过滤字段

    您可以过滤不导出并发送到kafka的项目字段

    KAFKA_EXPORT_FILTER = ["filtered_field"]
    

动态卡夫卡联系项目.meta

  • 您可以使用item[“meta”]设置主题、键、分区

  • 项目必须具有dict类型的meta-mumber

  • 选项:

    meta = {
        "kafka.topic": "topic01",
        "kafka.key": "key01",
        "kafka.partition": 1,
        "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092"
    }
    

存储格式

项将以JSON格式发送到kafka,字节将编码为base64

单元测试

sh scripts/test.sh

许可证

麻省理工学院授权。在

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java使用无循环和乘法的递归调用查找值   java字符串大小冲突   在一组Java文件对象中查找唯一的超级目录   没有Eclipse控制台输出窗口(Java)?   java这怎么等于105而不是15?   java Adempiere列调出,用于不处理从(代码)选项导入和创建行的字段   java tomcat、2个webapps、2个log4js,但这两个应用都记录到一个文件中   lambda理解Java谓词   HotspotFX上的Java EOF问题   java google应用程序引擎:如何向连接/断开通道“ping”添加信息?   java如何使用VTDXML获取一个元素的所有名称空间声明?   java如何使用drawLine()获得一条线以随机方向拍摄?   java transactionManager应该使用哪个SessionFactory?   java在安卓上播放声音   在Mac上使用JBDC对SQL Server进行java Windows身份验证   java基本列表和字符串[]   java NamedParameterJdbcTemplate从中选择*   扩展上的java Android可扩展列表视图   使用ApacheAxis2的java SOAP附件