如何在树莓派上使用多线程进行LCD输出

2024-04-18 16:55:40 发布

您现在位置:Python中文网/ 问答频道 /正文

在 Raspberry Pi 上写入 16x2 LCD 显示屏可能需要一些时间才能完成,尤其是我编写的模块会自动滚动超过显示器长度的文本。

我需要使用多线程或类似的方法,将输出发送到显示器,同时继续执行程序的其余部分。我尝试过多线程的一些方法,但还没有完全实现。

这是没有使用任何多线程、可以正常运行的代码。需要改为多线程执行的方法是 "TextToLCD.ProcessFrameBuffer"。

piBell.py

#!/usr/bin/env python3

import time
import rekognition
import TextToLCD
import PiPhoto
import json
import logging
import re
import threading
from queue import Queue

# Shared layout for every log line: timestamp, logger name, function name,
# level and message, each padded/truncated to a fixed width.
logFormatter = logging.Formatter("%(asctime)s [%(name)-8.8s]/[%(funcName)-12.12s] [%(levelname)-5.5s]  %(message)s")
rootLogger = logging.getLogger('piBell')

# One handler appends to the log file, the other writes to the default
# stream of logging.StreamHandler.
fileHandler = logging.FileHandler("{0}/{1}.log".format("./", "piBell"), mode='a')
consoleHandler = logging.StreamHandler()

# Both handlers share the formatter and are attached in file-then-console order.
for _handler in (fileHandler, consoleHandler):
    _handler.setFormatter(logFormatter)
    rootLogger.addHandler(_handler)

# Case-insensitive pattern of label names that hint at a face in the picture.
reFace = re.compile('face|head|selfie|portrait|person', flags=re.IGNORECASE)

def main(debugMode='INFO'):
    """Take a photo, have rekognition label it and report the result on the LCD.

    The LCD is cleared, a photo is captured to ``./image.jpg``, its labels are
    fetched from rekognition, and the display is updated depending on whether
    a face (and then a celebrity match) is found.

    Args:
        debugMode: Logging level applied to ``rootLogger`` and forwarded to
            every ``TextToLCD.ProcessFrameBuffer`` call.
    """
    TextToLCD.Clear()
    rootLogger.setLevel(debugMode)
    imgRotation = 270
    imgPath = './'
    imgName = 'image.jpg'
    imgFile = imgPath + imgName

    TextToLCD.ProcessFrameBuffer(["Scanning:", "................."], debugMode)
    PiPhoto.GetPhoto(imgFile, imgRotation, "INFO")

    rootLogger.info("Sending image to rekognition.")
    TextToLCD.ProcessFrameBuffer(["Processing","................."], debugMode)

    jsonLabels = rekognition.get_labels(imgFile)
    rootLogger.info("Obtained JSON payload from rekognition.")
    rootLogger.debug(json.dumps(jsonLabels))

    # json.dumps() never yields an empty string ("[]", "{}" and "null" all have
    # length >= 2), so measuring the serialized text could never detect an
    # empty payload. Test the payload itself instead.
    if not jsonLabels:
        rootLogger.error("JSON payload from rekognition was empty.")
        return

    # TestFace is only consulted when the labels already hint at a face.
    if IsFace(jsonLabels) and TestFace(imgFile):
        TextToLCD.ProcessFrameBuffer(['Hello', '      :)'], debugMode)

        celeb = IsCelebrity(imgFile)
        if celeb:
            TextToLCD.ProcessFrameBuffer(["You look like:", celeb], debugMode)
        return

    rootLogger.info("No face detected.")
    TextToLCD.ProcessFrameBuffer(['No face detected', '       :('], debugMode)

def IsFace(jsonPayload):
    """Report whether any label in the payload suggests a face.

    Args:
        jsonPayload: Iterable of label mappings; each one is read through its
            ``'Name'`` and ``'Confidence'`` keys.

    Returns:
        True as soon as a label whose name matches ``reFace`` has a rounded
        confidence above 75, otherwise False.
    """
    # The payload does not change while iterating, so dump it once instead of
    # re-serializing the whole structure for every label.
    rootLogger.debug(json.dumps(jsonPayload))

    for value in jsonPayload:
        confidence = round(value['Confidence'])
        rootLogger.info("Label: " + value['Name'] + ", Confidence: " + str(confidence))

        # reFace.match() anchors at the start of the label name.
        if reFace.match(value['Name']) and confidence > 75:
            rootLogger.info("Possible face match.")
            return True
    return False

