pyserial serialwin32.py具有属性

2024-04-29 05:55:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我是Python新手,正在尝试运行Invensense提供的演示。它们提供了一个Python客户机,该客户机应该接受COM流量并操作图形。演示程序似乎已经崩溃,我不确定我安装的pyserial是否有问题。要求是python2.7,pyserial和pygame。pyserial可执行文件serialwin32.py引发以下错误:

Traceback (most recent call last):
.......
  File "C:\Python27\lib\site-packages\serial\serialwin32.py", line 47, in open
if port.upper().startswith('COM') and int(port[3:]) > 8:
AttributeError: 'int' object has no attribute 'upper'

使用来自用户101的输入(不知道如何在系统中感谢您,但谢谢!)

port = str(self.name)

此错误已解决。但是,与往常一样,当windows似乎拒绝访问时,下一个错误会在较低的位置显示几行:

 File "C:\Python27\lib\site-packages\serial\serialwin32.py", line 66, in open
raise SerialException("could not open port %r: %r" % (self.portstr, ctypes.WinError()))

serial.serialutil.SerialException: could not open port 'COM6': WindowsError(5, 'Access is denied.')

部分代码引用似乎是:

self.hComPort = win32.CreateFile(port,
           win32.GENERIC_READ | win32.GENERIC_WRITE,
           0, # exclusive access
           None, # no security
           win32.OPEN_EXISTING,
           win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
           0)
    if self.hComPort == win32.INVALID_HANDLE_VALUE:
        self.hComPort = None    # 'cause __del__ is called anyway
        raise SerialException("could not open port %r: %r" % (self.portstr, ctypes.WinError()))

最后是完整的代码摘录。

问题:
1) 有什么办法克服这个错误吗?

2)我的猜测是:—(这可能是一个特权问题,但作为python的新手,如果Windows 10认为它在保护我,我不知道如何解决这个问题。。。

3)我同意我根本不必修改pyserial(很多人可能已经尝试过),所以我怀疑还有其他问题。但是,我不知道,所以我也在这里发布客户端python代码。

———需要pyserial、pygame的客户端应用程序———

#!/usr/bin/python
# eMPL_client.py

# A PC application for use with Embedded MotionApps.
# Copyright 2012 InvenSense, Inc. All Rights Reserved.
import serial, sys, time, string, pygame from ponycube import *

class eMPL_packet_reader:

def __init__(self, port, quat_delegate=None, debug_delegate=None, data_delegate=None ):

    self.s = serial.Serial(port,115200)

    self.s.setTimeout(0.1)

    self.s.setWriteTimeout(0.2)

# TODO: Will this break anything?

        ##Client attempts to write to eMPL.

        #try:

        #self.s.write("\n")

        #except serial.serialutil.SerialTimeoutException:

        #pass # write will timeout if umpl app is already started.



    if quat_delegate:

        self.quat_delegate = quat_delegate

    else:

        self.quat_delegate = empty_packet_delegate()



    if debug_delegate:

        self.debug_delegate = debug_delegate

    else:

        self.debug_delegate = empty_packet_delegate()



    if data_delegate:

        self.data_delegate = data_delegate

    else:

        self.data_delegate = empty_packet_delegate()



    self.packets = []

    self.length = 0

    self.previous = None



def read(self):

    NUM_BYTES = 23

    p = None

    while self.s.inWaiting() >= NUM_BYTES:

        rs = self.s.read(NUM_BYTES)

        if ord(rs[0]) == ord('$'):

            pkt_code = ord(rs[1])

            if pkt_code == 1:

                d = debug_packet(rs)

                self.debug_delegate.dispatch(d)

            elif pkt_code == 2:

                p = quat_packet(rs)

                self.quat_delegate.dispatch(p) 

            elif pkt_code == 3:

                d = data_packet(rs)

                self.data_delegate.dispatch(d)

            else:

                print "no handler for pkt_code",pkt_code

        else:

            c = ' '

            print "serial misaligned!"

            while not ord(c) == ord('$'):

                c = self.s.read(1)

            self.s.read(NUM_BYTES-1)

def write(self,a):

    self.s.write(a)

def close(self):

    self.s.close()

def write_log(self,fname):

    f = open(fname,'w')

    for p in self.packets:

        f.write(p.logfile_line())

    f.close()

# ===========  PACKET DELEGATES  ==========

class packet_delegate(object):

def loop(self,event):

    print "generic packet_delegate loop w/event",event

def dispatch(self,p):

    print "generic packet_delegate dispatched",p

class empty_packet_delegate(packet_delegate):

def loop(self,event):

    pass

