IBM流事件流集成

streamsx.eventstreams的Python项目详细描述


概述

提供将事件流中的消息作为流读取的函数。 并将元组作为消息发布到事件流。

IBM® Event Streams是一个完全托管的、基于云的消息服务。ibm事件流构建于apache kafka之上,是一个高吞吐量、容错、事件管理平台,可帮助您构建智能、响应迅速、事件驱动的应用程序。

样品

一个简单的hello world示例,说明一个streams应用程序发布到一个主题,而同一个应用程序使用同一个主题:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit, ContextTypes
import streamsx.eventstreams as eventstreams
import time

def delay(v):
    time.sleep(5.0)
    return True

topology = Topology('EventStreamsHelloWorld')

to_evstr = topology.source(['Hello', 'World!'])
to_evstr = to_evstr.as_string()
# delay tuple by tuple
to_evstr = to_evstr.filter(delay)

# Publish a stream to Event Streams using HELLO topic
eventstreams.publish(to_evstr, topic='HELLO')

# Subscribe to same topic as a stream
from_evstr = eventstreams.subscribe(topology, schema=CommonSchema.String, topic='HELLO')

# You'll find the Hello World! in stdout log file:
from_evstr.print()

# finally submit the topology to a Streaming  Analytics Service instance
submit(ContextTypes.STREAMING_ANALYTICS_SERVICE, topology)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
具有动态标题的java Spring批处理ItemWriter   确定一个数是否为斐波那契数的java   java游标。moveToFirst永远不会返回true   java验证可以与控制器中增强的参数一起使用吗?   java可以使用scanner类读取所有文件的名称。特定文件夹中的txt文件?   java如何告诉hibernate生成带有“on update/delete restrict”的外键约束   逐步为java小程序签名   JavaSpring云函数/将请求传播到函数   eclipse运行Java6项目的ANT版本是什么?   java使用sparksql和Spark流   java无法将整数转换为双精度整数   jpa中的java更新错误   java我想向数组中添加列表视图新项(旧项除外)