Python线程和Arduino通信

2024-04-16 17:21:43 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图创建一个线程,它接受3个变量(float),这些变量在python代码中不断更新(cX, cY, angle),并每隔30ms将其发送到一个arduino uno

我以前从未使用过线程,作为一个初学者,我很难理解它。在

有人给我的想法是创建一个由三个变量组成的元组,并在主python流中创建一个线程,将该元组发送到arduino(PC通过serial发送到arduino)。在

谁能帮帮我,或者至少让我看看如何开始。在


Tags: 代码serialfloat线程arduinouno元组cx
1条回答
网友
1楼 · 发布于 2024-04-16 17:21:43

线程是并发运行代码的一种方式。这意味着您可以运行函数A,但在它完成之前,您可以启动函数B,然后它们继续执行,不一定同时执行(这是并行编程)。就像在一个给定的时间点,如果你停止时间,看看你的代码里发生了什么,你会发现函数a已经执行了一些语句,但是函数B的一些语句也已经执行了。在

在Python线程模块中,每个线程都是一个带有一些可能参数的函数(或方法),当它启动时,函数体将运行。当函数返回时,线程完成运行,线程状态变为not alive(请参见threading.Thread.is_alive())。在

因此,我们通过传递函数/方法引用和可能的参数来创建线程:

from threading import Thread

simple_thread = Thread(target=func_with_no_args)
thread_with_args = Thread(target=my_function, args=(arg1, arg2))

然后我们启动线程,运行函数体。在

^{pr2}$

但当前的代码流仍在继续(当前线程,程序中总是有主线程开始执行它)。所以我们不需要等待函数(func_with_no_argsmy_function)完成。在

如果我们需要检查我们在线程中执行的函数是否已经完成,我们可以检查它是否是活动的:

simple_thread.is_alive()  # return bool

如果我们想等到线程中的函数执行完毕,我们可以join线程。当线程完成时join也很好。在

simple_thread.join()

这对IO任务很有帮助,因为当程序等待数据准备好输入/输出时,它可以做其他事情。在

I am trying to create a thread that takes 3 variables (float) that are ...

我假设你想实现某个目标,你把线程作为一种工具。我的意思是,创建线程不是这里的目标,而是实现您想要的东西的一种方法。 因为问题中缺少应用程序的细节,我无法提供确切的代码/示例,所以我将给出一个抽象/通用的答案,希望您能将这个想法应用到您的应用程序中。在

我认为应用程序正在接收来自某个地方的数据(因此是IO),每隔30毫秒,新数据就应该被发送到ardunio(它的作用又类似于IO)。因此,应用程序有2个IO ARM,一个用于接收数据,另一个用于发送到ardunio。所以使用线程可能是合理的。在

我们可以有两个功能,一个读取数据,一个更新ardunio。我们在两个线程中运行它们,thread_read(读取数据)和{}(更新ardunio)。在

如果是这样,我们需要一种在线程之间共享信息的方法。线程使使用内存变得很容易(在进程内存中,可以通过变量访问),因此我们可以使用两个函数都可以访问的变量。当变量被1个线程更新时,另一个线程也将看到更新的结果。在

storage = None

def read_data():
    global storage
    # read data from DB, web API, user input, etc.
    # maybe in a loop to keep receiving data. Or generate the data from here
    storage = (cX, cY, angle)

def send_to_ardunio():
    global storage
    # send storage data to ardunio, maybe in a loop
    # sleeping 30ms after each update


thread_read = Thread(target=read_data)
thread_ardunio = Thread(target=send_to_ardunio)
thread_read.start()
thread_ardunio.start()

这是一条路。我不太漂亮,因为有一个全球变量在那里。我们能做些什么来消除这个变量?我们可以使用队列(请参见queue.Queue)。在

我认为队列是线程之间通信的好方法。因此,一个拥有数据的线程将它们放入队列(又称生产者),而另一个线程从队列中挑选项目(又称消费者)。在

像这样:

def read_data(queue_):
    # read data from DB, web API, user input, etc.
    # maybe in a loop to keep receiving data, or generate the data from here
    data = (cX, cY, angle)
    queue_.put(data)

def send_to_ardunio(queue_):
    # send data to ardunio, maybe in a loop
    # sleeping 30ms after each update
    data = queue_.get()
    cX, cY, angle = data

queue_ = Queue()  # this will be used to transfer data
thread_read = Thread(target=read_data, args=(queue_,))
thread_ardunio = Thread(target=send_to_ardunio, args=(queue_,))
thread_read.start()
thread_ardunio.start()

看起来好多了。在

现在我们需要等待函数运行。所以我们可以在线程上调用join方法。这次我还冒昧地假设我们可以控制读取数据所需的时间。如果我们每30毫秒更新一次ardunio新数据,那么生产商可以调整频率,消费者可以毫不犹豫地消费。在

我们还需要一种方法来告诉线程停止生产/消费。 为此,我们可以使用Event(请参见threading.Event),或者为了简单起见,队列上的数据将代表消费者应该停止。在

def read_data(queue_):
    while True:
        # calculate/get cX, cY, angle
        queue_.put((cX, cY, angle))
        # sleep 30ms
        # when we finished producing data, put something to the queue to tell the consumer there is no more data. I'll assume None is good option here
    queue_.put(None)


def send_to_ardunio(queue_):
    while True:
        data = queue_.get()
        if data is None:
            break
        cX, cY, angle = data
        # update ardunio, because the data is updated every 30ms on the producer, we don't need to do anything special. We can just wait when the data is ready, we'll update.

queue_ = Queue()
thread_read = Thread(target=read_data, args=(queue_,))
thread_ardunio = Thread(target=send_to_ardunio, args=(queue_,))

thread_read.start()
thread_ardunio.start()

thread_read.join()
thread_ardunio.join()

上面的代码假定生产者(thread_read)将知道停止生成数据。在

如果不是这样,那么我们可以使用Event来触发两个函数停止生产和消费。在

最后,我在连接线程时遇到了一个小问题。如果主线程正在连接其他线程,它将被阻塞,并且不会很好地响应SIGINT。因此,如果您试图停止Python进程(通过按Ctrl+C或发送SIGINT),它不会退出。在

但是,我们可以尝试使用超时连接线程。因此,每一次主线程都可以查看接收到的信号并进行处理。在

def read_data(queue_, should_stop):
    while not should_stop.is_set():
        # calculate/get cX, cY, angle
        queue_.put((cX, cY, angle))
        # sleep 30ms

def send_to_ardunio(queue_, should_stop):
    while not should_stop.is_set():
        data = queue_.get()
        cX, cY, angle = data
        # update ardunio

def tell_when_to_stop(should_stop):
    # detect when to stop, and set the Event. For example, we'll wait for 10 seconds and then ask all to stop
    time.sleep(10)
    should_stop.set()


queue_ = Queue()
should_stop = Event()

thread_stop_decider = Thread(target=tell_when_to_stop, args=(should_stop,))
thread_read = Thread(target=read_data, args=(queue_, should_stop))
thread_ardunio = Thread(target=send_to_ardunio, args=(queue_, should_stop))

thread_read.start()
thread_ardunio.start()
thread_stop_decider.start()

try:
    while thread_read.is_alive():
        thread_read.join(1)
except KeyboardInterrupt:
        should_stop.set()
thread_read.join()
thread_ardunio.join()
thread_stop_decider.join()

相关问题 更多 >