线程问题及何时停止它们

0 投票
1 回答
31 浏览
提问于 2025-04-13 02:37
import time
import threading
import qi
import balldetection
import common
import cv2
import vision_definitions
import math 

lock = threading.Lock()

ball_detected = False
ip = "10.42.0.98"
port = 9559
session = qi.Session()
session.connect("tcp://" + ip + ":" + str(port))
motion_service = session.service("ALMotion")
posture_service = session.service("ALRobotPosture")
video_service = session.service("ALVideoDevice")
speak_service = session.service("ALTextToSpeech")

def detect_ball_and_act():
    global ball_detected
    print("In detect_ball_and_act")
    videoClient = video_service.subscribeCamera("nao_opencv", 0, vision_definitions.kQVGA, vision_definitions.kBGRColorSpace, 5)
while not ball_detected:
    print("Not ball detected")
    try:
        print("In try")
        frame = common.readImage(video_service, videoClient)
        if frame is not None:
            print("in none")
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            img, (ret, x, y, radius, distance) = balldetection.detect_ball(hsv)
            if ret:
                print("Ball in sight at ({}, {}) with radius {} and distance {}".format(x, y, radius, distance))
                print("Performing action...")
                speak_service.say("Ball is in sight!")
                with lock:
                    ball_detected = True
            else:
                time.sleep(1.0)
    except Exception as e:
        print("An error occurred:", e)

def keep_walking_and_looking():
    while not ball_detected:
        print("Walking and looking around...")
        walk_and_look()

 def walk_and_look():
    print("In walk_and_look")
    motion_service.moveToward(0.5, 0, 0)
    time.sleep(3.0)
    motion_service.moveToward(0, 0, 0)
    time.sleep(1.0)
    motion_service.angleInterpolationWithSpeed(["HeadYaw", "HeadPitch"],     [0.5, 1.0], 0.2)
    time.sleep(1.0)
    motion_service.angleInterpolationWithSpeed(["HeadYaw", "HeadPitch"], [-0.5, 1.0], 0.2)
    time.sleep(1.0)
    motion_service.angleInterpolationWithSpeed(["HeadYaw", "HeadPitch"], [0.0, 0.0], 0.2)

walking_thread = threading.Thread(target=keep_walking_and_looking)
ball_detection_thread = threading.Thread(target=detect_ball_and_act)

motion_service.setStiffnesses("Body", 1.0)
posture_service.goToPosture("StandInit", 1.0)

walking_thread.start()
ball_detection_thread.start()

ball_detection_thread.join()
walking_thread.join()

这段代码能实现我想要的功能,但我想知道有没有更好的处理方法。这是针对一个Nao机器人,现在他的任务是在房间里走动并寻找一个球。目前他确实在找球并在房间里走,但当他看到球后,他还在继续走,这点我想要停止。我知道这是因为线程还在运行,等他走完后才会停下来,但我希望他在看到球后能完全停下来,因为接下来的步骤是让他走到球那里。这应该像一个状态机一样运作。

1 个回答

0

naoqi API的设计是为了简化多线程操作。简单来说,如果你想让机器人移动或者转头,可以用一个叫motion_service.post.angleInterpolationWithSpeed的命令。这样,机器人的头会动,但你的程序不会被这个动作卡住,你可以同时做其他事情。

不过,如果你想让机器人在看到球的时候突然停下来,可以用一些命令来停止,比如motion_service.stopWalk或者motion_service.stopMove。同时也要注意post方法返回的ID。

idToKillTaskLater = motion_service.post.angleInterpolationWithSpeed

别忘了查看一下NAOqi的文档,还有里面的一些示例,这些都能帮助你更好地理解...

撰写回答