蟒蛇实现

PyRSMQ的Python项目详细描述


rsmq:redis simple message queue for node.js

构建状态coverage statuspypi version

redis简单消息队列

python的轻量级消息队列,不需要专用的队列服务器。只是一个redis服务器。

这是http://github.com/smrchy/rsmq" rel="nofollow">https://github.com/smrchy/rsmq)

的python实现

Pyrsmq发行说明

  • 0.4.1

    • 在consumer中为来自json的消息添加自动解码选项(如果可能)(默认为打开)
  • 0.4.0

    • 能够从包而不是模块导入redismq(即现在可以使用rsmq import redismq中的,而不是rsmq.rsmq import redismq中的
    • 向大多数命令添加quiet选项,以便在禁用异常时隐藏错误
    • 附加单元测试
    • 将非字符串消息自动编码为json以发送消息
    • 添加redismqconsumerredismqconsumerthread以便更容易地创建队列消费者
    • 为简单的生产者/消费者添加示例
  • 0.3.1

    • 修复消息ID生成匹配rsmq算法
  • 0.3.0

    • 使消息ID生成与rsmq算法匹配
    • 允许队列名称中除
    • 之外的任何字符
  • 0.2.1

    • 允许队列名称中使用大写字符
  • 0.2.0-添加Python2支持

    • 一些python 2支持
    • 一些单元测试
    • .exec()更改为.execute()以获得P2兼容性
  • 0.1.0-初始版本

    • 初始端口
    • 缺少"实时"模式
    • 缺少单元测试

rsmq简介

rsmq试图模拟amazon的类似sqs的功能,其中有一个命名队列(name 由redis支持的"namespace"和"qname")组成。使用前必须创建队列。 一旦创建,生产者将把消息放入队列,消费者将检索它们。 消息具有"可见性"属性,在该属性中可以使用任何"可见"消息,但是 "Invisible"消息将一直保留在队列中,直到它们变为可见或被删除。

一旦队列存在,生产者就可以将消息推送到队列中。当推送到队列时,消息得到一个 用于跟踪邮件的唯一ID。该id可用于由producer消费者或控制其"可见性"

在插入过程中,消息可能具有与其相关联的延迟。""延迟"将标记消息 在指定的延迟时间内"不可见",从而防止它被消耗。延迟可能 在消息创建时指定,如果未指定,则在队列属性中设置默认值 使用。

消费者将重试e通过receiveMessage()popMessage()进入队列的下一条消息 命令。如果我们不关心消息传递以外的可靠性,一个简单的好方法 通过popMessage()检索。当使用popMessage()时 在接收的同时删除。

但是,在许多情况下,我们希望确保消息不仅被接收,而且 在被删除之前处理。为此,receiveMessage()是最佳选择。使用receiveMessage()时, 消息保留在队列中,但在一段时间内标记为"不可见"。数量 时间由队列属性vt(可见性超时)指定,也可以由 在receiveMessage()调用中指定自定义vt值。使用receiveMessage()时, consumer'负责在vt超时发生之前删除消息,否则 消息可能会被另一个消费者拾取。消费者还可以根据需要延长超时时间 更多时间,如果处理失败,请清除超时。

使用rsmq队列时,可以指定"实时"模式。"实时"模式添加一个redis pubsub 基于此的通知,允许在将新消息添加到 排队。这可以消除消费者在队列为空时不断轮询队列的需要 (注意:截至本文撰写时,"realtime"尚未在python版本中实现)

python实现说明

注意此项目是为python 3.x编写的。 我不确定在python 2下它会有多稳定

此版本基于Java版本(,而这又是 基于node.js的原始版本。

API

首先,尽最大努力维护两个版本的相同方法/参数/用法 (不可否认,这导致了一个不怎么pythonic的api)

尽管许多原始的api仍然存在,但是添加了一些替代的api以使生命有一点活力 更容易。

例如,虽然可以使用"setter"方法将任何可用参数设置为command,但可以 也可以在创建命令时简单地指定参数。所以这两个命令执行相同的操作:

rqsm.createQueue().qname("my-queue").vt(20).execute()

rqsm.createQueue(qname="my-queue", vt=20).execute()

此外,在创建主控制器时,指定的任何非控制器参数都将变为 通过此控制器创建的所有命令的默认值-例如,如果计划工作 如果只有一个队列使用此控制器,则可以在创建 控制器,不需要在每个命令中指定它。

"消费者"服务实用程序

除了原始rsmq项目中的所有api之外,一个简单易用的消费者实现 包含在这个项目中,作为redismqconsumerredismqconsumerthread类。

重新发现消费者

redismqconsumer实例包装了一个rsmq控制器,并配置了一个处理器方法 每次接收到新消息时调用。processor方法返回true或false 指示邮件是否已成功接收,以及邮件是否已被删除或返回到 在此基础上排队。只要处理器是 运行,减少了处理时间过长和 可见性超时已过。

注意:由于当前未实现实时功能,因此消费者实现是 当前正在使用轮询检查队列项目。

示例用法:

from rsmq.consumer import RedisSMQConsumer

# define Processor
def processor(id, message, rc, ts):
  ''' process the message '''
  # Do something
  return True

# create consumer
consumer = RedisSMQConsumer('my-queue', processor, host='127.0.0.1')

# run consumer
consumer.run()

有关更完整的示例,请参见示例目录。

重新发现qconsumerthread

重新查找qconsumerthread是否SIM卡提供一个扩展线程类的redismqconsumer版本。

创建之后,您可以像其他线程一样启动它,或者使用stop(wait)方法停止它,其中 wait指定在返回之前等待线程停止的最长时间(线程将仍然 如果wait时间已过,请尝试停止)

注意,线程默认设置为一个守护进程线程,因此在退出主线程时 将被停止。如果要禁用守护进程标志,只需在启动线程之前禁用它 使用任何其他线程

示例用法:

from rsmq.consumer import RedisSMQConsumerThread

# define Processor
def processor(id, message, rc, ts):
  ''' process the message '''
  # Do something
  return True

# create consumer
consumer = RedisSMQConsumerThread('my-queue', processor, host='127.0.0.1')

# start consumer
consumer.start()

# do what else you need to, then stop the consumer
# (waiting for 10 seconds for it to stop):
consumer.stop(10)

有关更完整的示例,请参见示例目录。

一般使用方法

正如从其他版本复制的那样,一般的方法是创建一个控制器对象并使用它 对象来创建、配置和执行命令

错误处理

命令遵循其他版本的模式,并在出错时引发异常。

异常都在扩展redismqexception()并包括:

  • invalidParameterValue()-指定的参数无效
  • queue already exists()-尝试创建已存在的队列
  • queuedoesnotexist()-尝试使用/删除不存在的队列
  • nomessageinqueue()-尝试从没有可见消息的队列中检索消息

但是,如果不希望使用异常,可以在每个命令或 通过对相关对象使用.exceptions(false),以每个控制器为基础。例如, 只有当队列不存在而不引发异常时,以下操作才会创建队列:

rsmq.createQueue().exceptions(False).execute()

用法

示例用法

在本例中,我们将创建一个名为"my queue"的新队列,删除以前的版本,如果 存在,然后以2秒的延迟发送消息。然后我们将展示 延迟前的消息过期,超时后获取消息

from pprint import pprint
import time

from rsmq import RedisSMQ


# Create controller.
# In this case we are specifying the host and default queue name
queue = RedisSMQ(host="127.0.0.1", qname="myqueue")


# Delete Queue if it already exists, ignoring exceptions
queue.deleteQueue().exceptions(False).execute()

# Create Queue with default visibility timeout of 20 and delay of 0
# demonstrating here both ways of setting parameters
queue.createQueue(delay=0).vt(20).execute()

# Send a message with a 2 second delay
message_id = queue.sendMessage(delay=2).message("Hello World").execute()

pprint({'queue_status': queue.getQueueAttributes().execute()})

# Try to get a message - this will not succeed, as our message has a delay and no other
# messages are in the queue
msg = queue.receiveMessage().exceptions(False).execute()

# Message should be False as we got no message
pprint({"Message": msg})

print("Waiting for our message to become visible")
# Wait for our message to become visible
time.sleep(2)

pprint({'queue_status': queue.getQueueAttributes().execute()})
# Get our message
msg = queue.receiveMessage().execute()

# Message should now be there
pprint({"Message": msg})

# Delete Message
queue.deleteMessage(id=msg['id'])

pprint({'queue_status': queue.getQueueAttributes().execute()})
# delete our queue
queue.deleteQueue().execute()

# No action
queue.quit()

redismq控制器api用法

用法:rsmq.rqsm.redismq([选项])

  • 选项(所有选项都作为关键字选项提供):
    • redis连接参数:
      • 客户端-提供一个现有的、已配置的redis客户端 或
      • 主机-redis主机名(默认值:127.0.0.1
      • 端口-redis端口(默认值:6379
      • 选项-附加的redis客户端选项。默认值:
        • 编码UTF-8
        • 解码U响应
    • 控制器选项
      • ns-名称空间-所有redis键前面都有<;ns>;:。默认值:rsmq
      • 实时-如果设置为true,则启用实时选项。默认值:false
      • 异常-如果设置为true,则抛出所有命令的异常。默认值:true
    • 默认命令选项。其他任何内容都作为默认值传递给每个命令。实例:
      • qname-默认队列名称

控制器方法
  • 异常(真/假)-启用/禁用异常
  • setclient(client)-指定新的redis client对象
  • ns(命名空间)-设置新命名空间
  • quit()-断开与redis的连接。这主要是为了与其他版本兼容。做得不多

控制器命令
  • create queue()-创建新队列

    • 参数:
      • qname-(必需)队列的名称
      • vt-默认可见性超时(秒)。默认值:30
      • 延迟-默认延迟(插入时的可见性超时)。默认值:0
      • <代码>最大值ze-最大消息大小(1024-65535,默认值:65535)
      • quiet-如果设置为true并且禁用异常,则不生成错误日志条目
    • 返回
      • true如果创建了队列
  • deleteQueue()-删除现有队列

    • 参数:
      • qname-(必需)队列的名称
      • quiet-如果设置为true并且禁用异常,则不生成错误日志条目
    • 返回
      • 如果队列已删除
  • setQueueAttributes()-更新队列属性。如果未指定值,则不会更新该值。

    • 参数:
      • qname-(必需)队列的名称
      • vt-默认可见性超时(秒)。默认值:30
      • 延迟-默认延迟(插入时的可见性超时)。默认值:0
      • 最大大小-最大消息大小(1024-65535,默认值:65535)
      • quiet-如果设置为true并且禁用异常,则不生成错误日志条目
    • 返回
      • getQueueAttributes()调用的输出
  • getQueueAttributes()-获取队列属性和统计信息

    • 参数:
      • qname-(必需)队列的名称
      • quiet-如果设置为true并且禁用异常,则不生成错误日志条目
    • 返回包含以下字段的词典:
      • vt-默认可见性超时
      • 延迟-默认插入延迟
      • 最大大小-消息的最大大小
      • totalRecv-已使用的消息数。注意,每次检索邮件时,此值都会递增,因此,如果邮件未被删除并再次显示,则会在此处多次显示。
      • TotalSent-发送到队列的消息数。
      • 已创建-创建队列时的Unix时间戳(从epoch开始的秒数)
      • 已修改-上次更新队列的Unix时间戳(从epoch开始的秒数)
      • msgs-当前队列中的消息总数
      • hiddenmsgs-队列中不可见的消息数
  • listQueues()-列出此命名空间中的所有队列

    • 参数:
    • 返回
      • 此命名空间中的所有队列名称均为set()
  • 更改消息可见性()-更改消息可见性

    • 参数:
      • qname-(必需)队列的名称
      • id-(必需)消息id
      • quiet-如果设置为true并且禁用异常,则不生成错误日志条目
      • ????
    • 返回
        ????
  • sendMessage()-将消息发送到队列中

    • 参数:
      • qname-(必需)队列的名称
      • 消息-(必需)消息ID
      • 延迟-可选覆盖此消息的延迟(如果未指定,则使用队列的默认值)
      • quiet-如果设置为true并且禁用异常,则不生成错误日志条目
      • 编码如果设置为,则强制将消息编码为json字符串。如果为false,请尝试自动检测是否需要对消息进行编码
    • 返回
      • 已发送消息的消息ID
  • receiveMessage()-从队列接收消息并将其标记为不可见

    • 参数:
      • qname-(必需)队列的名称
      • vt-可选覆盖此消息的可见性超时(如果未指定,则使用队列的默认值)
      • quiet-如果设置为true并且禁用异常,则不生成错误日志条目
    • 返回以下字段的字典:
      • id-消息id
      • 消息-消息内容
      • rc-接收计数-接收此消息的次数
      • ts-最初发送消息的Unix时间戳
  • popMessage()-从队列接收消息并将其从队列中删除

    • 参数:
      • qname-(必需)队列的名称
      • quiet-如果设置为true并且禁用异常,则不生成错误日志条目
    • 返回以下字段的字典:
      • id-消息id
      • 消息-消息内容
      • rc-接收计数-接收此消息的次数
      • ts-最初发送消息的Unix时间戳
  • delete message()-从队列中删除消息

    • 参数:
      • qname-(必需)队列的名称
      • id-(必需)消息id
      • quiet-如果设置为true并且禁用异常,则不生成错误日志条目
    • 返回
      • 如果邮件已删除

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java您的SQL语法有错误;检查与您的MySQL服务器版本对应的手册,以了解使用接近“”的正确语法?其中userid   java httpPost方法   部署旧的Spring Boot应用程序版本时java自动回滚Liquibase迁移   java Gson,动态InstanceCreator   用Java在同一页上打印2个JTable   linux如何从Bash shell脚本优雅地关闭Java服务?   ls中的java错误(envir=envir,all.names=private):R中的'envir'参数无效   java Android线程无法注销SensorEventListener   java将Json对象转换为hashmap?   java如何找到从JVM输入的数组的模式?   java算法KMeans   带有mockito的java模拟私有字段未显示此类字段异常   安卓 java。木卫一。FileNotFoundException:/storage/simulated/0/Notes/fact50Result。txt:打开失败的eNONT(没有这样的文件或目录)   java如何在tomcat中向truststore添加多个CA   java在Installrapp中自动配置IOS应用程序。通用域名格式   java列索引超出范围:1,列数:0   java MPAndroidCharts解析日期时间Xaxis   java从数组中查找元素的最小绝对和