def TestFace(img):
    """Ask rekognition for face data and report whether a confident face exists.

    Args:
        img: Image reference handed unchanged to ``rekognition.get_faces``.

    Returns:
        True when at least one returned item has a ``'Confidence'`` above 75,
        otherwise False.
    """
    jsonFaces = rekognition.get_faces(img)
    rootLogger.debug(json.dumps(jsonFaces))

    # Test the payload directly. The previous check measured the serialized
    # text, which let a None result ("null", four characters) through to the
    # loop below, where iterating it raised TypeError.
    if not jsonFaces:
        rootLogger.info("No facial data obtained.")
        return False

    for item in jsonFaces:
        # A missing or zero confidence simply means "not a confident face".
        confidence = item.get('Confidence')
        if confidence and confidence > 75:
            rootLogger.info("Face detected. Confidence: " + str(round(confidence)))
            return True

    return False

def IsCelebrity(img):
    """Ask rekognition whether the image resembles a known celebrity.

    Args:
        img: Image reference handed unchanged to ``rekognition.get_celebrities``.

    Returns:
        The ``'Name'`` of the first item whose ``'MatchConfidence'`` exceeds
        the accuracy threshold, or False when there is no such item.
    """
    celebMatchAccuracy = 25
    jsonCelbFaces = rekognition.get_celebrities(img)
    rootLogger.debug(json.dumps(jsonCelbFaces))

    # Test the payload directly. The previous check measured the serialized
    # text, which let a None result ("null", four characters) through to the
    # loop below, where iterating it raised TypeError.
    if not jsonCelbFaces:
        rootLogger.info("No celebirity match found.")
        return False

    for item in jsonCelbFaces:
        # Items lacking a confidence or a name cannot be reported; skip them.
        matchConfidence = item.get('MatchConfidence')
        celebName = item.get('Name')
        if matchConfidence and matchConfidence > celebMatchAccuracy and celebName:
            rootLogger.info("Celebirity match detected: " + celebName + ", Confidence: " + str(round(matchConfidence)))

            return celebName

    return False


# Run the doorbell flow only when executed as a script, not when imported.
if __name__ == "__main__":
    main(debugMode='INFO')

Tags: noimportinfojsonifitemfaceconfidence
1条回答
网友
1楼 · 发布于 2024-04-18 16:55:40

首先,很高兴看到你的液晶显示功能。你操作的是硬件,而硬件是有限的资源。因此,你需要某种访问控制,这可以用 Lock 对象或 Event 对象来实现。

使用显示器时有两个选择:

  1. 等待当前写入完成
  2. 中断当前写入

    import threading
    
    import time
    def parallelWithLock(lock:threading.Lock, name:str, iterations:int=5, delay:float=0.5):
        """Do a complete run of work while holding the lock, never interrupted.

        Threads calling this with the same lock run one after another: each
        waits for the lock, finishes all of its iterations, then releases it.

        Args:
            lock: Lock guarding the shared resource.
            name: Label printed in front of every counter value.
            iterations: Number of steps to perform (default 5).
            delay: Seconds to pause after each step (default 0.5).
        """
        with lock:
            # The loop body must be indented below the for statement.
            for i in range(iterations):
                print(f"{name}: {i}")
                time.sleep(delay)
                # doWantYouWant(...)
    
    def parallelWithInterrupt(event:threading.Event,lock:threading.Lock,name:str):
        """Take over the shared resource, interrupting whoever is using it.

        The event is set to tell the current lock holder to stop. Once the
        lock is acquired the event is cleared and this call keeps working
        until the event is set again.

        Args:
            event: Shared flag used as the "please stop" signal.
            lock: Lock guarding the shared resource.
            name: Label printed in front of every counter value.
        """
        # Signal the thread that currently holds the lock to give it up.
        event.set()
        i = 0
        with lock:
            # The lock is ours now; reset the flag so the next set() means us.
            event.clear()
            while not event.is_set():  # or writing
                print(f"{name}: {i}")
                # wait() pauses for up to one second like sleep(1) did, but
                # returns immediately when another thread sets the event.
                event.wait(1)
                i += 1
                #doWantYouWant(...)
            print(f"{name} Interrupted!")
    
     if __name__ == '__main__':
          lock = threading.Lock()
    
          t1 = threading.Thread(target=parallelWithLock,args=(lock,"Thread_1"))
          t2 = threading.Thread(target=parallelWithLock,args=(lock,"Thread_2"))
          t1.start()
          t2.start()
          t1.join()
          t2.join()
          event = threading.Event()
          lock = threading.Lock()
          t3 = threading.Thread(target=parallelWithInterrupt,args=(event,lock,"Thread_3"))
          t4 = threading.Thread(target=parallelWithInterrupt,args=(event,lock,"Thread_4"))
          t5 = threading.Thread(target=parallelWithInterrupt, args=(event,lock, "Thread_4"))
          t3.start()
          time.sleep(5)
          t4.start()
          time.sleep(3)
          t5.start()
          t3.join()
          t4.join()
          time.sleep(2)
          event.set()
          t5.join()
    

相关问题 更多 >