IBM Streams Kafka集成
streamsx.kafka的Python项目详细描述
概述
提供将来自Kafka代理的消息作为流读取的函数 并将元组作为消息提交给kafka代理。
代理配置必须使用应用程序配置中的属性或通过 使用字典变量。 最小属性集必须包含有效的bootstrap.servers配置 对于消费者和生产者,即对于subscribe和publish函数。
也可以使用不同的应用程序配置来订阅和发布 当必须使用特殊的使用者或生产者配置时。
样品
streams应用程序发布到 一个主题和同一个应用程序使用同一主题:
from streamsx.topology.topology import Topology from streamsx.topology.schema import CommonSchema from streamsx.topology.context import submit, ContextTypes import streamsx.kafka as kafka import time def delay(v): time.sleep(5.0) return True topology = Topology('KafkaHelloWorld') to_kafka = topology.source(['Hello', 'World!']) to_kafka = to_kafka.as_string() # delay tuple by tuple to_kafka = to_kafka.filter(delay) # Publish a stream to Kafka using TEST topic, the Kafka servers # assuming, the broker is running on localhost, port 9092 kafka_props = {} kafka_props['bootstrap.servers'] = 'localhost:9092' kafka.publish(to_kafka, 'TEST', kafka_props) # Subscribe to same topic as a stream from_kafka = kafka.subscribe(topology, 'TEST', kafka_props, CommonSchema.String) # You'll find the Hello World! in stdout log file: from_kafka.print() submit(ContextTypes.DISTRIBUTED, topology) # The Streams job is kept running.