在Django Serializer save()方法中发布到MQTT

2024-04-18 21:26:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个非常简单的API,它使用django和restframework作为物联网设备的端点。你知道吗

物联网设备——HTTP POST——REST API(django) 验证并保存数据。你知道吗

完全不需要任何渲染或GET/PATCH/DELETE。你知道吗

唯一的问题是我没有保存到数据库,但我想推送到MQTT通道(其他侦听消息的进程将保存/后处理)

由于我根本不是django专家,我的想法是重写Serializersave()方法,它实际上不保存而是发布。你知道吗

型号

class Meas(models.Model):
    SENSOR_TYPES = [
        ('temperature','temperature'),
        ('humidity','humidity')
        ]
    sensorType = models.CharField(max_length=100, default='UNKNOWN', choices = SENSOR_TYPES )
    sensorId = models.CharField(max_length=100)
    homeId = models.CharField(max_length=100)
    roomId = models.CharField(max_length=150)
    hubId = models.CharField(max_length=100)
    value = models.CharField(max_length=100)
    last_seen = models.DateTimeField()
    elapsed = models.IntegerField()

    objects = MeasManager()


序列化程序



class MeasMQTTSerializer(serializers.ModelSerializer):


    client = RMQPublisher(ch_name=rmq_chname,routing_key=rmq_routingkey,host=rmq_host,user=rmq_user,password=rmq_pass,port=rmq_port)
    class Meta:
        model = Meas
        fields = '__all__'     
    def save(self):

        logging.debug("Saving measurement")

        measurement = self.validated_data['sensorType']
        mytags = { 
            'sensorId' : self.validated_data['sensorId'],
            'roomId' : self.validated_data['roomId'],
            'hubId' : self.validated_data['hubId'],
            'homeId' : self.validated_data['homeId'],
            'elapsed' : self.validated_data['elapsed'],
            'last_seen' : self.validated_data['last_seen']
        }
        for a, x in mytags.items():
            mytags[a]=str(x)
        value = float(self.validated_data['value'])

        rmq_msg = mytags
        rmq_msg['value']=value
        rmq_msg['measurement'] = measurement
        MeasMQTTSerializer.client.pushObject(rmq_msg)
        logging.debug("Pushed to RMQ")  

RMQPublisher只使用简单的push-to-mqtt:

class RMQPublisher(object):



    def __init__(self,ch_name,routing_key,host,user,password,port):
        self.routing_key = routing_key
        self.ch_name     = ch_name
        self.rmq_host    = host
        self.rmq_user    = user
        self.rmq_pass    = password
        self.rmq_port    = port
        pass


    def pushObject(self,object):
        self.connect()
        self.sendMessage(object)
        self.disconnect()


    def connect(self):
        credentials = pika.PlainCredentials(self.rmq_user, self.rmq_pass)
        parameters = pika.ConnectionParameters(self.rmq_host,self.rmq_port,'/',credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=self.ch_name, exchange_type='direct')

    def sendMessage(self,pushObj):   
        message = {}
        message["content"]  = pushObj

        self.channel.basic_publish(exchange=self.ch_name,
                            routing_key=self.routing_key,
                            body=json.dumps(message))

    def disconnect(self):
        self.connection.close()


这种方法似乎只起到了某些测量和某些测量的一半作用 很多测量结果都是推送的,但是对于一些我得到了一个错误,我真的找不到根本原因。你知道吗

我猜,随着消息数量的增加,序列化程序的生命周期出现了一些问题,我可能无法正确理解

safeh-api      |     body=json.dumps(message))
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
safeh-api      |     self._flush_output()
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
safeh-api      |     self._connection._flush_output(lambda: self.is_closed, *waiters)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 513, in _flush_output
safeh-api      |     self._impl.ioloop.poll()
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 576, in poll
safeh-api      |     self._poller.poll()
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 1200, in poll
safeh-api      |     self._dispatch_fd_events(fd_event_map)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 904, in _dispatch_fd_events
safeh-api      |     handler(fileno, events)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/selector_ioloop_adapter.py", line 391, in _on_reader_writer_fd_events
safeh-api      |     callbacks.writer()
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/diagnostic_utils.py", line 53, in log_exception_func_wrap
safeh-api      |     return func(*args, **kwargs)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 1108, in _on_socket_writable
safeh-api      |     self._initiate_abort(error)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/diagnostic_utils.py", line 53, in log_exception_func_wrap
safeh-api      |     return func(*args, **kwargs)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 911, in _initiate_abort
safeh-api      |     'non-_STATE_COMPLETED', self._state)
safeh-api      | AssertionError: ('_AsyncTransportBase._initate_abort() expected non-_STATE_COMPLETED', 4)

任何更好的设计或修复现有的提示将不胜感激! 谢谢您!你知道吗


Tags: inpyselfapilibpackagesusrlocal
1条回答
网友
1楼 · 发布于 2024-04-18 21:26:15

我知道这听起来像是一个偏执的解决方案,但我最终将这个简单的API重写为restplus应用程序的3个文件。 MQTT的同一个模块也是一个魅力,所以可能与我对django对象生命周期的误解有关。你知道吗

相关问题 更多 >