def dispatch(self,p):

    pass



class cube_packet_viewer (packet_delegate):

def __init__(self):

    self.screen = Screen(480,400,scale=1.5)

    self.cube = Cube(30,60,10)

    self.q = Quaternion(1,0,0,0)

    self.previous = None  # previous quaternion

    self.latest = None    # latest packet (get in dispatch, use in loop)



def loop(self,event):

    packet = self.latest

    if packet:

        q = packet.to_q().normalized()

        self.cube.erase(self.screen)

        self.cube.draw(self.screen,q)

        pygame.display.flip()

        self.latest = None



def dispatch(self,p):

    if isinstance(p,quat_packet):

        self.latest = p

class debug_packet_viewer (packet_delegate):

def loop(self,event):

    pass

def dispatch(self,p):

    assert isinstance(p,debug_packet);

    p.display()

class data_packet_viewer (packet_delegate):

def loop(self,event):

    pass

def dispatch(self,p):

    assert isinstance(p,data_packet);

    p.display()



# =============== PACKETS ================= 


# For 16-bit signed integers.

def two_bytes(d1,d2):

d = ord(d1)*256 + ord(d2)

if d > 32767:

    d -= 65536

return d



# For 32-bit signed integers.

def four_bytes(d1, d2, d3, d4):

d = ord(d1)*(1<<24) + ord(d2)*(1<<16) + ord(d3)*(1<<8) + ord(d4)

if d > 2147483648:

    d-= 4294967296

return d


class debug_packet (object):

# body of packet is a debug string

def __init__(self,l):

    sss = []

    for c in l[3:21]:

        if ord(c) != 0:

            sss.append(c)

    self.s = "".join(sss)



def display(self):

    sys.stdout.write(self.s)



class data_packet (object):

def __init__(self, l):

    self.data = [0,0,0,0,0,0,0,0,0]

    self.type = ord(l[2])

    if self.type == 0:   # accel

        self.data[0] = four_bytes(l[3],l[4],l[5],l[6]) * 1.0 / (1<<16)

        self.data[1] = four_bytes(l[7],l[8],l[9],l[10]) * 1.0 / (1<<16)

        self.data[2] = four_bytes(l[11],l[12],l[13],l[14]) * 1.0 / (1<<16)

    elif self.type == 1:   # gyro

        self.data[0] = four_bytes(l[3],l[4],l[5],l[6]) * 1.0 / (1<<16)

        self.data[1] = four_bytes(l[7],l[8],l[9],l[10]) * 1.0 / (1<<16)

        self.data[2] = four_bytes(l[11],l[12],l[13],l[14]) * 1.0 / (1<<16)

    elif self.type == 2:   # compass

        self.data[0] = four_bytes(l[3],l[4],l[5],l[6]) * 1.0 / (1<<16)

        self.data[1] = four_bytes(l[7],l[8],l[9],l[10]) * 1.0 / (1<<16)

        self.data[2] = four_bytes(l[11],l[12],l[13],l[14]) * 1.0 / (1<<16)

    elif self.type == 3:   # quat

        self.data[0] = four_bytes(l[3],l[4],l[5],l[6]) * 1.0 / (1<<30)

        self.data[1] = four_bytes(l[7],l[8],l[9],l[10]) * 1.0 / (1<<30)

        self.data[2] = four_bytes(l[11],l[12],l[13],l[14]) * 1.0 / (1<<30)

        self.data[3] = four_bytes(l[15],l[16],l[17],l[18]) * 1.0 / (1<<30)

    elif self.type == 4:   # euler

        self.data[0] = four_bytes(l[3],l[4],l[5],l[6]) * 1.0 / (1<<16)

        self.data[1] = four_bytes(l[7],l[8],l[9],l[10]) * 1.0 / (1<<16)

        self.data[2] = four_bytes(l[11],l[12],l[13],l[14]) * 1.0 / (1<<16)

    elif self.type == 5:   # rot

        self.data[0] = two_bytes(l[3],l[4]) * 1.0 / (1<<14)

        self.data[1] = two_bytes(l[5],l[6]) * 1.0 / (1<<14)

        self.data[2] = two_bytes(l[7],l[8]) * 1.0 / (1<<14)

        self.data[3] = two_bytes(l[9],l[10]) * 1.0 / (1<<14)

        self.data[4] = two_bytes(l[11],l[12]) * 1.0 / (1<<14)

        self.data[5] = two_bytes(l[13],l[14]) * 1.0 / (1<<14)

        self.data[6] = two_bytes(l[15],l[16]) * 1.0 / (1<<14)

        self.data[7] = two_bytes(l[17],l[18]) * 1.0 / (1<<14)

        self.data[8] = two_bytes(l[19],l[20]) * 1.0 / (1<<14)

    elif self.type == 6:   # heading

        self.data[0] = four_bytes(l[3],l[4],l[5],l[6]) * 1.0 / (1<<16)

    else:   # unsupported

        pass



