IBM Streams Kafka集成

streamsx.kafka的Python项目详细描述


概述

提供将来自Kafka代理的消息作为流读取的函数 并将元组作为消息提交给kafka代理。

代理配置必须使用应用程序配置中的属性或通过 使用字典变量。 最小属性集必须包含有效的bootstrap.servers配置 对于消费者和生产者,即对于subscribepublish函数。

也可以使用不同的应用程序配置来订阅和发布 当必须使用特殊的使用者或生产者配置时。

样品

streams应用程序发布到 一个主题和同一个应用程序使用同一主题:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit, ContextTypes
import streamsx.kafka as kafka
import time

def delay(v):
    time.sleep(5.0)
    return True

topology = Topology('KafkaHelloWorld')

to_kafka = topology.source(['Hello', 'World!'])
to_kafka = to_kafka.as_string()
# delay tuple by tuple
to_kafka = to_kafka.filter(delay)

# Publish a stream to Kafka using TEST topic, the Kafka servers
# assuming, the broker is running on localhost, port 9092
kafka_props = {}
kafka_props['bootstrap.servers'] = 'localhost:9092'
kafka.publish(to_kafka, 'TEST', kafka_props)

# Subscribe to same topic as a stream
from_kafka = kafka.subscribe(topology, 'TEST', kafka_props, CommonSchema.String)

# You'll find the Hello World! in stdout log file:
from_kafka.print()

submit(ContextTypes.DISTRIBUTED, topology)
# The Streams job is kept running.

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java我的计时器(scheduleAtFixedRate)不循环   如何在Java中实现过滤迭代器?   java如何在不从本地xml、csv文件访问API的情况下将变量发布到php站点?   SuiteClasses语法的Java JUnit类数组   java从URLConnection读取二进制文件   java在Android Studio中发送加密文本时失败   Android:最近最少使用(LRU)算法在java中的实现?   java Selenium WebDriver无法打开Firefox配置文件   java如何处理带有嵌套抽象类的GSON?   java类型通知的方法SetLateStevenInfo(GcmMessageHandler,String,String,PendingContent)未定义   java Apple或Mac Mail会打开所有附件图像,即使它们已嵌入   java如何解析下面的xml代码?   java如何创建特定于API级别的UI(针对平板电脑和Android旧版本的不同UI,针对同一应用)?   servlet的通配符路径?