一个轻量级多智能体系统
MASlite的Python项目详细描述
马斯利特
由bjorn madsen设计的多代理平台
版权所有©2016-2019。所有代码都是作者在 与其他系统的隔离和任何相似性纯属巧合。
马斯利特在60秒内解释:
Maslite是一个Simle Python模块,用于创建多代理模拟。
- 简单api:只需学习3个模块:调度程序、代理和代理消息
- 快速:每秒处理多达2.7亿条消息
- 轻量级:52KB。
它只有3个组件:
调度程序(主循环)
- 处理暂停并继续一次呼叫。
- 确保执行过程的可重复性,使代理易于调试。
- 每秒处理多达2.7亿条信息。
代理人
- 是具有可自定义的setup()、update()和teardown()方法的python类。
- 可以使用send()和receive()交换消息。
- 可以订阅/取消订阅邮件类。
- 有时钟,可以设置闹钟。
- 可单独测试。
- 可以有独立的I/O/数据库交互。
消息
- 使发送方和接收方启用直接通信的
- 有主题但没有接收器的内容将被视为广播并发送给订户。
Maslite有很多用例:
- 原型制作Massive™类型的游戏。
- 创建数据处理管道
- 优化发动机,用于:
- 调度(使用Bjorn-Madsen的分布式调度方法)
- 拍卖(使用Dimtry Bertsekas交替迭代拍卖)
用户需要担心的是交互协议, 可以方便地概括为:
- 设计代理将定期发送或接收的消息
继承必要实现详细信息的python对象
基本的
agentMessage
。消息必须具有明确的主题 - 编写应该在代理中执行一次的函数 接收其中一条消息。
- 用字典更新代理操作(
self.operations
) 它描述了主题
和函数
之间的关系。 - 编写更新函数来维护代理的内部状态
使用
发送
发送消息,使用接收
获取消息。
因此,用户可以使用以下命令创建代理:
class HelloMessage(AgentMessage):
def __init__(self, sender, receiver)
super().__init__(sender=sender, receiver=receiver)
class myAgent(Agent):
def __init__(self):
super().__init__()
self.operations.update({HelloMessage.__name__: self.hello})
def update(self):
while self.messages:
msg = self.receive()
operation = self.operations.get(msg.topic))
if operation is not None:
operation(msg)
else:
self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
def hello(self, msg)
print(msg)
太简单了!
字典self.operations
继承自代理
用
{hellomessage.\uu name\uuu:self.hello}
更新。自我操作
当
HelloMessage
到达时作为指针,因此当代理
调用update函数,它将从消息的
指向函数self.hello
,在这个简单的函数中
示例只打印消息的内容。
更微妙的行为,也可以嵌入用户没有 担心任何外部因素。例如,如果某些消息 优先于其他邮件(优先邮件),应清空收件箱 在用于排序的更新函数的开头。
下面是一个例子,其中一些主题的优先级高于 其他:
class AgentWithPriorityInbox(Agent):
def __init__(self):
super().__init__()
self.operations.update({"1": self.some_priority_function,
"2": self.some_function,
"3": self.some_function, # Same function for 2 topics.!
"hello": self.hello, })
self.priority_topics = ["1","2","3"]
self.priority_messages = deque() # from collections import deque
self.normal_messages = deque() # deques append and popleft are threadsafe.
def update(self):
# 1. Empty the inbox and sort the messages using the topic:
while self.messages:
msg = self.receive()
if msg.topic in self.priority_topics:
self.priority_messages.append(msg)
else:
self.normal_messages.append(msg)
# 2. We've now sorted the incoming messages and can now extend
# the priority message deque with the normal messages:
self.priority_messages.extend(normal_messages)
# 3. Next we process them as usual:
while len(self.priority_messages) > 0:
msg = self.priority_messages.popleft()
operation = self.operations.get(msg.topic)
if operation is not None:
operation(msg)
else:
...
用户唯一需要担心的是
赋nction不能依赖于任何外部。探员只限于
发送(self.send(msg)
)和接收(msg=self.receive()
)
必须在函数self.update
中处理的消息。
在代理运行之前,对已发送邮件的任何响应都不会发生
再次更新。
如果任何状态需要存储在代理中,例如
存储发送或接收的消息,然后代理\u init\u
应该
将变量声明为类变量并存储信息。
当然可以调用数据库、文件等,包括使用
当代理调用
分别是启动或停止。更多信息请参见锅炉板(如下)
详细说明。
样板
以下锅炉板允许用户管理整个生命周期 代理人,包括:
- 将变量添加到可以在更新之间存储信息的
\uu init
中。 - 通过扩展self.operations来响应主题
- 扩展
设置
和
拆卸
以开始和结束代理生命周期。
- 在阅读消息之前(1)、期间(2)和之后(3)的操作中使用
update
。
没有使用所有功能的要求。只是锅炉板 试图说明典型用法。
也不要求在程序中对代理进行编程, 功能性的或面向对象的方式。这完全取决于 Maslite的用户。
class MyAgent(Agent):
def __init__(self):
super().__init__()
# add variables here.
# remember to register topics and their functions:
self.operations.update({"topic x": self.x,
"topic y": self.y,
"topic ...": self....})
def setup(self):
# add own setup operations here.
# register topics with the mailman..!
# naive:
for topic in self.operations.keys():
self.subscribe(topic)
# selective
for topic in ["topic x","topic y","topic ..."]:
self.subscribe(topic)
def teardown(self):
# add own teardown operations here.
def update(self):
# do something before reading messages
self.action_before_processing_messages()
# read the messages
while self.messages:
msg = self.receive()
# react immediately to some messages:
operation = self.operations.get(msg.topic)
if operation is not None:
operation(msg)
# react after reading all messages:
self.action_after_processing_all_messages()
# Functions added by the user that are not inherited from the
# `Agent`-class. If the `update` function should react on these,
# the topic of the message must be in the self.operations dict.
def action_before_processing_messages(self)
# do something.
def action_after_processing_all_messages(self)
# do something. Perhaps send a message to somebody that update is done?
msg = DoneMessages(sender=self, receiver=SomeOtherAgent)
self.send(msg)
def x(msg):
# read msg and send a response
from_ = msg.sender
response = SomeMessage(sender=self, receiver=from_)
self.send(response)
def y(msg):
# ...
消息
消息是对象,需要使用基类agentMessage
当代理接收到消息时,应该根据其主题对其进行解释,该主题 应该(按惯例)也是消息的类名。实践证明 没有明显的理由不适用本公约,所以 未显式声明主题的消息将继承类名。 示例如下:
>>> from maslite import AgentMessage
>>> class MyMsg(AgentMessage):
... def __init__(sender, receiver):
... super().__init__(sender=sender, receiver=receiver)
...
>>> m = MyMsg(sender=1, receiver=2)
>>> m.topic
'MyMsg'
向消息添加函数。下面是一个消息的例子 功能:
class DatabaseUpdateMessage(AgentMessage):
""" Description of the message """
def __init__(self, sender, senders_db_alias):
super().__init__(sender=sender, receiver=DatabaseAgent.__name__)
self.senders_db_alias
self._states = {1: 'new', 2: 'read'}
self._state = 1
def get_senders_alias(self):
return self.senders_db_alias
def __next__(self)
if self._state + 1 <= max(self._states.keys()):
self._state += 1
def state(self):
return self._states[self._state]
类databaseupdateMessage是agentMessage
的子类,因此基本消息
处理属性可用于DatabaseUpdateMessage。这有助于用户
需要了解信息处理系统的工作原理。
init函数需要一个sender,它通常默认为代理的self
。
agentMessage
知道如果在它的\uu in it\uu调用中获得代理,它将
获取代理uuid并使用它。类似的情况也适用于接收器,其中
操作基于本地代理从发件人获取消息,并且仅
根据返回发送代理Uuid的msg.get_sender()知道发件人。
如果发送方可能在多次运行过程中更改uuid,则本地代理
例如,应指示使用
发送者数据库别名
。为了这个目的
如图所示,上面的消息包含函数
然后可以在多次运行中保持不变。
消息还被设计为返回以保存pythons垃圾收集器:
当databaseagent接收到消息时,下面的-函数允许
要调用
next(msg)
来进行的代理是self.\u state
从'1'(new)到'2'(read)
在使用"self.send(msg)"将其返回给发件人之前。在这种情况下
重要的是,databaseagent不将消息存储在其变量中,因为
消息发送时必须不有任何打开的对象指针。这是因为
多处理,使用多处理。queue
s交换消息,其中
要求agent
s和agentMessage
s可以进行pickle操作。
如果代理
无法pickle当添加到调度程序时,调度程序将
引发一个错误,说明是打开的指针引用。消息是
更宽容一点,因为管理邮件的mailman
将尝试发送
消息并希望共享指针不会导致冲突。如果共享
对象指针的数量是用户所必需的(例如在原型设计过程中)。
调度程序必须设置为多处理器数量=0
调度程序运行单进程单线程。
消息约定:
将
none
作为接收器的消息视为广播。逻辑是 如果您不知道要发送给谁,请将其发送到none
,然后 如果任何其他代理对消息主题作出反应,您可能会得到响应。 幕后的魔力由调度程序mailmanager处理(mailman
) 它跟踪任何代理
订阅的所有主题。 按照惯例,消息的主题应该是
self.\uu class.\uu name.\uu
具有
类的消息将由所有代理接收。 是那个班的。
具有特定uuid作为接收器的消息将由代理接收 拿着那个Uuid。如果其他任何代理正在跟踪该uuid,请订阅 然后,跟踪代理将收到消息的
deepcopy
,而不是 原件。要获取发送者的uuid,可以使用
msg.sender
方法。要订阅/取消订阅邮件,代理应使用
订阅
直接工作。
当将代理添加到(setup
)或从(teardown
)中移除时,将运行这些方法。
调度器。代理的内部操作run
方法保证了这一点:
def run(self):
""" The main operation of the Agent. """
if not self.is_setup():
self.setup()
if not self._quit:
self.update()
if self._quit:
self.teardown()
可以通过编写自己的self.setup
-方法来扩展设置方法
(推荐方法)。
如何从数据库连接加载数据
当将代理添加到调度程序时,将运行安装程序
。
当代理从Maslite中删除时,将运行Teardown
。
如果以迭代方式添加和删除代理,它们将加载 在设置过程中声明,并在从某个数据库中拆卸过程中存储。 不必让调度程序知道数据库在哪里。 探员可以自己跟踪。
尽管用户可能会发现使用uuid
来识别,
用户应该保留一个特定的代理
记住,uuid是独一无二的
代理人的。期望或依赖uuid持久化会导致
逻辑谬误。
用户必须使用setup
和teardown
并包含命名约定
这确保代理不依赖于uuid。例如:
# get the data from the previously stored agent
begin transaction:
id = SELECT agent_id FROM stored_agents WHERE agent_alive == False LIMIT 1;
UPDATE stored_agents WHERE agent_id == id VALUES (agent_live = TRUE);
properties = SELECT * FROM stored_agents WHERE agent_id == id;
end transaction;
# Finally let the agent load the properties:
self.load(properties)
上述方法确保被复活的代理人没有 对uuid的依赖性。
入门
要开始,只需要3个步骤:
步骤1。设置计划程序
>>> from maslite import Agent, Scheduler
>>> s = Scheduler(number_of_multi_processors=0)
步骤2。创建具有setup
、teardown
和update
方法的代理。
>>> class MyAgent(Agent):
... def __init__(self):
... super().__init__()
... def setup(self):
... pass
... def teardown(self):
... pass
... def update(self):
... pass
>>> m = MyAgent()
>>> s.add(m)
2017-02-11 15:05:27,171 - DEBUG - Registering agent MyAgent 331228774898081874557512062996431768652
步骤3。运行调度程序(此处不执行任何操作)
class HelloMessage(AgentMessage):
def __init__(self, sender, receiver)
super().__init__(sender=sender, receiver=receiver)
class myAgent(Agent):
def __init__(self):
super().__init__()
self.operations.update({HelloMessage.__name__: self.hello})
def update(self):
while self.messages:
msg = self.receive()
operation = self.operations.get(msg.topic))
if operation is not None:
operation(msg)
else:
self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
def hello(self, msg)
print(msg)
0
其他方法,如s.run(seconds=none,iterations=none,pause_if_idle=false)
可以作为用户应用,
觉得合适。
步骤4。调用schedulersstop
方法以优雅地执行teardown
方法
作为关闭过程的一部分,在所有代理上。
class HelloMessage(AgentMessage):
def __init__(self, sender, receiver)
super().__init__(sender=sender, receiver=receiver)
class myAgent(Agent):
def __init__(self):
super().__init__()
self.operations.update({HelloMessage.__name__: self.hello})
def update(self):
while self.messages:
msg = self.receive()
operation = self.operations.get(msg.topic))
if operation is not None:
operation(msg)
else:
self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
def hello(self, msg)
print(msg)
1
使用pdb或断点(pycharm)进行调试
通过将断点放在 更新函数。这样你就能看到里面发生了什么这个 状态更新期间的代理。
典型错误
用户使用以下命令正确构造代理:
- 方法
update
,send
,receive
,setup
和teardown
, - 使用scheduler.add(代理) 将代理添加到计划程序
- 使用scheduler.run()运行调度程序
…但是…
问:探员似乎没有更新?
A:代理没有收到任何消息,因此没有更新。
这是正确的行为,因为只有在
新消息!
要强制代理在每个调度周期中运行update
,请使用隐藏的
方法:代理。保持清醒=真
。然而,盲目地这样做是一个糟糕的设计
如果代理只是轮询数据,则选择。为此目的
应该使用agent.set_闹钟(agent.now()+1)
,因为这允许
代理休眠1秒,并被警报消息"唤醒"。
建议使用警报而不是设置的原因
保持清醒=真
系统的工作负载保持透明
在消息交换级别。记住
代理应始终隐藏,而消息应
表示任何活动。
使用时钟调整运行速度。
时钟是马斯利特的一个动力工具,应该加以研究。 时钟能够:
以
-inf
;+inf
的速度运行,并且在这两者之间进行任何浮点运算。从时间
-inf
;+inf
开始,中间的任何浮点时间。
使用对时钟的api调用设置时钟:
class HelloMessage(AgentMessage):
def __init__(self, sender, receiver)
super().__init__(sender=sender, receiver=receiver)
class myAgent(Agent):
def __init__(self):
super().__init__()
self.operations.update({HelloMessage.__name__: self.hello})
def update(self):
while self.messages:
msg = self.receive()
operation = self.operations.get(msg.topic))
if operation is not None:
operation(msg)
else:
self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
def hello(self, msg)
print(msg)
2
在上述调用中,调度器首先将时间设置为1000
(无论是什么)。
使用s.clock.time。接下来,它使用s.clock.clock速度设置时钟速度
达到200倍的实时性
clock.time=
以秒为单位的时间。通常时间设置为1970-01-01t00:00:00.000000-Unix纪元-使用time.time()
clock.clock_speed=1.000000
如果时钟设置为在clock.clock_speed=none的情况下运行,则调度程序 将要求时钟进行跳转,因此其行为如下:(伪代码):
class HelloMessage(AgentMessage):
def __init__(self, sender, receiver)
super().__init__(sender=sender, receiver=receiver)
class myAgent(Agent):
def __init__(self):
super().__init__()
self.operations.update({HelloMessage.__name__: self.hello})
def update(self):
while self.messages:
msg = self.receive()
operation = self.operations.get(msg.topic))
if operation is not None:
operation(msg)
else:
self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
def hello(self, msg)
print(msg)
3
要在模拟过程中调整计时(无论出于何种原因),请 调度程序应准备好消息:
- 以最高速度运行:
self.将新时钟速度设置为定时事件(开始时间=现在(),速度=无)
- 在模拟中将时钟设置为10倍速度1小时:
设置运行时(开始时间=现在()+1*60*60,速度=10)
这将需要6分钟的实时时间。 - 在模拟中将时钟设置为1X(实时)速度3小时:
设置运行时(开始时间=现在()+3*60*60,速度=1)
这将需要1小时的实时时间。 - 将时钟设置为模拟中4小时的10倍速度:
设置运行时(开始时间=现在()+4*60*60,速度=10)
- 将时钟设置为"无",以便在模拟的其余部分尽可能快地运行:
设置运行时(开始时间=现在()+6*60*60,速度=无)
注意:时钟速度可以在调度程序中设置为参数run
函数:
class HelloMessage(AgentMessage):
def __init__(self, sender, receiver)
super().__init__(sender=sender, receiver=receiver)
class myAgent(Agent):
def __init__(self):
super().__init__()
self.operations.update({HelloMessage.__name__: self.hello})
def update(self):
while self.messages:
msg = self.receive()
operation = self.operations.get(msg.topic))
if operation is not None:
operation(msg)
else:
self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
def hello(self, msg)
print(msg)
4
…