具有“外部”更新属性的PyTango设备

2024-05-13 03:06:36 发布

您现在位置:Python中文网/ 问答频道 /正文

假设您有一个硬件设备,它以或多或少的随机间隔向您发送更新,并且客户机希望每次发生这种情况时都接收事件。您将如何编写PyTango(Python包装器,用于Tango controls libraryPyTango.server.Device类,它使用一个推送新属性值的线程来模拟这一点?在


Tags: 客户机间隔属性硬件serverdevice事件情况
1条回答
网友
1楼 · 发布于 2024-05-13 03:06:36

答案似乎是

  1. 调用PyTango.server.Device方法set_change_event()告诉Tango设备正在处理自己的更改事件推送,而不需要使用轮询循环。在
  2. 使用更新线程中的Pytango.sever.Device方法push_change_event()。似乎线程安全到目前为止,没有看到任何错误,即使是非常高的更新率。如果有人能证实就好了。在
  3. 同时缓存更新,以便轮询属性的客户端获得正确的值和时间戳。这似乎有点多余,有更好的方法吗?在
  4. 不要对属性设置轮询,因为它会导致客户端轮询获得过时的值(Tango似乎以一种我无法完全理解的方式缓存轮询中的属性值)。在

下面是一个使用外部更新的randomnumber属性的示例(python2.7)服务器

import time
import threading
import random

from PyTango.server import server_run
from PyTango.server import Device, DeviceMeta
from PyTango.server import attribute, command

from PyTango import AttrQuality


class ExternallyUpdated(Device):
    __metaclass__ = DeviceMeta

    def __init__(self, *args, **kwargs):
        super(ExternallyUpdated, self).__init__(*args, **kwargs)
        self._randomnumber = (0, 0, AttrQuality.ATTR_VALID)
        # Tell Tango that we don't need a polling loop, we'll 
        # push change events explicitly
        self.set_change_event('randomnumber', True)

    @attribute(label="Random Number", dtype=int,
               # Enables update events for absolute changes >= 1
               abs_change=1)
    def randomnumber(self):
        return self._randomnumber

    def init_device(self):
        super(ExternallyUpdated, self).init_device()
        self.t = threading.Thread(target=self.update_loop)
        self.t.setDaemon(True)
        self.t.start()

    def update_loop(self):
        while True:
            try:
                new_number = random.randint(0, 10000)
                ts = time.time()
                sleeptime = random.random()*10
                print ('Timestamp: {:.5f}    New value: {}    sleeptime: {}'
                        .format(ts, new_number, sleeptime))
                # Need to cache the value so that clients can poll the attribute
                self._randomnumber = (new_number, ts, AttrQuality.ATTR_VALID)
                self.push_change_event(
                    'randomnumber', new_number, ts, AttrQuality.ATTR_VALID)
                time.sleep(sleeptime)
            except Exception:
                logger.exception('Exception in update loop')
                time.sleep(1)

if __name__ == "__main__":
    server_run([ExternallyUpdated])

以及下面的一个客户机示例,假设您将设备导出为so_example/external/1。它应该在每次更新randomnumber时打印一条消息。在

^{pr2}$

相关问题 更多 >