GeoLib输入数据

geolibs-glutemulo的Python项目详细描述


臀肌

ha geo社会演示数据摄取器

用法

阅读deexamples files

我们使用环境变量。完整列表见Environ vars file example, 和例子。

使用producer将数据上传到kafka

参见下面的python示例。必须使用列mame:value生成dict

使用摄取器消费者

使用glutodocker并填充环境变量。

使用GLUTEMULO_BACKEND和它的特定变量(数据库、主机等)选择后端。 您可以选择2个后端:postgrescarto 完整列表见Environ vars file example

然后设置:

  1. GLUTEMULO_INGESTOR_DATASET
    要上载数据的表
  2. GLUTEMULO_INGESTOR_DATASET_COLUMNS
    逗号分隔的列名列表

现在,在后端创建表或设置GLUTEMULO_INGESTOR_DATASET_DDLGLUTEMULO_INGESTOR_DATASET_AUTOCREATE=False

然后为kafka配置摄取器。 首先阅读python-kafka doc 然后使用以下变量:

  1. GLUTEMULO_INGESTOR_TOPIC
    要使用的主题
  2. GLUTEMULO_INGESTOR_BOOTSTRAP_SERVERS
    要连接的服务器列表
  3. GLUTEMULO_INGESTOR_GROUP_ID
    组ID.
  4. GLUTEMULO_INGESTOR_AUTO_OFFSET_RESET
    最新的或最早的。
  5. GLUTEMULO_INGESTOR_MAX_POLL_RECORDS
    一批消息
  6. 中返回的最大记录数
  7. GLUTEMULO_INGESTOR_FETCH_MIN_BYTES
    服务器应为获取请求返回的最小数据量,否则请等待获取最大值等待毫秒,以便累积更多数据。默认值:1

对于docker,我们包含一个example docker-compose file。 请记住,您可以使用相同的组id进行缩放

docker-compose scale gluto=3

运行烧瓶演示

$ FLASK_ENV=development flask run
 * Environment: development
 * Debug mode: on
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 194-409-049

测试

$ http -j POST localhost:5000/v1/ uno=1dos=2`
HTTP/1.0 201 CREATED
Content-Length: 13
Content-Type: text/html;charset=utf-8
Date: Thu, 02 May 201914:56:07 GMT
Server: Werkzeug/0.15.2 Python/3.7.2

DATA Received

生产者/消费者

卡夫卡+json

异步生产者:

fromglutemulo.kafka.producerimportJsonKafkaproductor=JsonKafka(bootstrap_servers="localhost:9092")future=productor.produce('simple-topic',dict(dos='BB'))

成批消费:

fromglutemulo.kafka.consumerimportJsonKafkaconsumer=JsonKafka('simple-topic',bootstrap_servers="localhost:9092")formsginconsumer.consume():formsginmessages:print(msg)

卡夫卡+阿夫罗

同步制作人:

SCHEMA={"type":"record","name":"simpledata","doc":"This is a sample Avro schema to get you started.","fields":[{"name":"name","type":"string"},{"name":"number1","type":"int"},],}SCHEMA_ID=1
fromglutemulo.kafka.producerimportAvroKafkaasProducerproductor=Producer(SCHEMA,SCHEMA_ID,bootstrap_servers="localhost:9092")future=productor.produce('simple-topic-avro',dict(name='Un nombre',number1=10))

消费者:

fromglutemulo.kafka.consumerimportAvroKafkaasConsumerconsumer=Consumer('simple-topic-avro',SCHEMA,SCHEMA_ID,bootstrap_servers="localhost:9092")formessagesinconsumer.consume():formsginmessages:print(msg)

用于测试

可以使用kafka附带的kafka控制台使用者脚本设置kafka使用者。

$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.240:9092 --topic pylog --from-beginning

this is an awsome log

用卡夫卡测试

您可以使用名为kafkacat的应用程序。

安装应用程序后,我们将以使用者模式(这是默认模式)运行它。

kafkacat -b 192.168.240.41:9092 -t one-test

这应该还没有显示任何东西,因为我们还没有发送任何东西到我们的主题…

要发送内容,我们可以将任何文本文件复制到当前目录并发送到我们的卡夫卡主题。在另一个窗口中,运行以下命令。

$ cat README | kafkacat -b 192.168.240.41 -t one-test

您应该在第一个窗口中看到kafkacat仍在consumer模式下运行的输出。

链接

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java我能在这个程序中更好地使用内存吗?   为什么我的Java while循环迭代了一半   java IntelliJ IDEA不在构建时复制资源   socket仅在Java TCP服务器输出上检查客户端断开连接   java游戏物理摩擦   java片段onClick调用方法   symja数学分析器中无法识别java Abs[x]   java在使用泛型时创建二进制搜索树类的实例?   java在外键约束表上的添加和删除   语法java表达式的含义,如果有条件   java创建内联对象并作为参数传递   是否有相当于Redis排序集(zset)的Java数据结构   java找不到适合的方法(无参数)   音频文件操作给定字节帧的音量Java   Eclipse4不以JavaWebStart启动   java如何使用org在JSON对象中获取嵌套的键元素。json?   java与Jackson的反序列化:“org.codehaus.Jackson.map.JsonMappingException:无法反序列化[projectname]的实例。”   字符串的Java正则表达式   spring集成上的java检测缺火指令