蟒蛇实现
PyRSMQ的Python项目详细描述
redis简单消息队列
python的轻量级消息队列,不需要专用的队列服务器。只是一个redis服务器。
这是http://github.com/smrchy/rsmq" rel="nofollow">https://github.com/smrchy/rsmq)
的python实现Pyrsmq发行说明
0.4.1
- 在consumer中为来自json的消息添加自动解码选项(如果可能)(默认为打开)
0.4.0
- 能够从包而不是模块导入
redismq
(即现在可以使用rsmq import redismq中的,而不是rsmq.rsmq import redismq中的
)
- 向大多数命令添加quiet选项,以便在禁用异常时隐藏错误
- 附加单元测试
- 将非字符串消息自动编码为json以发送消息
- 添加
redismqconsumer
和redismqconsumerthread
以便更容易地创建队列消费者 - 为简单的生产者/消费者添加示例
- 能够从包而不是模块导入
0.3.1
- 修复消息ID生成匹配rsmq算法
0.3.0
- 使消息ID生成与rsmq算法匹配
- 允许队列名称中除
:
之外的任何字符
0.2.1
- 允许队列名称中使用大写字符
0.2.0-添加Python2支持
- 一些python 2支持
- 一些单元测试
- 将
.exec()
更改为.execute()
以获得P2兼容性
0.1.0-初始版本
- 初始端口
- 缺少"实时"模式
- 缺少单元测试
rsmq简介
rsmq试图模拟amazon的类似sqs的功能,其中有一个命名队列(name 由redis支持的"namespace"和"qname")组成。使用前必须创建队列。 一旦创建,生产者将把消息放入队列,消费者将检索它们。 消息具有"可见性"属性,在该属性中可以使用任何"可见"消息,但是 "Invisible"消息将一直保留在队列中,直到它们变为可见或被删除。
一旦队列存在,生产者就可以将消息推送到队列中。当推送到队列时,消息得到一个 用于跟踪邮件的唯一ID。该id可用于由producer或 消费者或控制其"可见性"
在插入过程中,消息可能具有与其相关联的延迟
。""延迟"将标记消息
在指定的延迟时间内"不可见",从而防止它被消耗。延迟可能
在消息创建时指定,如果未指定,则在队列属性中设置默认值
使用。
消费者将重试e通过receiveMessage()
或popMessage()
进入队列的下一条消息
命令。如果我们不关心消息传递以外的可靠性,一个简单的好方法
通过popMessage()
检索。当使用popMessage()时
在接收的同时删除。
但是,在许多情况下,我们希望确保消息不仅被接收,而且
在被删除之前处理。为此,receiveMessage()
是最佳选择。使用receiveMessage()时,
消息保留在队列中,但在一段时间内标记为"不可见"。数量
时间由队列属性
vt
(可见性超时)指定,也可以由
在receiveMessage()
调用中指定自定义vt
值。使用receiveMessage()时,
consumer'负责在
vt
超时发生之前删除消息,否则
消息可能会被另一个消费者拾取。消费者还可以根据需要延长超时时间
更多时间,如果处理失败,请清除超时。
使用rsmq队列时,可以指定"实时"模式。"实时"模式添加一个redis pubsub 基于此的通知,允许在将新消息添加到 排队。这可以消除消费者在队列为空时不断轮询队列的需要 (注意:截至本文撰写时,"realtime"尚未在python版本中实现)
python实现说明
注意此项目是为python 3.x编写的。 我不确定在python 2下它会有多稳定
此版本基于Java版本(
API
首先,尽最大努力维护两个版本的相同方法/参数/用法 (不可否认,这导致了一个不怎么pythonic的api)
尽管许多原始的api仍然存在,但是添加了一些替代的api以使生命有一点活力 更容易。
例如,虽然可以使用"setter"方法将任何可用参数设置为command,但可以 也可以在创建命令时简单地指定参数。所以这两个命令执行相同的操作:
rqsm.createQueue().qname("my-queue").vt(20).execute()
rqsm.createQueue(qname="my-queue", vt=20).execute()
此外,在创建主控制器时,指定的任何非控制器参数都将变为 通过此控制器创建的所有命令的默认值-例如,如果计划工作 如果只有一个队列使用此控制器,则可以在创建 控制器,不需要在每个命令中指定它。
"消费者"服务实用程序
除了原始rsmq项目中的所有api之外,一个简单易用的消费者实现
包含在这个项目中,作为redismqconsumer
和redismqconsumerthread
类。
重新发现消费者
redismqconsumer
实例包装了一个rsmq控制器,并配置了一个处理器方法
每次接收到新消息时调用。processor方法返回true或false
指示邮件是否已成功接收,以及邮件是否已被删除或返回到
在此基础上排队。只要处理器是
运行,减少了处理时间过长和
可见性超时已过。
注意:由于当前未实现实时
功能,因此消费者实现是
当前正在使用轮询检查队列项目。
示例用法:
from rsmq.consumer import RedisSMQConsumer
# define Processor
def processor(id, message, rc, ts):
''' process the message '''
# Do something
return True
# create consumer
consumer = RedisSMQConsumer('my-queue', processor, host='127.0.0.1')
# run consumer
consumer.run()
有关更完整的示例,请参见示例目录。
重新发现qconsumerthread
重新查找qconsumerthread
是否SIM卡提供一个扩展线程类的redismqconsumer
版本。
创建之后,您可以像其他线程一样启动它,或者使用stop(wait)
方法停止它,其中
wait指定在返回之前等待线程停止的最长时间(线程将仍然
如果wait
时间已过,请尝试停止)
注意,线程默认设置为一个守护进程
线程,因此在退出主线程时
将被停止。如果要禁用守护进程标志,只需在启动线程之前禁用它
使用任何其他线程
示例用法:
from rsmq.consumer import RedisSMQConsumerThread
# define Processor
def processor(id, message, rc, ts):
''' process the message '''
# Do something
return True
# create consumer
consumer = RedisSMQConsumerThread('my-queue', processor, host='127.0.0.1')
# start consumer
consumer.start()
# do what else you need to, then stop the consumer
# (waiting for 10 seconds for it to stop):
consumer.stop(10)
有关更完整的示例,请参见示例目录。
一般使用方法
正如从其他版本复制的那样,一般的方法是创建一个控制器对象并使用它 对象来创建、配置和执行命令
错误处理
命令遵循其他版本的模式,并在出错时引发异常。
异常都在扩展redismqexception()并包括:
invalidParameterValue()
-指定的参数无效queue already exists()
-尝试创建已存在的队列queuedoesnotexist()
-尝试使用/删除不存在的队列nomessageinqueue()
-尝试从没有可见消息的队列中检索消息
但是,如果不希望使用异常,可以在每个命令或
通过对相关对象使用.exceptions(false)
,以每个控制器为基础。例如,
只有当队列不存在而不引发异常时,以下操作才会创建队列:
rsmq.createQueue().exceptions(False).execute()
用法
示例用法
在本例中,我们将创建一个名为"my queue"的新队列,删除以前的版本,如果 存在,然后以2秒的延迟发送消息。然后我们将展示 延迟前的消息过期,超时后获取消息
from pprint import pprint
import time
from rsmq import RedisSMQ
# Create controller.
# In this case we are specifying the host and default queue name
queue = RedisSMQ(host="127.0.0.1", qname="myqueue")
# Delete Queue if it already exists, ignoring exceptions
queue.deleteQueue().exceptions(False).execute()
# Create Queue with default visibility timeout of 20 and delay of 0
# demonstrating here both ways of setting parameters
queue.createQueue(delay=0).vt(20).execute()
# Send a message with a 2 second delay
message_id = queue.sendMessage(delay=2).message("Hello World").execute()
pprint({'queue_status': queue.getQueueAttributes().execute()})
# Try to get a message - this will not succeed, as our message has a delay and no other
# messages are in the queue
msg = queue.receiveMessage().exceptions(False).execute()
# Message should be False as we got no message
pprint({"Message": msg})
print("Waiting for our message to become visible")
# Wait for our message to become visible
time.sleep(2)
pprint({'queue_status': queue.getQueueAttributes().execute()})
# Get our message
msg = queue.receiveMessage().execute()
# Message should now be there
pprint({"Message": msg})
# Delete Message
queue.deleteMessage(id=msg['id'])
pprint({'queue_status': queue.getQueueAttributes().execute()})
# delete our queue
queue.deleteQueue().execute()
# No action
queue.quit()
redismq控制器api用法
用法:rsmq.rqsm.redismq([选项])
- 选项(所有选项都作为关键字选项提供):
- redis连接参数:
客户端
-提供一个现有的、已配置的redis客户端 或主机
-redis主机名(默认值:127.0.0.1
)端口
-redis端口(默认值:6379
)选项
-附加的redis客户端选项。默认值:编码
:UTF-8
解码U响应
:真
- 控制器选项
ns
-名称空间-所有redis键前面都有<;ns>;:
。默认值:rsmq
实时
-如果设置为true,则启用实时选项。默认值:false
异常
-如果设置为true,则抛出所有命令的异常。默认值:true
- 默认命令选项。其他任何内容都作为默认值传递给每个命令。实例:
qname
-默认队列名称
- redis连接参数:
控制器方法
异常(真/假)
-启用/禁用异常setclient(client)
-指定新的redis client对象ns(命名空间)
-设置新命名空间
quit()
-断开与redis的连接。这主要是为了与其他版本兼容。做得不多
控制器命令
create queue()
-创建新队列
- 参数:
qname
-(必需)队列的名称
vt
-默认可见性超时(秒)。默认值:30
延迟
-默认延迟(插入时的可见性超时)。默认值:0
- <代码>最大值ze-最大消息大小(1024-65535,默认值:65535)
quiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
- 返回:
true
如果创建了队列
deleteQueue()
-删除现有队列
- 参数:
qname
-(必需)队列的名称
quiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
- 返回:
真
如果队列已删除
setQueueAttributes()
-更新队列属性。如果未指定值,则不会更新该值。
- 参数:
qname
-(必需)队列的名称
vt
-默认可见性超时(秒)。默认值:30
延迟
-默认延迟(插入时的可见性超时)。默认值:0
最大大小
-最大消息大小(1024-65535,默认值:65535)quiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
- 返回:
getQueueAttributes()
调用的输出
getQueueAttributes()
-获取队列属性和统计信息
- 参数:
qname
-(必需)队列的名称
quiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
- 返回包含以下字段的词典:
vt
-默认可见性超时延迟
-默认插入延迟最大大小
-消息的最大大小totalRecv
-已使用的消息数。注意,每次检索邮件时,此值都会递增,因此,如果邮件未被删除并再次显示,则会在此处多次显示。TotalSent
-发送到队列的消息数。已创建
-创建队列时的Unix时间戳(从epoch开始的秒数)
已修改
-上次更新队列的Unix时间戳(从epoch开始的秒数)
msgs
-当前队列中的消息总数
hiddenmsgs
-队列中不可见的消息数
listQueues()
-列出此命名空间中的所有队列
- 参数:
- 返回:
- 此命名空间中的所有队列名称均为
set()
更改消息可见性()
-更改消息可见性
- 参数:
qname
-(必需)队列的名称
id
-(必需)消息idquiet
-如果设置为true
并且禁用异常,则不生成错误日志条目????
- 返回:
????
异常(真/假)
-启用/禁用异常setclient(client)
-指定新的redis client对象ns(命名空间)
-设置新命名空间
quit()
-断开与redis的连接。这主要是为了与其他版本兼容。做得不多create queue()
-创建新队列- 参数:
qname
-(必需)队列的名称vt
-默认可见性超时(秒)。默认值:30
延迟
-默认延迟(插入时的可见性超时)。默认值:0
- <代码>最大值ze-最大消息大小(1024-65535,默认值:65535)
quiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
- 返回:
true
如果创建了队列
- 参数:
deleteQueue()
-删除现有队列- 参数:
qname
-(必需)队列的名称quiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
- 返回:
真
如果队列已删除
- 参数:
setQueueAttributes()
-更新队列属性。如果未指定值,则不会更新该值。- 参数:
qname
-(必需)队列的名称vt
-默认可见性超时(秒)。默认值:30
延迟
-默认延迟(插入时的可见性超时)。默认值:0
最大大小
-最大消息大小(1024-65535,默认值:65535)quiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
- 返回:
getQueueAttributes()
调用的输出
- 参数:
getQueueAttributes()
-获取队列属性和统计信息- 参数:
qname
-(必需)队列的名称quiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
- 返回包含以下字段的词典:
vt
-默认可见性超时延迟
-默认插入延迟最大大小
-消息的最大大小totalRecv
-已使用的消息数。注意,每次检索邮件时,此值都会递增,因此,如果邮件未被删除并再次显示,则会在此处多次显示。TotalSent
-发送到队列的消息数。已创建
-创建队列时的Unix时间戳(从epoch开始的秒数)已修改
-上次更新队列的Unix时间戳(从epoch开始的秒数)msgs
-当前队列中的消息总数hiddenmsgs
-队列中不可见的消息数
- 参数:
listQueues()
-列出此命名空间中的所有队列- 参数:
- 返回:
- 此命名空间中的所有队列名称均为
set()
- 此命名空间中的所有队列名称均为
更改消息可见性()
-更改消息可见性- 参数:
qname
-(必需)队列的名称id
-(必需)消息idquiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
????
- 参数:
- 返回:
????
sendMessage()
-将消息发送到队列中
- 参数:
qname
-(必需)队列的名称消息
-(必需)消息ID延迟
-可选覆盖此消息的延迟
(如果未指定,则使用队列的默认值)quiet
-如果设置为true
并且禁用异常,则不生成错误日志条目编码
如果设置为真
,则强制将消息编码为json字符串。如果为false,请尝试自动检测是否需要对消息进行编码
- 返回
- 已发送消息的消息ID
receiveMessage()
-从队列接收消息并将其标记为不可见
- 参数:
qname
-(必需)队列的名称vt
-可选覆盖此消息的可见性超时(如果未指定,则使用队列的默认值)quiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
- 返回以下字段的字典:
id
-消息id消息
-消息内容rc
-接收计数-接收此消息的次数ts
-最初发送消息的Unix时间戳
popMessage()
-从队列接收消息并将其从队列中删除
- 参数:
qname
-(必需)队列的名称quiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
- 返回以下字段的字典:
id
-消息id消息
-消息内容rc
-接收计数-接收此消息的次数ts
-最初发送消息的Unix时间戳
delete message()
-从队列中删除消息
- 参数:
qname
-(必需)队列的名称id
-(必需)消息idquiet
-如果设置为true
并且禁用异常,则不生成错误日志条目
- 返回:
真
如果邮件已删除