我有一个非常简单的API,它使用django和restframework作为物联网设备的端点。你知道吗
物联网设备——HTTP POST——REST API(django) 验证并保存数据。你知道吗
完全不需要任何渲染或GET/PATCH/DELETE。你知道吗
唯一的问题是我没有保存到数据库,但我想推送到MQTT通道(其他侦听消息的进程将保存/后处理)
由于我根本不是django专家,我的想法是重写Serializersave()
方法,它实际上不保存而是发布。你知道吗
型号
class Meas(models.Model):
SENSOR_TYPES = [
('temperature','temperature'),
('humidity','humidity')
]
sensorType = models.CharField(max_length=100, default='UNKNOWN', choices = SENSOR_TYPES )
sensorId = models.CharField(max_length=100)
homeId = models.CharField(max_length=100)
roomId = models.CharField(max_length=150)
hubId = models.CharField(max_length=100)
value = models.CharField(max_length=100)
last_seen = models.DateTimeField()
elapsed = models.IntegerField()
objects = MeasManager()
序列化程序
class MeasMQTTSerializer(serializers.ModelSerializer):
client = RMQPublisher(ch_name=rmq_chname,routing_key=rmq_routingkey,host=rmq_host,user=rmq_user,password=rmq_pass,port=rmq_port)
class Meta:
model = Meas
fields = '__all__'
def save(self):
logging.debug("Saving measurement")
measurement = self.validated_data['sensorType']
mytags = {
'sensorId' : self.validated_data['sensorId'],
'roomId' : self.validated_data['roomId'],
'hubId' : self.validated_data['hubId'],
'homeId' : self.validated_data['homeId'],
'elapsed' : self.validated_data['elapsed'],
'last_seen' : self.validated_data['last_seen']
}
for a, x in mytags.items():
mytags[a]=str(x)
value = float(self.validated_data['value'])
rmq_msg = mytags
rmq_msg['value']=value
rmq_msg['measurement'] = measurement
MeasMQTTSerializer.client.pushObject(rmq_msg)
logging.debug("Pushed to RMQ")
RMQPublisher只使用简单的push-to-mqtt:
class RMQPublisher(object):
def __init__(self,ch_name,routing_key,host,user,password,port):
self.routing_key = routing_key
self.ch_name = ch_name
self.rmq_host = host
self.rmq_user = user
self.rmq_pass = password
self.rmq_port = port
pass
def pushObject(self,object):
self.connect()
self.sendMessage(object)
self.disconnect()
def connect(self):
credentials = pika.PlainCredentials(self.rmq_user, self.rmq_pass)
parameters = pika.ConnectionParameters(self.rmq_host,self.rmq_port,'/',credentials)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.ch_name, exchange_type='direct')
def sendMessage(self,pushObj):
message = {}
message["content"] = pushObj
self.channel.basic_publish(exchange=self.ch_name,
routing_key=self.routing_key,
body=json.dumps(message))
def disconnect(self):
self.connection.close()
这种方法似乎只起到了某些测量和某些测量的一半作用 很多测量结果都是推送的,但是对于一些我得到了一个错误,我真的找不到根本原因。你知道吗
我猜,随着消息数量的增加,序列化程序的生命周期出现了一些问题,我可能无法正确理解
safeh-api | body=json.dumps(message))
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
safeh-api | self._flush_output()
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
safeh-api | self._connection._flush_output(lambda: self.is_closed, *waiters)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 513, in _flush_output
safeh-api | self._impl.ioloop.poll()
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 576, in poll
safeh-api | self._poller.poll()
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 1200, in poll
safeh-api | self._dispatch_fd_events(fd_event_map)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 904, in _dispatch_fd_events
safeh-api | handler(fileno, events)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/selector_ioloop_adapter.py", line 391, in _on_reader_writer_fd_events
safeh-api | callbacks.writer()
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/diagnostic_utils.py", line 53, in log_exception_func_wrap
safeh-api | return func(*args, **kwargs)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 1108, in _on_socket_writable
safeh-api | self._initiate_abort(error)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/diagnostic_utils.py", line 53, in log_exception_func_wrap
safeh-api | return func(*args, **kwargs)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 911, in _initiate_abort
safeh-api | 'non-_STATE_COMPLETED', self._state)
safeh-api | AssertionError: ('_AsyncTransportBase._initate_abort() expected non-_STATE_COMPLETED', 4)
任何更好的设计或修复现有的提示将不胜感激! 谢谢您!你知道吗
我知道这听起来像是一个偏执的解决方案,但我最终将这个简单的API重写为restplus应用程序的3个文件。 MQTT的同一个模块也是一个魅力,所以可能与我对django对象生命周期的误解有关。你知道吗
相关问题 更多 >
编程相关推荐