一个轻量级多智能体系统

MASlite的Python项目详细描述


build status

马斯利特

由bjorn madsen设计的多代理平台

版权所有©2016-2019。所有代码都是作者在 与其他系统的隔离和任何相似性纯属巧合。


马斯利特在60秒内解释:

Maslite是一个Simle Python模块,用于创建多代理模拟。

  • 简单api:只需学习3个模块:调度程序、代理和代理消息
  • 快速:每秒处理多达2.7亿条消息
  • 轻量级:52KB。

它只有3个组件:

  • 调度程序(主循环)

    • 处理暂停并继续一次呼叫。
    • 确保执行过程的可重复性,使代理易于调试。
    • 每秒处理多达2.7亿条信息。
  • 代理人

    • 是具有可自定义的setup()、update()和teardown()方法的python类。
    • 可以使用send()和receive()交换消息。
    • 可以订阅/取消订阅邮件类。
    • 有时钟,可以设置闹钟。
    • 可单独测试。
    • 可以有独立的I/O/数据库交互。
  • 消息

    • 使发送方和接收方启用直接通信的
    • 有主题但没有接收器的内容将被视为广播并发送给订户。

Maslite有很多用例:

  • 原型制作Massive™类型的游戏。
  • 创建数据处理管道
  • 优化发动机,用于:
    • 调度(使用Bjorn-Madsen的分布式调度方法)
    • 拍卖(使用Dimtry Bertsekas交替迭代拍卖)

用户需要担心的是交互协议, 可以方便地概括为:

  1. 设计代理将定期发送或接收的消息 继承必要实现详细信息的python对象 基本的agentMessage。消息必须具有明确的主题
  2. 编写应该在代理中执行一次的函数 接收其中一条消息。
  3. 用字典更新代理操作(self.operations) 它描述了主题函数之间的关系。
  4. 编写更新函数来维护代理的内部状态 使用发送发送消息,使用接收获取消息。

因此,用户可以使用以下命令创建代理:

class HelloMessage(AgentMessage):
    def __init__(self, sender, receiver)
        super().__init__(sender=sender, receiver=receiver)


class myAgent(Agent):
    def __init__(self):
        super().__init__()
        self.operations.update({HelloMessage.__name__: self.hello})
    
    def update(self):
        while self.messages:
            msg = self.receive()
            operation = self.operations.get(msg.topic))
            if operation is not None:
                operation(msg)
            else:
                self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
                
    def hello(self, msg)
        print(msg)

太简单了!

字典self.operations继承自代理 用{hellomessage.\uu name\uuu:self.hello}更新。自我操作 当HelloMessage到达时作为指针,因此当代理 调用update函数,它将从消息的 指向函数self.hello,在这个简单的函数中 示例只打印消息的内容。

更微妙的行为,也可以嵌入用户没有 担心任何外部因素。例如,如果某些消息 优先于其他邮件(优先邮件),应清空收件箱 在用于排序的更新函数的开头。

下面是一个例子,其中一些主题的优先级高于 其他:

class AgentWithPriorityInbox(Agent):
    def __init__(self):
        super().__init__()
        self.operations.update({"1": self.some_priority_function, 
                                "2": self.some_function, 
                                "3": self.some_function,  # Same function for 2 topics.! 
                                "hello": self.hello, })
        self.priority_topics = ["1","2","3"]
        self.priority_messages = deque()  # from collections import deque
        self.normal_messages = deque()    # deques append and popleft are threadsafe.

    def update(self):
        # 1. Empty the inbox and sort the messages using the topic:
        while self.messages:
            msg = self.receive()
            if msg.topic in self.priority_topics:
                self.priority_messages.append(msg)
            else:
                self.normal_messages.append(msg)
        
        # 2. We've now sorted the incoming messages and can now extend
        # the priority message deque with the normal messages:
        self.priority_messages.extend(normal_messages)
        
        # 3. Next we process them as usual:
        while len(self.priority_messages) > 0:
            msg = self.priority_messages.popleft()
            operation = self.operations.get(msg.topic)
            if operation is not None:
                operation(msg)
            else:
                ...

用户唯一需要担心的是 赋nction不能依赖于任何外部。探员只限于 发送(self.send(msg))和接收(msg=self.receive()) 必须在函数self.update中处理的消息。 在代理运行之前,对已发送邮件的任何响应都不会发生 再次更新。

如果任何状态需要存储在代理中,例如 存储发送或接收的消息,然后代理\u init\u应该 将变量声明为类变量并存储信息。 当然可以调用数据库、文件等,包括使用 当代理调用 分别是启动或停止。更多信息请参见锅炉板(如下) 详细说明。

样板