def display(self):

    if self.type == 0:

        print 'accel: %7.3f %7.3f %7.3f' % \

            (self.data[0], self.data[1], self.data[2])

    elif self.type == 1:

        print 'gyro: %9.5f %9.5f %9.5f' % \

            (self.data[0], self.data[1], self.data[2])

    elif self.type == 2:

        print 'compass: %7.4f %7.4f %7.4f' % \

            (self.data[0], self.data[1], self.data[2])

    elif self.type == 3:

        print 'quat: %7.4f %7.4f %7.4f %7.4f' % \

            (self.data[0], self.data[1], self.data[2], self.data[3])

    elif self.type == 4:

        print 'euler: %7.4f %7.4f %7.4f' % \

            (self.data[0], self.data[1], self.data[2])

    elif self.type == 5:

        print 'rotation matrix: \n%7.3f %7.3f %7.3f\n%7.3f %7.3f %7.3f\n%7.3f %7.3f %7.3f' % \

            (self.data[0], self.data[1], self.data[2], self.data[3], \

             self.data[4], self.data[5], self.data[6], self.data[7], \

             self.data[8])

    elif self.type == 6:

        print 'heading: %7.4f' % self.data[0]

    else:

        print 'what?'



class quat_packet (object):

def __init__(self, l):

    self.l = l

    self.q0 = four_bytes(l[3],l[4],l[5],l[6]) * 1.0 / (1<<30)

    self.q1 = four_bytes(l[7],l[8],l[9],l[10]) * 1.0 / (1<<30)

    self.q2 = four_bytes(l[11],l[12],l[13],l[14]) * 1.0 / (1<<30)

    self.q3 = four_bytes(l[15],l[16],l[17],l[18]) * 1.0 / (1<<30)

def display_raw(self):

    l = self.l

    print "".join(

        [ str(ord(l[0])), " "] + \

        [ str(ord(l[1])), " "] + \

        [ str(ord(a)).ljust(4) for a in 

                            [ l[2], l[3], l[4], l[5], l[6], l[7], l[8], l[9], l[10] ] ] + \

        [ str(ord(a)).ljust(4) for a in 

                            [ l[8], l[9], l[10] , l[11], l[12], l[13]] ]

        )



def display(self):

    if 1:

        print "qs " + " ".join([str(s).ljust(15) for s in

            [ self.q0, self.q1, self.q2, self.q3 ]])

    if 0:

        euler0, euler1, euler2 = self.to_q().get_euler()

        print "eulers " + " ".join([str(s).ljust(15) for s in

            [ euler0, euler1, euler2 ]])

    if 0:

        euler0, euler1, euler2 = self.to_q().get_euler()

        print "eulers " + " ".join([str(s).ljust(15) for s in

            [ (euler0 * 180.0 / 3.14159) - 90 ]])



def to_q(self):

    return Quaternion(self.q0, self.q1, self.q2, self.q3)

# =============== MAIN ======================



if __name__ == "__main__":

if len(sys.argv) == 2:

    comport = int(sys.argv[1]) - 1

else:

    print "usage: " + sys.argv[0] + " port"

    sys.exit(-1)



pygame.init()

viewer = cube_packet_viewer()

debug  = debug_packet_viewer()

data   = data_packet_viewer()



reader = eMPL_packet_reader(comport, 

            quat_delegate = viewer, 

            debug_delegate = debug, 

            data_delegate = data)



while 1:

    event = pygame.event.poll()

    # TODO: Allow exit via keystroke.

    if event.type == pygame.QUIT:

        viewer.close()

        break

    if event.type == pygame.KEYDOWN:

        reader.write(pygame.key.name(event.key))

    reader.read()

    viewer.loop(event)

    debug.loop(event)

    data.loop(event)



    # TODO: If system load is too high, increase this sleep time.

    pygame.time.delay(0)

—————————————————————————————————————————————————————

#! python

import ctypes
import time
from serial import win32

import serial
from serial.serialutil import SerialBase, SerialException, to_bytes, portNotOpenError, writeTimeoutError


class Serial(SerialBase):
"""Serial port implementation for Win32 based on ctypes."""

BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
             9600, 19200, 38400, 57600, 115200)

