在转换中使用AsyncMachine和多个对象的正确方法

2024-06-17 15:17:11 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试通过WebSocket实现客户机-服务器应用程序 我对如何正确地维护每个连接的客户端的状态有一些疑问

全局计算机+每个连接的多个对象?
机器+对象-对于每个连接

因此,我从几个测试开始,检查它是如何并发工作的

基础机器

class AsyncModel:
    def __init__(self, id_):
        self.req_id = id_

    async def prepare_model(self, _):
        print("prepare_model", self.req_id)


    async def before_change(self, _):
        print("before_change", self.req_id)


    async def after_change(self, _):
        print("After change", self.req_id)


transition = dict(trigger="start", source="Start", dest="Done",
                  prepare="prepare_model",
                  before=["before_change"],
                  after="after_change")

和几种运行类型

我希望所有模型同时更改其状态


async def main():
    tasks = []
    machine = AsyncMachine(model=None,
                           states=["Start", "Done"],
                           transitions=[transition],
                           initial='Start',
                           send_event=True,
                           queued=True)
    for i in range(3):
        model = AsyncModel(id_=i)
        machine.add_model(model)

        tasks.append(model.start())

    await asyncio.gather(*tasks)

    for m in machine.models:
        machine.remove_model(m)

asyncio.run(main())

但结果是:


prepare_model 0
before_change 0
After change 0
prepare_model 1
before_change 1
After change 1
prepare_model 2
before_change 2
After change 2

如果我创建机器+模型:


async def main():
    tasks = []

    for i in range(3):
        model = AsyncModel(id_=i)
        machine = AsyncMachine(model=model,
                               states=["Start", "Done"],
                               transitions=[transition],
                               initial='Start',
                               send_event=True,
                               queued=True)


        tasks.append(model.start())

    await asyncio.gather(*tasks)

输出为:

prepare_model 0
prepare_model 1
prepare_model 2
before_change 0
before_change 1
before_change 2
After change 0
After change 1
After change 2

正确的方法是什么

更新

我希望每个正在运行的模型都有可用的contextvar,以便能够正确记录模型调用的其他模块的所有活动,而不是向每个外部函数调用(outisde model class)显式传递一些标识符
请参见某种示例https://pastebin.com/qMfh0kNb,它的工作方式与预期不符,断言火灾


Tags: 模型selfidtrueasyncmodeldefmachine
1条回答
网友
1楼 · 发布于 2024-06-17 15:17:11

“什么是正确的方法?”这个问题的一个常见答案是“好吧,这取决于……”。如果不清楚你想要实现什么,我只能回答我能在你的帖子中找到的一般性问题

对于transitions,我应该为每个型号使用一台机器还是为所有型号使用一台机器?

使用transitions时,模型是有状态的,并且包含转换回调。在那里,这台机器就像一本“规则手册”。因此,当机器具有相同的配置时,我建议对大多数用例的所有型号使用一台机器。在大多数情况下,使用具有相同配置的多台计算机只会增加内存占用和代码复杂性。不经意间,我可以想到一个用例,在这个用例中,拥有多台具有相同配置的机器可能会很有用。但首先你可能会想,为什么两个版本的行为都不一样,尽管我刚才说应该没什么区别

为什么在使用一个AsyncMachine和多个AsyncMachines时调用回调的顺序不同?

如果没有自定义参数,使用一个AsyncMachine或多个AsyncMachines没有区别。但是,您在构造函数中传递了queued=True,根据Documentation,构造函数执行以下操作:

If queued processing is enabled, a transition will be finished before the next transition is triggered

这就是为什么你的单机一次考虑一个转换,处理一个模型的所有回调,然后转移到下一个事件/转换。 因为每台机器都有自己的事件/转换队列,所以当使用多台机器时,事件将立即被处理。在使用多台机器的示例中,传递queued=True没有任何效果。通过不传递queued参数或传递queued=False(默认值),可以为一台机器获得相同的行为。我对您的示例进行了一些修改,以便于说明:

from transitions.extensions import AsyncMachine
import asyncio


class AsyncModel:
    def __init__(self, id_):
        self.req_id = id_

    async def prepare_model(self):
        print("prepare_model", self.req_id)

    async def before_change(self):
        print("before_change", self.req_id)

    async def after_change(self):
        print("after change", self.req_id)


transition = dict(trigger="start", source="Start", dest="Done",
                  prepare="prepare_model",
                  before="before_change",
                  after="after_change")

models = [AsyncModel(i) for i in range(3)]


async def main(queued):
    machine = AsyncMachine(model=models,
                           states=["Start", "Done"],
                           transitions=[transition],
                           initial='Start',
                           queued=queued)

    await asyncio.gather(*[model.start() for model in models])
    # alternatively you can dispatch an event to all models of a machine by name
    # await machine.dispatch("start")