以下锅炉板允许用户管理整个生命周期 代理人,包括:

  1. 将变量添加到可以在更新之间存储信息的\uu init中。
  2. 通过扩展self.operations来响应主题
  3. 扩展设置拆卸以开始和结束代理生命周期。
  4. 在阅读消息之前(1)、期间(2)和之后(3)的操作中使用update

没有使用所有功能的要求。只是锅炉板 试图说明典型用法。

也不要求在程序中对代理进行编程, 功能性的或面向对象的方式。这完全取决于 Maslite的用户。

class MyAgent(Agent):
    def __init__(self):
        super().__init__()
        # add variables here.
        
        # remember to register topics and their functions:
        self.operations.update({"topic x": self.x,
                                "topic y": self.y,
                                "topic ...": self....})
    
    def setup(self):
        # add own setup operations here.
        
        # register topics with the mailman..!
        # naive:
        for topic in self.operations.keys():
            self.subscribe(topic)
        # selective
        for topic in ["topic x","topic y","topic ..."]:
            self.subscribe(topic)
    
    def teardown(self):
        # add own teardown operations here.
        
    def update(self):
        # do something before reading messages
        self.action_before_processing_messages()
    
        # read the messages
        while self.messages:
            msg = self.receive()
            
            # react immediately to some messages:
            operation = self.operations.get(msg.topic)
            if operation is not None:
                operation(msg)
        
        # react after reading all messages:
        self.action_after_processing_all_messages()
    
    # Functions added by the user that are not inherited from the 
    # `Agent`-class. If the `update` function should react on these,
    # the topic of the message must be in the self.operations dict.
    
    def action_before_processing_messages(self)
        # do something.
        
    def action_after_processing_all_messages(self)
        # do something. Perhaps send a message to somebody that update is done?
        msg = DoneMessages(sender=self, receiver=SomeOtherAgent)
        self.send(msg)
    
    def x(msg):
        # read msg and send a response
        from_ = msg.sender
        response = SomeMessage(sender=self, receiver=from_) 
        self.send(response)
    
    def y(msg):
        # ...

消息

消息是对象,需要使用基类agentMessage

当代理接收到消息时,应该根据其主题对其进行解释,该主题 应该(按惯例)也是消息的类名。实践证明 没有明显的理由不适用本公约,所以 未显式声明主题的消息将继承类名。 示例如下:

>>> from maslite import AgentMessage
>>> class MyMsg(AgentMessage):
...     def __init__(sender, receiver):
...         super().__init__(sender=sender, receiver=receiver)
...

>>> m = MyMsg(sender=1, receiver=2)
>>> m.topic

'MyMsg'

向消息添加函数。下面是一个消息的例子 功能:

class DatabaseUpdateMessage(AgentMessage):
    """ Description of the message """
    def __init__(self, sender, senders_db_alias):
        super().__init__(sender=sender, receiver=DatabaseAgent.__name__)
        self.senders_db_alias
        self._states = {1: 'new', 2: 'read'} 
        self._state = 1
        
    def get_senders_alias(self):
        return self.senders_db_alias
        
    def __next__(self)
        if self._state + 1 <= max(self._states.keys()):
            self._state += 1
    
    def state(self):
        return self._states[self._state]

类databaseupdateMessageagentMessage的子类,因此基本消息 处理属性可用于DatabaseUpdateMessage。这有助于用户 需要了解信息处理系统的工作原理。

init函数需要一个sender,它通常默认为代理的selfagentMessage知道如果在它的\uu in it\uu调用中获得代理,它将 获取代理uuid并使用它。类似的情况也适用于接收器,其中 操作基于本地代理从发件人获取消息,并且仅 根据返回发送代理Uuid的msg.get_sender()知道发件人。 如果发送方可能在多次运行过程中更改uuid,则本地代理 例如,应指示使用发送者数据库别名。为了这个目的 如图所示,上面的消息包含函数 然后可以在多次运行中保持不变。

消息还被设计为返回以保存pythons垃圾收集器: 当databaseagent接收到消息时,下面的-函数允许 要调用next(msg)来进行的代理是self.\u state从'1'(new)到'2'(read) 在使用"self.send(msg)"将其返回给发件人之前。在这种情况下 重要的是,databaseagent不将消息存储在其变量中,因为 消息发送时必须有任何打开的对象指针。这是因为 多处理,使用多处理。queues交换消息,其中 要求agents和agentMessages可以进行pickle操作。

如果代理无法pickle当添加到调度程序时,调度程序将 引发一个错误,说明是打开的指针引用。消息是 更宽容一点,因为管理邮件的mailman将尝试发送 消息并希望共享指针不会导致冲突。如果共享 对象指针的数量是用户所必需的(例如在原型设计过程中)。 调度程序必须设置为多处理器数量=0 调度程序运行单进程单线程。