def __init__(self, *args, **kwargs):
    super(SerialBase, self).__init__()
    self._port_handle = None
    self._overlapped_read = None
    self._overlapped_write = None
    SerialBase.__init__(self, *args, **kwargs)

def open(self):
    """\
    Open port with current settings. This may throw a SerialException
    if the port cannot be opened.
    """
    if self._port is None:
        raise SerialException("Port must be configured before it can be used.")
    if self.is_open:
        raise SerialException("Port is already open.")
    # the "\\.\COMx" format is required for devices other than COM1-COM8
    # not all versions of windows seem to support this properly
    # so that the first few ports are used with the DOS device name
    str port = self.name
    try:
        if port.upper().startswith('COM') and int(port[3:]) > 8:
            port = '\\\\.\\' + port
    except ValueError:
        # for like COMnotanumber
        pass
    self._port_handle = win32.CreateFile(
            port,
            win32.GENERIC_READ | win32.GENERIC_WRITE,
            0,  # exclusive access
            None,  # no security
            win32.OPEN_EXISTING,
            win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
            0)
    if self._port_handle == win32.INVALID_HANDLE_VALUE:
        self._port_handle = None    # 'cause __del__ is called anyway
        raise SerialException("could not open port %r: %r" % (self.portstr, ctypes.WinError()))

    try:
        self._overlapped_read = win32.OVERLAPPED()
        self._overlapped_read.hEvent = win32.CreateEvent(None, 1, 0, None)
        self._overlapped_write = win32.OVERLAPPED()
        #~ self._overlapped_write.hEvent = win32.CreateEvent(None, 1, 0, None)
        self._overlapped_write.hEvent = win32.CreateEvent(None, 0, 0, None)

        # Setup a 4k buffer
        win32.SetupComm(self._port_handle, 4096, 4096)

        # Save original timeout values:
        self._orgTimeouts = win32.COMMTIMEOUTS()
        win32.GetCommTimeouts(self._port_handle, ctypes.byref(self._orgTimeouts))

        self._reconfigure_port()

        # Clear buffers:
        # Remove anything that was there
        win32.PurgeComm(
                self._port_handle,
                win32.PURGE_TXCLEAR | win32.PURGE_TXABORT |
                win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
    except:
        try:
            self._close()
        except:
            # ignore any exception when closing the port
            # also to keep original exception that happened when setting up
            pass
        self._port_handle = None
        raise
    else:
        self.is_open = True

——————————————————————————————————————————————————


Tags: debugselfnonedataifbytespacketport
3条回答

回答1)因为这依赖于可用的硬件,所以测试代码完全有可能在它所编写的环境中工作,但在您的环境中不工作-如果您在Windows上,而这是在Linux上编写的,则很可能是这样。代码使用端口0-不知道如何映射到COM1等

2)在Windows上,COM端口以前有DOS名称,如COM1、COM2,即字符串,而不是int(它们不像TCP/IP端口号)。最近在Windows中有一种允许更通用的名称的“\.\COMnotanumber”格式,我见过USB到串行转换器使用这种格式。快速查看了serialutil.py中pyserial SerialBase的源代码后,我觉得有点奇怪,因为AFAICT self.name只在通过调用self.port(portname)使用显式端口设置时才被设置。您可能希望尝试使用serport=serial(0)初始化串行端口实例,然后显式调用serport.port('COM1')(或者使用您的端口名而不是COM1)。

在pySerial 3.x中,使用数字作为端口的可能性已被删除。请改用设备名(字符串),例如"/dev/ttyS0""COM4"

这些数字在操作系统中并不一致,有时甚至不包括所有设备。您可以使用serial.tools.list_port模块枚举大多数操作系统上实际可用的串行端口。

您可能有错误的serialwin32.py版本。在尝试另一个版本之前,我收到了相同的错误消息。我在这里使用2.7 https://pypi.python.org/pypi/pyserial/2.7。然后我简单地键入'empl-client.py 8',错误消息就消失了。我也没有为“Embedded MotionApps”设备安装驱动程序。所以,我在motion_driver_6.12文件夹中找到了eMPL_CDC.inf,并通过设备管理器安装了它。

现在我遇到的问题是嵌入式MotionApps设备在Windows中持续连接和重新连接。虽然Pygame现在正在运行,但只是坐在那里的一个黑屏。Invensense告诉我,这意味着它已经连接并等待数据。

看起来I2C总线上有数据,如果端口保持连接,也许可以工作。虽然当端口断开连接时数据从总线上消失,但可能整个程序正在重置。实际上,代码中有一个msp430_reset()函数,它看起来确实被调用了。

相关问题 更多 >