各种总线/用户类型的中继/发送器库

mula的Python项目详细描述


这个库的主要目的是建立一个公共框架,用于将消息从一个总线中继到另一个总线,或中继到某个消息使用者(例如carbon/graphitedb)。它目前包括对mqtt、amqp的支持,以及将空闲通道用作穷人的消息总线的支持。

基本架构

您几乎可以按您所需的任何方式配置此库。每个发送方或中继类都需要一个配置dict,其中包含用于连接的相关详细信息。中继也需要sender对象,它允许您将任何接收器类型(在中继中实现)与任何类型的发送器配对。由于每个类只需要一个dict进行配置,因此可以使用不同的配置dict或相同较大配置的子部分将同一协议的两个总线桥接在一起。

示例:将mqtt中继到碳(石墨化b)

我们可以将mqtt中继类与carbon sender类一起用于在公共mqtt服务器和graphitedb服务器之间进行中继,而无需将graphitedb服务器公开到Internet。首先,配置,我们可以方便地将其存储在yaml文件中:

carbon:
        host: my.graphitedb.home.net
        port: 2023
mqtt:
        host: m16.cloudmqtt.com
        port: 20996
        user: <some-user>
        password: <some-password>

然后,在我们的代码中,我们为carbon启动一个发送器,并为mqtt启动一个继电器,发送器作为参数:

    from mulay.carbon import PlaintextSender
    from mulay.mqtt import Relay

    sender = PlaintextSender(config['carbon'])
    relay = Relay(config['mqtt'], sender)

try:
            sender.start()

            # Relay will loop forever
            relay.start()
    except KeyboardInterrupt:
            relay.stop()
            sender.stop()

示例:将mqtt中继到碳(石墨化b)

或者,我们可以选择使用amqp服务,方法是将amqp中继类指定为carbon sender类。同样,这不会将GraphitedB服务器公开到Internet。首先,配置,我们可以方便地将其存储在yaml文件中:

carbon:
        host: my.graphitedb.home.net
        port: 2023
amqp:
        url: amqp://<amqp-user>:<amqp-password>@wombat.rmq.cloudamqp.com/<amqp-instance>
        queue: my-metrics

然后,在我们的代码中,我们为carbon启动一个发送器,并为mqtt启动一个继电器,发送器作为参数:

    from mulay.carbon import PlaintextSender
    from mulay.amqp import Relay

    sender = PlaintextSender(config['carbon'])
    relay = Relay(config['amqp'], sender)

try:
            sender.start()

            # Relay will loop forever
            relay.start()
    except KeyboardInterrupt:
            relay.stop()
            sender.stop()

示例:继电器松弛到碳(石墨化B)

我们甚至可以用与mqtt或amqp几乎相同的方式,在carbon sender类中使用slack。首先,配置,我们可以方便地将其存储在yaml文件中:

carbon:
        host: my.graphitedb.home.net
        port: 2023
slack:
        token: <your-token-here>
        channel: my-metrics

然后,在我们的代码中,我们启动一个碳发送器和一个slack继电器,发送器作为参数:

    from mulay.carbon import PlaintextSender
    from mulay.slack import Relay

    sender = PlaintextSender(config['carbon'])
    relay = Relay(config['slack'], sender)

try:
            sender.start()

            # Relay will loop forever
            relay.start()
    except KeyboardInterrupt:
            relay.stop()
            sender.stop()

示例:发布到mqtt

作为一种方便,为了使中继功能真正灵活,Muraye提供了一个发送器类,具有所有的协议实现。这是为了允许从任何协议到库支持的任何其他协议的桥接,但对于将一些最终将路由到内部使用者服务器(如graphitedb)的数据发布到公共总线也很有用。这只需要对公共总线进行配置,这使得它比继电器配置更简单:

host: m16.cloudmqtt.com
port: 20996
user: <some-user>
password: <some-password>

然后,在我们的代码中,我们设置一个循环来进行测量并发布它们:

    from mulay.mqtt import Sender
    import time
    import speedtest as st

    sender = Sender(config)
    sender.start()

test = st.Speedtest()
test.get_best_server()

try:
        while True:
                test.download()
                test.upload()

                result = test.results.dict()

                now = int(time.time())

                sender.send_raw(f"my.speedtest.download {result['download']} {now}")
                sender.send_raw(f"my.speedtest.upload {result['upload']} {now}")

                time.sleep(30) # check this at most every 30 seconds
    except KeyboardInterrupt:
            sender.stop()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
windows请帮助获取java。运行程序时发生lang.nullpointerexception   JPA标准中的错误:java。lang.IllegalArgumentException:此JPQLquery中没有名为“:inputMsgId_1_”的参数   java调度一个小程序,从ScheduledExecutorService开始   java点击AVD“创建设备”按钮抛出NullPointerException   缓存Java9WebStart会多次加载jar文件   使用java计算最小二乘   当代理关闭时,java kafka生产者不会抛出异常   我们什么时候以及为什么要在java中进行自定义序列化?   java使用GSON解析包含包含所需字符串的对象的对象数组   java如何使用BOBJ REST API实现分页?   java身份验证失败:用户的凭据已过期。CAS v4。2.   合并排序中的java无限循环?   java jackson xml将pojo子元素转换为字符串   tcp Java ObjectOutputStream重置错误   如何调用。bat文件,并使用java中的ProcessBuilder发送字符串   奇怪的Java数学结果   java如何在Android中禁用最近的活动按钮?   java刷下应用程序崩溃   使用Java收集所有VersionOne资产及其所有属性   Java打印:创建具有最小可接受边距的页面格式