IBM Streams Kafka集成

streamsx.kafka的Python项目详细描述


概述

提供将来自Kafka代理的消息作为流读取的函数 并将元组作为消息提交给kafka代理。

代理配置必须使用应用程序配置中的属性或通过 使用字典变量。 最小属性集必须包含有效的bootstrap.servers配置 对于消费者和生产者,即对于subscribepublish函数。

也可以使用不同的应用程序配置来订阅和发布 当必须使用特殊的使用者或生产者配置时。

样品

streams应用程序发布到 一个主题和同一个应用程序使用同一主题:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit, ContextTypes
import streamsx.kafka as kafka
import time

def delay(v):
    time.sleep(5.0)
    return True

topology = Topology('KafkaHelloWorld')

to_kafka = topology.source(['Hello', 'World!'])
to_kafka = to_kafka.as_string()
# delay tuple by tuple
to_kafka = to_kafka.filter(delay)

# Publish a stream to Kafka using TEST topic, the Kafka servers
# assuming, the broker is running on localhost, port 9092
kafka_props = {}
kafka_props['bootstrap.servers'] = 'localhost:9092'
kafka.publish(to_kafka, 'TEST', kafka_props)

# Subscribe to same topic as a stream
from_kafka = kafka.subscribe(topology, 'TEST', kafka_props, CommonSchema.String)

# You'll find the Hello World! in stdout log file:
from_kafka.print()

submit(ContextTypes.DISTRIBUTED, topology)
# The Streams job is kept running.

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
当您有许多具有不同密钥值的位置时,java Enterprise是加密环境变量的正确方法   java如何使用视图保持器模式制作自定义适配器?   java如何迭代Camel体中的嵌套列表?   序列化用base 64进行Java序列化   java打开文件的最佳方式(并确保选择了文件)   java marvin图像色差插件错误   java如何在eclipse中添加属性文件文件夹   比较java。util。日历日期到java。util。日期   java无法在下一个类(活动)中获取哈希表   java如何将这段代码转换为循环?   java查找通过REST失败   java getIntent返回null   在Java中,如何通过外部集合从内部集合检索数据?   java单点登录以保护REST API和内部基于web的系统