print(">>> Queued=True")
asyncio.run(main(queued=False))
print(">>> Queued=False")
asyncio.run(main(queued=False))

所以这取决于你需要什么。对于一台机器,您可以使用queued=True对事件进行顺序处理,也可以使用queued=False进行并行处理

您提到了一个可能需要多台机器的用例…

文件中有这样一段话:

You should consider passing queued=True to the TimeoutMachine constructor. This will make sure that events are processed sequentially and avoid asynchronous racing conditions that may appear when timeout and event happen in close proximity.

当使用超时事件或连续发生的其他事件时,在同一模型上同时处理多个转换时,可能会出现racing conditions。因此,当这个问题影响到您的用例,并且您需要在不同的模型上并行处理转换时,使用具有相同配置的多台机器可能是一个解决方案

如何在AsyncMachine中使用上下文?

这对我来说是小菜一碟,我可能是错的。我可以试着简要总结一下我目前对事物为何会以某种方式表现的理解。考虑这个例子:

from transitions.extensions import AsyncMachine
import asyncio
import contextvars

context_model = contextvars.ContextVar('model', default=None)
context_message = contextvars.ContextVar('message', default="unset")

def process():
    model = context_model.get()
    print(f"Processing {model.id} Request {model.count} => '{context_message.get()}'")


class Model:

    def __init__(self, id):
        self.id = id
        self.count = 0

    def request(self):
        self.count += 1
        context_message.set(f"Super secret request from {self.id}")

    def nested(self):
        context_message.set(f"Not so secret message from {self.id}")
        process()


models = [Model(i) for i in range(3)]


async def model_loop(model):
    context_model.set(model)
    context_message.set(f"Hello from the model loop of {model.id}")
    while model.count < 3:
        await model.loop()


async def main():
    machine = AsyncMachine(model=models, initial='Start', transitions=[['loop', 'Start', '=']],
                           before_state_change='request',
                           after_state_change=[process, 'nested'])
    await asyncio.gather(*[model_loop(model) for model in models])

asyncio.run(main())

输出:

# Processing 0 Request 1 => 'Hello from the model loop of 0'
# Processing 0 Request 1 => 'Not so secret message from 0'
# Processing 1 Request 1 => 'Hello from the model loop of 1'
# Processing 1 Request 1 => 'Not so secret message from 1'
# Processing 2 Request 1 => 'Hello from the model loop of 2'
# Processing 2 Request 1 => 'Not so secret message from 2'
# Processing 0 Request 2 => 'Hello from the model loop of 0'
# Processing 0 Request 2 => 'Not so secret message from 0'
# Processing 1 Request 2 => 'Hello from the model loop of 1'
# Processing 1 Request 2 => 'Not so secret message from 1'
# Processing 2 Request 2 => 'Hello from the model loop of 2'
# Processing 2 Request 2 => 'Not so secret message from 2'
# Processing 0 Request 3 => 'Hello from the model loop of 0'
# Processing 0 Request 3 => 'Not so secret message from 0'
# Processing 1 Request 3 => 'Hello from the model loop of 1'
# Processing 1 Request 3 => 'Not so secret message from 1'
# Processing 2 Request 3 => 'Hello from the model loop of 2'
# Processing 2 Request 3 => 'Not so secret message from 2'

触发事件已转发到设置两个上下文变量的模型循环。这两个函数都由process使用,这是一个使用上下文变量进行处理的全局函数。当触发转换时,将在转换之前调用Model.request,并增加Model.count。更改Model.state后,将调用全局函数processModel.nested

process被调用两次:一次在模型循环中,一次在Model.nested回调中。无法访问从Model.request更改的context_message,但是Model.nested中的更改可用于process。 怎么样?因为processModel.request共享相同的父上下文(Model可以检索context_message的当前值),但是当Model设置变量时,它仅在其当前本地上下文中可用,而后者对process的调用(在另一个回调中)无法访问。如果希望本地更改可以被process访问,则需要像在Model.nested中所做的那样从回调中触发它

长话短说:AsyncMachine的回调确实共享相同的父上下文,但不能访问彼此的本地上下文,因此更改没有效果。但是,当上下文变量是referec时e(如^{)对模型的更改可以在其他回调中访问

使用transitions事件队列(queued=True)和依赖contextvars需要一些额外的考虑,因为正如文档所述“在处理队列中的事件时,触发器调用将始终返回True,因为在排队时无法确定涉及排队调用的转换是否最终成功完成。即使只处理一个事件,也是如此”。触发的事件只能添加到队列中。紧接着,它的上下文在事件处理之前就被保留了。如果您需要队列处理和contextvars,并且也无法从内部模型回调调用函数,则应选中asyncio.Lock并将调用包装到loop,但保留queued=False防止函数调用在完成之前返回

相关问题 更多 >