从卡夫卡获取数据,处理后发送到ElasticSearch

kfk2es的Python项目详细描述


kafka to elasticsearch

从kafka读取数据,并处理后写入elastcsearch,只支持python3

使用方法

编写自己的处理程序

example.py

fromkfk2esimportStreamProcessdefyour_own_handler(event):"""    event的内容就是从kafka中获取的每条数据    """print(event)#return ('esindex-%Y%m%d', '_log', event)#return (None, '_log', event)returnevent# 返回的结果会发到es中,如果没有返回,则不发if__name__=='__main__':pipe=StreamProcess(configfile='config.yml')# from myconf import Conf# conf = Conf('config.yml')# pipe = StreamProcess(configfile=conf)pipe.handler=your_own_handlerpipe.run()

StreamProcess的configfile参数可以是一个配置文件路径,也可以是包含必要参数的字典。

如果不添加任何处理程序,那么从kafka读取的数据会直接尝试往elasticsearch发送

自己添加处理程序,只需要编写函数,并按照如下格式进行返回:

  • 返回一个字典, 那么字典中的数据会被传到es中
  • 返回一个3个元素的tuple, 那么第一个元素是es的索引,第二个元素则是es的类型,第三个元素为需要导入es的数据。 通过数据直接返回的索引和类型要优先于配置文件中的配置。如果只希望动态设置其中一个,另一个使用默认,那么不需要定义的设置为None即可
  • 如果不需要传输到es中,只要不返回任何数据就可以了。这样可以只测试kafka的连通性

编辑配置文件

配置文件可以是json或者yaml格式,以json,yaml或者yml结尾

可以使用多个kafka作为输入,elasticsearch只有一个输出

yaml格式例子

config.yml

kafka:-topic:topbootstrap_servers:-server1:9092-server2:9092-server3:9092group_id:group_idsasl_plain_username:usersasl_plain_password:passwordelasticsearch:params:#parms for elasticsearchhosts:-http://user:pass@10.1.0.1:9200index:test-%Y%m%dtype:log             # optional(default:"log")cache_size:150       # optional(default:150)timeout:1            # optional(default:1)

需要注意的是elasticsearch选项中的parms选项包含了elasticsearch库中Elasticsearch的参数. 而其他参数是StreamProcess需要的

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
将Java中的对象列表序列化为json   在同一Play Framework项目中同时使用Ebean和JPA的java   oop如何在整个Java项目中共享API密钥   java ADT Eclipse SDK故障未找到DDM   扫描程序中变量的递归Java输入值未应用于方法中的变量   java将空格转换为命令行类型的空格   JavaSpring,如何决定客户机应该使用哪个服务?   java致命异常:main(Android标准)   java编译错误是因为类型检查还是三元运算符?   java Sikuli+Webdriver:getting error x.png看起来像一个文件,但在磁盘上找不到。假设是文本   java选择位置。。。。ms sql 2005中需要电气状态   由于OSGi捆绑包依赖性问题,java无法启动RCP应用程序   json java gson fromjson返回非泛型的null   如何将google api访问令牌从java服务器传递到gapi javascript客户端?   java在已排序的LinkedList中添加元素   java ForkJoinPool为什么程序抛出OutOfMemoryError?   java SQUARE无法解析为Processing/Eclipse中的变量   java如何为Sun App Server 8.2设置JNDI