消息约定

  • none作为接收器的消息视为广播。逻辑是 如果您不知道要发送给谁,请将其发送到none,然后 如果任何其他代理对消息主题作出反应,您可能会得到响应。 幕后的魔力由调度程序mailmanager处理(mailman) 它跟踪任何代理订阅的所有主题。 按照惯例,消息的主题应该是self.\uu class.\uu name.\uu

  • 具有类的消息将由所有代理接收。 是那个班的。

  • 具有特定uuid作为接收器的消息将由代理接收 拿着那个Uuid。如果其他任何代理正在跟踪该uuid,请订阅 然后,跟踪代理将收到消息的deepcopy,而不是 原件。

  • 要获取发送者的uuid,可以使用msg.sender方法。

  • 要订阅/取消订阅邮件,代理应使用订阅 直接工作。

当将代理添加到(setup)或从(teardown)中移除时,将运行这些方法。 调度器。代理的内部操作run方法保证了这一点:

def run(self):
    """ The main operation of the Agent. """
    if not self.is_setup():
        self.setup()
    if not self._quit:
        self.update()
    if self._quit:
        self.teardown()

可以通过编写自己的self.setup-方法来扩展设置方法 (推荐方法)。

如何从数据库连接加载数据

当将代理添加到调度程序时,将运行安装程序。 当代理从Maslite中删除时,将运行Teardown

如果以迭代方式添加和删除代理,它们将加载 在设置过程中声明,并在从某个数据库中拆卸过程中存储。 不必让调度程序知道数据库在哪里。 探员可以自己跟踪。

尽管用户可能会发现使用uuid来识别, 用户应该保留一个特定的代理 记住,uuid是独一无二的 代理人的。期望或依赖uuid持久化会导致 逻辑谬误。

用户必须使用setupteardown并包含命名约定 这确保代理不依赖于uuid。例如:

# get the data from the previously stored agent
begin transaction:
    id = SELECT agent_id FROM stored_agents WHERE agent_alive == False LIMIT 1;
    UPDATE stored_agents WHERE agent_id == id VALUES (agent_live = TRUE);
    properties = SELECT * FROM stored_agents WHERE agent_id == id;
end transaction;
# Finally let the agent load the properties:
self.load(properties) 

上述方法确保被复活的代理人没有 对uuid的依赖性。

入门

要开始,只需要3个步骤:

步骤1。设置计划程序

>>> from maslite import Agent, Scheduler
>>> s = Scheduler(number_of_multi_processors=0)

步骤2。创建具有setupteardownupdate方法的代理。

>>> class MyAgent(Agent):
...     def __init__(self):
...         super().__init__()
...     def setup(self):
...         pass
...     def teardown(self):
...         pass
...     def update(self):
...         pass
    
>>> m = MyAgent()
>>> s.add(m)
2017-02-11 15:05:27,171 - DEBUG - Registering agent MyAgent 331228774898081874557512062996431768652

步骤3。运行调度程序(此处不执行任何操作)

class HelloMessage(AgentMessage):
    def __init__(self, sender, receiver)
        super().__init__(sender=sender, receiver=receiver)


class myAgent(Agent):
    def __init__(self):
        super().__init__()
        self.operations.update({HelloMessage.__name__: self.hello})
    
    def update(self):
        while self.messages:
            msg = self.receive()
            operation = self.operations.get(msg.topic))
            if operation is not None:
                operation(msg)
            else:
                self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
                
    def hello(self, msg)
        print(msg)
0

其他方法,如s.run(seconds=none,iterations=none,pause_if_idle=false)可以作为用户应用, 觉得合适。

步骤4。调用schedulersstop方法以优雅地执行teardown方法 作为关闭过程的一部分,在所有代理上。

class HelloMessage(AgentMessage):
    def __init__(self, sender, receiver)
        super().__init__(sender=sender, receiver=receiver)


class myAgent(Agent):
    def __init__(self):
        super().__init__()
        self.operations.update({HelloMessage.__name__: self.hello})
    
    def update(self):
        while self.messages:
            msg = self.receive()
            operation = self.operations.get(msg.topic))
            if operation is not None:
                operation(msg)
            else:
                self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
                
    def hello(self, msg)
        print(msg)
1

使用pdb或断点(pycharm)进行调试

通过将断点放在 更新函数。这样你就能看到里面发生了什么这个 状态更新期间的代理。

典型错误

用户使用以下命令正确构造代理:

  1. 方法updatesendreceivesetupteardown
  2. 使用scheduler.add(代理)
  3. 将代理添加到计划程序
  4. 使用scheduler.run()运行调度程序

…但是…

问:探员似乎没有更新?

