一个完整的,基于gevent的,非tornado nsq客户端。

nsqs的Python项目详细描述


该项目正在积极开发中,文档正在演变为 单个零件。

这个项目封装了连接管理、心跳管理和 将传入消息(针对使用者)分派给处理程序。

功能

  • 全功能:
    • 快速压缩
    • 压缩放气
    • TLS压缩
    • 通过TLS进行客户端(“相互”)身份验证
  • 我们依赖于消费者定义的“分类”函数来确定 传入消息的处理程序的名称。这允许事件驱动 消费。这意味着最终用户的锅炉板要少一些。
  • RDY管理的复杂性是由图书馆自动管理的。 这些参数可以重新配置,但是nsqs强调简单性和 直觉,这样你就不必参与机械 想。
  • 标识参数可以直接指定,但许多参数是管理的 根据生产者/消费者的参数自动设置。
  • 邮件处理后在服务器上标记为“已完成” 除非我们被配置成不。
  • 对于消费者,您可以指定主题和频道对联的列表,以及 将连接到每个服务器并根据每个服务器进行订阅。 如果使用查找服务器,则会为每个 列表中的主题(如果没有查找服务器,则假定所有服务器 支持所有主题)。

实现消费者

进口和样板:

import logging
import json
import gevent

import nsq.consumer
import nsq.node_collection
import nsq.message_handler

_logger = logging.getLogger(__name__)

创建消息处理程序:

class _MessageHandler(nsq.message_handler.MessageHandler):
    def __init__(self, *args, **kwargs):
        super(_MessageHandler, self).__init__(*args, **kwargs)
        self.__processed = 0

    def message_received(self, connection, message):
        super(_MessageHandler, self).message_received(connection, message)

        try:
            self.__decoded = json.loads(message.body)
        except:
            _logger.info("Couldn't decode message. Finished: [%s]",
                         message.body)
            return

    def classify_message(self, message):
        return (self.__decoded['type'], self.__decoded)

    def handle_dummy(self, connection, message, context):
        self.__processed += 1

        if self.__processed % 1000 == 0:
            _logger.info("Processed (%d) messages.", self.__processed)

    def default_message_handler(self, message_class, connection, message,
                                classify_context):
        _logger.warning("Squashing unhandled message: [%s] [%s]",
                        message_class, message)

定义节点集合。我们在这里使用nsqlookupd服务器,但是我们可以 与nsqd服务器一起使用servernodes()一样简单:

lookup_node_prefixes = [
    'http://127.0.0.1:4161',
]

nc = nsq.node_collection.LookupNodes(lookup_node_prefixes)

创建使用者对象:

_TOPIC = 'test_topic'
_CHANNEL = 'test_channel'
_MAX_IN_FLIGHT = 500

c = nsq.consumer.Consumer(
        [(_TOPIC, _CHANNEL)],
        nc,
        _MAX_IN_FLIGHT,
        message_handler_cls=_MessageHandler)

启动消费者:

c.start()

循环。例如,只要我们至少连接到一个 服务器:

while c.is_alive:
    gevent.sleep(1)

实现生产者

进口和样板:

import logging
import json
import random

import nsq.producer
import nsq.node_collection
import nsq.message_handler

_logger = logging.getLogger(__name__)

定义节点集合。这是一个生产者,因此它只与nsqd一起工作。 节点:

server_nodes = [
    ('127.0.0.1', 4150),
]

nc = nsq.node_collection.ServerNodes(server_nodes)

创建producer对象:

_TOPIC = 'test_topic'

p = nsq.producer.Producer(_TOPIC, nc)

启动制作程序:

p.start()

发出消息:

for i in range(0, 100000, 10):
    if i % 50 == 0:
        _logger.info("(%d) messages published.", i)

    data = { 'type': 'dummy', 'data': random.random(), 'index': i }
    message = json.dumps(data)
    p.mpublish((message,) * 10)

停止制作:

p.stop()

回调

消费者和生产者都可以接受回调对象。

实例化producer的回调:

import nsq.connection_callbacks
cc = nsq.connection_callbacks.ConnectionCallbacks()

实例化consumer的回调:

import nsq.consumer
cc = nsq.consumer.ConsumerCallbacks()

然后,将对象作为 ccallbacks

以下回调方法可以为生产者或 消费者(同时确保调用原始实现):

  • 连接(连接)

    连接已建立。

  • 识别(连接)

    已处理此连接的标识响应。

  • 断开(连接)

    连接已断开。

  • 收到的消息(连接,消息)

    已收到消息。

consumer有一个附加回调:

  • rdy\u补充(连接、当前、原始)

    RDY需要更新。默认情况下,将重新提交原始RDY。 如果不需要,请重写此回调,不要调用原始回调。

脚注

  • 因为我们依赖gevent,而gevent不是 python3兼容,nsqs不兼容python3。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
如何使用java在linux上编写系统日志   如何在同一个现有变量上多次更改变量的值?(爪哇)   易失性字符串Java   java需要帮助通过PreparedStatement编写适当的搜索查询   JavaMaven项目是否获得其他Maven项目的版本?   java如何在Eclipse中使用Drool应用程序抑制信息和警告调试信息   Java中FileReader和FileInputStream的区别是什么?   java如何为此编写HQL查询?   java方法根本不返回任何内容   VLCJ通过单个java程序控制多个音频文件   java为什么这个println命令不开始一个新行?   java如何创建自己的文件扩展名。odt或。医生?   声明字符串后,java在条件语句中设置int值   通过k8s作业文件将cmd参数传递给docker容器中的java应用程序