如何同时运行两个函数
我正在进行测试,但我想同时运行两个功能。我有一个摄像头,我通过suds让它移动,然后我通过SSH登录到摄像头,检查摄像头的速度。当我检查速度时,摄像头已经停止了,所以没有速度可供查看。有没有办法让我这两个功能同时运行,以便测试摄像头的速度?下面是示例代码:
class VerifyPan(TestAbsoluteMove):
def runTest(self):
self.dest.PanTilt._x=350
# Runs soap move command
threading.Thread(target = SudsMove).start()
self.command = './ptzpanposition -c 0 -u degx10'
# Logs into camera and checks speed
TestAbsoluteMove.Ssh(self)
# Position of the camera verified through Ssh (No decimal point added to the Ssh value)
self.assertEqual(self.Value, '3500')
我现在尝试了下面提到的线程模块。但是线程并没有和函数TestAbsoluteMove.Ssh()同步运行。我还需要其他代码来让它工作吗?
我考虑过在线程语句中加入参数,说明线程在Ssh()函数运行时启动。有没有人知道我该在这个语句中输入什么?
抱歉如果我没有解释清楚。'SudsMove'函数是让摄像头移动,而'Ssh'函数是登录到摄像头并检查摄像头当前的移动速度。问题是,当'Ssh'函数登录时,摄像头已经停止了。我需要这两个功能并行运行,这样我才能在摄像头移动时检查速度。
谢谢
4 个回答
一次只能有一个线程在运行。这方面的问题已经有很多详细的解答,大家可以在这里找到。一个解决办法是使用两个独立的进程。上面的回答里也提供了一些建议。
首先,你需要导入 threading
模块,然后像下面这样运行 SudsMove()
:
threading.Thread(target = SudsMove).start()
这样做会创建并启动一个后台线程,负责执行移动操作。
对编辑后问题的回答:
根据我的理解,TestAbsoluteMove.Ssh(self)
是一次性获取速度,并把结果存储在 self.Value
里,对吧?!然后你用 self.assertEqual(self.Value, '3500')
来测试预期的最终倾斜/旋转/位置?!
如果我理解没错,你应该等相机开始移动后再进行测试。你可以在一定的时间间隔内获取速度:
# Move camera in background thread
threading.Thread(target = SudsMove).start()
# What does this do?
self.command = './ptzpanposition -c 0 -u degx10'
# Poll the current speed in an interval of 250 ms
import time
measuredSpeedsList = []
for i in xrange(20):
# Assuming that this call will put the result in self.Value
TestAbsoluteMove.Ssh(self)
measuredSpeedsList.append(self.Value)
time.sleep(0.25)
print "Measured movement speeds: ", measuredSpeedsList
移动速度会是 measuredSpeedsList
中的最大值(也就是 max(measuredSpeedsList)
)。希望这样解释能让你明白……
如果你想使用常见的Python版本(CPython),你可以使用multiprocessing模块。这个模块非常强大,可以让你在不同的子进程中运行任务,比如你可以把一些不能被序列化的参数传给子进程,或者可以终止正在运行的任务等等。它的使用方式和线程很相似,而且不会受到全局解释器锁的限制。
不过,这里有个缺点,就是创建子进程需要比创建线程更多的时间;这在你有很多短小的任务时可能会成为问题。此外,由于进程之间需要通过序列化来传递数据,所以如果数据量很大,传递这些数据会耗费很多时间,而且每个进程都会有一份数据的副本,这样会占用更多的内存。在每个任务执行时间比较长,而且进出数据量不大的情况下,使用multiprocessing模块会非常合适。