一个完整的,基于gevent的,非tornado nsq客户端。

nsqs的Python项目详细描述


该项目正在积极开发中,文档正在演变为 单个零件。

这个项目封装了连接管理、心跳管理和 将传入消息(针对使用者)分派给处理程序。

功能

  • 全功能:
    • 快速压缩
    • 压缩放气
    • TLS压缩
    • 通过TLS进行客户端(“相互”)身份验证
  • 我们依赖于消费者定义的“分类”函数来确定 传入消息的处理程序的名称。这允许事件驱动 消费。这意味着最终用户的锅炉板要少一些。
  • RDY管理的复杂性是由图书馆自动管理的。 这些参数可以重新配置,但是nsqs强调简单性和 直觉,这样你就不必参与机械 想。
  • 标识参数可以直接指定,但许多参数是管理的 根据生产者/消费者的参数自动设置。
  • 邮件处理后在服务器上标记为“已完成” 除非我们被配置成不。
  • 对于消费者,您可以指定主题和频道对联的列表,以及 将连接到每个服务器并根据每个服务器进行订阅。 如果使用查找服务器,则会为每个 列表中的主题(如果没有查找服务器,则假定所有服务器 支持所有主题)。

实现消费者

进口和样板:

import logging
import json
import gevent

import nsq.consumer
import nsq.node_collection
import nsq.message_handler

_logger = logging.getLogger(__name__)

创建消息处理程序:

class _MessageHandler(nsq.message_handler.MessageHandler):
    def __init__(self, *args, **kwargs):
        super(_MessageHandler, self).__init__(*args, **kwargs)
        self.__processed = 0

    def message_received(self, connection, message):
        super(_MessageHandler, self).message_received(connection, message)

        try:
            self.__decoded = json.loads(message.body)
        except:
            _logger.info("Couldn't decode message. Finished: [%s]",
                         message.body)
            return

    def classify_message(self, message):
        return (self.__decoded['type'], self.__decoded)

    def handle_dummy(self, connection, message, context):
        self.__processed += 1

        if self.__processed % 1000 == 0:
            _logger.info("Processed (%d) messages.", self.__processed)

    def default_message_handler(self, message_class, connection, message,
                                classify_context):
        _logger.warning("Squashing unhandled message: [%s] [%s]",
                        message_class, message)

定义节点集合。我们在这里使用nsqlookupd服务器,但是我们可以 与nsqd服务器一起使用servernodes()一样简单:

lookup_node_prefixes = [
    'http://127.0.0.1:4161',
]

nc = nsq.node_collection.LookupNodes(lookup_node_prefixes)

创建使用者对象:

_TOPIC = 'test_topic'
_CHANNEL = 'test_channel'
_MAX_IN_FLIGHT = 500

c = nsq.consumer.Consumer(
        [(_TOPIC, _CHANNEL)],
        nc,
        _MAX_IN_FLIGHT,
        message_handler_cls=_MessageHandler)

启动消费者:

c.start()

循环。例如,只要我们至少连接到一个 服务器:

while c.is_alive:
    gevent.sleep(1)

实现生产者

进口和样板:

import logging
import json
import random

import nsq.producer
import nsq.node_collection
import nsq.message_handler

_logger = logging.getLogger(__name__)

定义节点集合。这是一个生产者,因此它只与nsqd一起工作。 节点:

server_nodes = [
    ('127.0.0.1', 4150),
]

nc = nsq.node_collection.ServerNodes(server_nodes)

创建producer对象:

_TOPIC = 'test_topic'

p = nsq.producer.Producer(_TOPIC, nc)

启动制作程序:

p.start()

发出消息:

for i in range(0, 100000, 10):
    if i % 50 == 0:
        _logger.info("(%d) messages published.", i)

    data = { 'type': 'dummy', 'data': random.random(), 'index': i }
    message = json.dumps(data)
    p.mpublish((message,) * 10)

停止制作:

p.stop()

回调

消费者和生产者都可以接受回调对象。

实例化producer的回调:

import nsq.connection_callbacks
cc = nsq.connection_callbacks.ConnectionCallbacks()

实例化consumer的回调:

import nsq.consumer
cc = nsq.consumer.ConsumerCallbacks()

然后,将对象作为 ccallbacks

以下回调方法可以为生产者或 消费者(同时确保调用原始实现):

  • 连接(连接)

    连接已建立。

  • 识别(连接)

    已处理此连接的标识响应。

  • 断开(连接)

    连接已断开。

  • 收到的消息(连接,消息)

    已收到消息。

consumer有一个附加回调:

  • rdy\u补充(连接、当前、原始)

    RDY需要更新。默认情况下,将重新提交原始RDY。 如果不需要,请重写此回调,不要调用原始回调。

脚注

  • 因为我们依赖gevent,而gevent不是 python3兼容,nsqs不兼容python3。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java生成随机浮点,包括两个边界   java三层体系结构风格是如何工作的?一些简单的例子   多线程可以使用线程。在Java中,在循环中使用sleep(),以便定期执行某些操作?   读取循环上的java HibernateMysql异常   java使用带有Apache Ivy的自定义存储库,未找到解析程序   filenotfoundexception在读取时出现问题。Java中的txt文件   嵌入式tomcat 8.0.21中的java Spring websocket   java为什么我需要创建一个类的引用,然后创建一个B类的对象   java Splashscreen动画在Mac OS中更新时闪烁   JavaSpring3。名为“zoneManagerDelegate”的x Bean必须是[com.ms.adsp.delegate.sapi.zoneManagerDelegate]类型,但实际上是[$Proxy20]类型   java SQLite:没有这样的专栏;不明错误   java将JTable定位到JFrame中JPanel中的(x,y)位置   java在导入组织方面面临挑战。知道。xchart*   xml读取Java中的SVG元素并跳过某些包含文本的元素   java Spring Redis问题:Redis缓存中的GetAllCacheNames不能与RedisCacheManager一起使用   java Vertex Hazelcast:集群问题   java如何编辑osgi托管服务实现使用的属性文件?   java Android活动并行启动?   java AWS Lambda用于将excel转储到数据库中