IBM流事件流集成
streamsx.eventstreams的Python项目详细描述
概述
提供将事件流中的消息作为流读取的函数。 并将元组作为消息发布到事件流。
IBM® Event Streams是一个完全托管的、基于云的消息服务。ibm事件流构建于apache kafka之上,是一个高吞吐量、容错、事件管理平台,可帮助您构建智能、响应迅速、事件驱动的应用程序。
样品
一个简单的hello world示例,说明一个streams应用程序发布到一个主题,而同一个应用程序使用同一个主题:
from streamsx.topology.topology import Topology from streamsx.topology.schema import CommonSchema from streamsx.topology.context import submit, ContextTypes import streamsx.eventstreams as eventstreams import time def delay(v): time.sleep(5.0) return True topology = Topology('EventStreamsHelloWorld') to_evstr = topology.source(['Hello', 'World!']) to_evstr = to_evstr.as_string() # delay tuple by tuple to_evstr = to_evstr.filter(delay) # Publish a stream to Event Streams using HELLO topic eventstreams.publish(to_evstr, topic='HELLO') # Subscribe to same topic as a stream from_evstr = eventstreams.subscribe(topology, schema=CommonSchema.String, topic='HELLO') # You'll find the Hello World! in stdout log file: from_evstr.print() # finally submit the topology to a Streaming Analytics Service instance submit(ContextTypes.STREAMING_ANALYTICS_SERVICE, topology)