A:代理没有收到任何消息,因此没有更新。 这是正确的行为,因为只有在 新消息! 要强制代理在每个调度周期中运行update,请使用隐藏的 方法:代理。保持清醒=真。然而,盲目地这样做是一个糟糕的设计 如果代理只是轮询数据,则选择。为此目的 应该使用agent.set_闹钟(agent.now()+1),因为这允许 代理休眠1秒,并被警报消息"唤醒"。

建议使用警报而不是设置的原因 保持清醒=真系统的工作负载保持透明 在消息交换级别。记住 代理应始终隐藏,而消息应 表示任何活动。

使用时钟调整运行速度。

时钟是马斯利特的一个动力工具,应该加以研究。 时钟能够:

  • -inf+inf的速度运行,并且在这两者之间进行任何浮点运算。

  • 从时间-inf+inf开始,中间的任何浮点时间。

使用对时钟的api调用设置时钟:

class HelloMessage(AgentMessage):
    def __init__(self, sender, receiver)
        super().__init__(sender=sender, receiver=receiver)


class myAgent(Agent):
    def __init__(self):
        super().__init__()
        self.operations.update({HelloMessage.__name__: self.hello})
    
    def update(self):
        while self.messages:
            msg = self.receive()
            operation = self.operations.get(msg.topic))
            if operation is not None:
                operation(msg)
            else:
                self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
                
    def hello(self, msg)
        print(msg)
2

在上述调用中,调度器首先将时间设置为1000(无论是什么)。 使用s.clock.time。接下来,它使用s.clock.clock速度设置时钟速度 达到200倍的实时性

  1. clock.time=以秒为单位的时间。通常时间设置为1970-01-01t00:00:00.000000-Unix纪元-使用time.time()
  2. clock.clock_speed=1.000000

如果时钟设置为在clock.clock_speed=none的情况下运行,则调度程序 将要求时钟进行跳转,因此其行为如下:(伪代码):

class HelloMessage(AgentMessage):
    def __init__(self, sender, receiver)
        super().__init__(sender=sender, receiver=receiver)


class myAgent(Agent):
    def __init__(self):
        super().__init__()
        self.operations.update({HelloMessage.__name__: self.hello})
    
    def update(self):
        while self.messages:
            msg = self.receive()
            operation = self.operations.get(msg.topic))
            if operation is not None:
                operation(msg)
            else:
                self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
                
    def hello(self, msg)
        print(msg)
3

要在模拟过程中调整计时(无论出于何种原因),请 调度程序应准备好消息:

  1. 以最高速度运行:self.将新时钟速度设置为定时事件(开始时间=现在(),速度=无)
  2. 在模拟中将时钟设置为10倍速度1小时:设置运行时(开始时间=现在()+1*60*60,速度=10)这将需要6分钟的实时时间。
  3. 在模拟中将时钟设置为1X(实时)速度3小时:设置运行时(开始时间=现在()+3*60*60,速度=1)这将需要1小时的实时时间。
  4. 将时钟设置为模拟中4小时的10倍速度:设置运行时(开始时间=现在()+4*60*60,速度=10)
  5. 将时钟设置为"无",以便在模拟的其余部分尽可能快地运行:设置运行时(开始时间=现在()+6*60*60,速度=无)

注意:时钟速度可以在调度程序中设置为参数run函数:

class HelloMessage(AgentMessage):
    def __init__(self, sender, receiver)
        super().__init__(sender=sender, receiver=receiver)


class myAgent(Agent):
    def __init__(self):
        super().__init__()
        self.operations.update({HelloMessage.__name__: self.hello})
    
    def update(self):
        while self.messages:
            msg = self.receive()
            operation = self.operations.get(msg.topic))
            if operation is not None:
                operation(msg)
            else:
                self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
                
    def hello(self, msg)
        print(msg)
4

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何强制用户在允许访问活动之前处理对话框?我的许可证代码怎么了?   java ArraysList作为JSON   mysql如何在java中创建包含多个可选where子句的搜索语句?   java如何让Apache Camel在“直接”路径的末尾删除文件?   使用socket在两个Androids之间进行java实时数据传输。IO(websocket)和4G   如何在java中实现两个CORBA服务器之间的通信   会话树xml表示为java对象   java Skype4Java编号swtwin323325   java RecyclerView getAdapterPosition()不工作:第一次单击返回正确位置,第二次单击返回1   java在$TOMCAT/conf/context上为JNDI设置资源。xml   java为什么第二个矩形冲突在第一个矩形冲突时不起作用?   JScrollPane上的java JTextArea未出现在JPanel上   java如何将实现的PriorityQueue打印为字符串?   jpa使用Jersey更新用户角色RESTJava(JAXRS)