各种总线/用户类型的中继/发送器库
mula的Python项目详细描述
这个库的主要目的是建立一个公共框架,用于将消息从一个总线中继到另一个总线,或中继到某个消息使用者(例如carbon/graphitedb)。它目前包括对mqtt、amqp的支持,以及将空闲通道用作穷人的消息总线的支持。
基本架构
您几乎可以按您所需的任何方式配置此库。每个发送方或中继类都需要一个配置dict,其中包含用于连接的相关详细信息。中继也需要sender对象,它允许您将任何接收器类型(在中继中实现)与任何类型的发送器配对。由于每个类只需要一个dict进行配置,因此可以使用不同的配置dict或相同较大配置的子部分将同一协议的两个总线桥接在一起。
示例:将mqtt中继到碳(石墨化b)
我们可以将mqtt中继类与carbon sender类一起用于在公共mqtt服务器和graphitedb服务器之间进行中继,而无需将graphitedb服务器公开到Internet。首先,配置,我们可以方便地将其存储在yaml文件中:
carbon: host: my.graphitedb.home.net port: 2023 mqtt: host: m16.cloudmqtt.com port: 20996 user: <some-user> password: <some-password>
然后,在我们的代码中,我们为carbon启动一个发送器,并为mqtt启动一个继电器,发送器作为参数:
from mulay.carbon import PlaintextSender from mulay.mqtt import Relay sender = PlaintextSender(config['carbon']) relay = Relay(config['mqtt'], sender) try: sender.start() # Relay will loop forever relay.start() except KeyboardInterrupt: relay.stop() sender.stop()
示例:将mqtt中继到碳(石墨化b)
或者,我们可以选择使用amqp服务,方法是将amqp中继类指定为carbon sender类。同样,这不会将GraphitedB服务器公开到Internet。首先,配置,我们可以方便地将其存储在yaml文件中:
carbon: host: my.graphitedb.home.net port: 2023 amqp: url: amqp://<amqp-user>:<amqp-password>@wombat.rmq.cloudamqp.com/<amqp-instance> queue: my-metrics
然后,在我们的代码中,我们为carbon启动一个发送器,并为mqtt启动一个继电器,发送器作为参数:
from mulay.carbon import PlaintextSender from mulay.amqp import Relay sender = PlaintextSender(config['carbon']) relay = Relay(config['amqp'], sender) try: sender.start() # Relay will loop forever relay.start() except KeyboardInterrupt: relay.stop() sender.stop()
示例:继电器松弛到碳(石墨化B)
我们甚至可以用与mqtt或amqp几乎相同的方式,在carbon sender类中使用slack。首先,配置,我们可以方便地将其存储在yaml文件中:
carbon: host: my.graphitedb.home.net port: 2023 slack: token: <your-token-here> channel: my-metrics
然后,在我们的代码中,我们启动一个碳发送器和一个slack继电器,发送器作为参数:
from mulay.carbon import PlaintextSender from mulay.slack import Relay sender = PlaintextSender(config['carbon']) relay = Relay(config['slack'], sender) try: sender.start() # Relay will loop forever relay.start() except KeyboardInterrupt: relay.stop() sender.stop()
示例:发布到mqtt
作为一种方便,为了使中继功能真正灵活,Muraye提供了一个发送器类,具有所有的协议实现。这是为了允许从任何协议到库支持的任何其他协议的桥接,但对于将一些最终将路由到内部使用者服务器(如graphitedb)的数据发布到公共总线也很有用。这只需要对公共总线进行配置,这使得它比继电器配置更简单:
host: m16.cloudmqtt.com port: 20996 user: <some-user> password: <some-password>
然后,在我们的代码中,我们设置一个循环来进行测量并发布它们:
from mulay.mqtt import Sender import time import speedtest as st sender = Sender(config) sender.start() test = st.Speedtest() test.get_best_server() try: while True: test.download() test.upload() result = test.results.dict() now = int(time.time()) sender.send_raw(f"my.speedtest.download {result['download']} {now}") sender.send_raw(f"my.speedtest.upload {result['upload']} {now}") time.sleep(30) # check this at most every 30 seconds except KeyboardInterrupt: sender.stop()