Python 协议解析的构造方法

1 投票
1 回答
1689 浏览
提问于 2025-04-17 17:18

我正在尝试把twistedProtocolconstruct这个声明式的二进制数据解析器结合起来。

到目前为止,我的MessageReceiver协议是通过以下方式来接收来自tcp通道的数据的:

def rawDataReceived(self, data):
    '''
    This method bufferizes the data coming from the TCP channel in the following way:
        - Initially, discard the stream until a reserved character is detected
        - add data to the buffer up to the expected message length unless the reserved character is met again. In that case discard the message and start again
        - if the expected message length is reached, attempt to parse the message and clear the buffer
    '''
    if self._buffer:
        index = data.find(self.reserved_character)

        if index > -1:
            if len(self._buffer) + index >= self._fixed_size:
                self.on_message(self._buffer + data[:data.index(self._reserved_character)])

            self._buffer = b''
            data = data[data.index(self.reserved_character):]
            [self.on_message(chunks[:self._fixed_size]) for chunks in  [self.reserved_character + msg for msg in data.split(self._reserved_character) if msg]]

        elif len(self._buffer) + len(data) < self._expected_size:
            self._buffer = self._buffer + data
        else:
            self._buffer = b''
    else:  
        try:
            data = data[data.index(self._reserved_character):]
            [self.on_message(chunks[:self._fixed_size]) for chunks in  [self._reserved_character + msg for msg in data.split(self._reserved_character) if msg]]
        except Exception, exc:
            log.msg("Warning: Maybe there is no delimiter {delim} for the new message. Error: {err}".format(delim=self._reserved_character, err=str(exc)))

现在我需要对协议进行改进,因为消息可能会有可选字段(所以消息的长度不再是固定的)。我用construct设计了消息解析器的一个重要部分,如下所示:

def on_message(self, msg):
    return Struct(HEADER,
        Bytes(HEADER_RAW, 3),
        BitStruct(OPTIONAL_HEADER_STRUCT, 
            Nibble(APPLICATION_SELECTOR),
            Flag(OPTIONAL_HEADER_FLAG), 
            Padding(3)
        ),
        If(lambda ctx: ctx.optional_header_struct[OPTIONAL_HEADER_FLAG],
            Embed(Struct(None,
                   Byte(BATTERY_CHARGE),
                   Bytes(OPTIONAL_HEADER, 3)
                   )
            )
        )
    ).parse(msg)

所以现在我需要改变缓冲逻辑,以便将正确的块大小传递给Struct。我希望避免在rawDataReceived方法中调整要传递给Struct的数据大小,因为关于什么是可能的消息候选者的规则已经在construct对象中定义好了。

有没有办法把缓冲逻辑推到construct对象里面去呢?

编辑

我部分实现了将缓冲逻辑推入内部的目标,方法是简单地使用了MacroAdapter

MY_PROTOCOL = Struct("whatever",
    Anchor("begin"),
    RepeatUntil(lambda obj, ctx:obj==RESERVED_CHAR, Field("garbage", 1)),
    NoneOf(Embed(HEADER_SECTION), [RESERVED_CHAR]),
    Anchor("end"),
    Value("size", lambda ctx:ctx.end - ctx.begin)
)

这大大简化了调用代码(由于Glyph的建议,这部分代码不再在rawDataReceived中):

def dataReceived(self, data):
    log.msg('Received data: {}'.format(bytes_to_hex(data)))
    self._buffer += data
    try:
        container = My_PROTOCOL.parse(self._buffer)
        self._buffer = self._buffer[container.size:]
        d, self.d = self.d, self._create_new_transmission_deferred()
        d.callback(container)
    except ValidationError, err:
        self._cb_error("A validation error occurred. Discarding the rest of the message. {}".format(err))
        self._buffer = b''    
    except FieldError, err:     #Incomplete message. We simply keep on buffering and retry
        if len(self._buffer) >= MyMessageReceiver.MAX_GARBAGE_SIZE:
            self._cb_error("Buffer overflown. No delimiter found in the stream")

不幸的是,这个解决方案只能部分满足需求,因为我找不到方法让construct告诉我产生错误的流的索引,因此我不得不丢弃整个缓冲区,这并不是理想的解决方案。

1 个回答

0

要获取发生错误时的流位置,你需要使用 Anchor,并自己写一个版本的 NoneOf。假设 HEADER_SECTION 是另一个构造,像这样替换 NoneOf:

SpecialNoneOf(Struct('example', Anchor('position'), HEADER_SECTION), [RESERVED_CHAR]))

SpecialNoneOf 需要从 Adapter 继承,并将 NoneOf 的 init 和 _validate 与 Validator 的 _encode 和 _decode 结合起来。在 _decode 中,将

raise ValidationError("invalid object", obj)

替换为

raise ValidationError("invalid object", obj.header_section + " at " + obj.position)

把 header_section 替换成 HEADER_SECTION 构造的名字。你可能需要改变结果容器的结构,或者想出其他方法来使用 Embed,使这个方法能够正常工作。

撰写回答