rabbitmq kafka redis上构建任务队列的统一框架

cocotask的Python项目详细描述


#在rabbitmq、kafka或redis上构建任务队列!很简单!容易的!快点!!!!

**为什么要创建这个框架?**
*大多数使用rabbitmq/kafka/redis的任务队列都在做同样的事情,但是没有统一的包装器。如果您只需要一个任务队列来分发作业,那么为不同的mq编写代码是浪费时间
*无需担心使用rabbitmq、kafka或redis。选一个就走!如果你想切换到一个不同的底层系统,只需几行配置更改就可以了
*隐藏连接/订阅/发布等所有细节。你只需对任何kafka/rabbitmq/redis使用相同的api!
*尽量减少团队成员手工编写用于Exchange/队列处理的代码的工作量
*团队成员应关注如何处理消息
*缺少良好的管理工具/lib来处理创建多个消费者(尽管很简单)
*最初的想法是使用芹菜,但它不支持Kafka和Windows平台。它还与python紧密耦合。
<;hr>;

在本地计算机上安装rabbitmq或kafka(docker或pure rabbitmq)。
-rabbitmq:https://www.rabbitmq.com/
-kafka:https://kafka.apache.org/quickstart(对于kafka,必须手动创建名为“test\u topic\u 1”的主题才能运行测试。为了尝试多个使用者,您需要将分区设置为2或以上,而不是1)
-redis:https://redis.io/download

2。确保它在上面运行python 3.5


3。pip安装pika kafka python redis jsmin

4。pip安装cocotask

5。现在您的机器上已经安装了cocotask。要测试,可以转到./test文件夹
-run:`python producer_test.py`(这将发布一个字符串。代码非常简单)
-在另一个窗口的./test下,运行:**`cocotask./config.json userworkers test worker 4`**
您将看到worker启动并处理我们刚刚发布的1条消息。

6。所以关键是我们如何使用cocotask命令工具。参数为:
-config-path:path to config file
-module-name:module of your customized worker class
-class-name:your own worker class的类名(在上面的示例中,它是userworkers.testworker,因此模块名是userworkers,类名是testworker)
-number of worker s:customer workers
-logging level(可选):python日志级别信息/调试/etc.
-module path(可选):查找您定义的模块的相对路径。默认值为“.”

7。开发自己的工人阶级,并尝试从Cocotask Import CocobaseWorker创建自己的工人阶级:





def process(self,正文:
打印(正文)
```
检查userworkers/test_worker.py以供参考。

**从cocotask import cococoproducermanager以pm身份发布消息**
```
“r”)作为f:
config=json.load(f)


producer=pm.create_instance(config)

producer.send('12345678')
producer.close()
`````
**在test/config.json中从rabbitmq切换到kafka或反向**
```
{{
“mq类型”:“rmq”,#如果您的底层mq是kafka

“rmq”:{

},

“kafka”:{

},

“redis”:{

}

`````
**我们确实支持kafka的sasl明文和redis中的简单身份验证,如配置文件的注释所示。查看他们的网站以了解如何设置身份验证**

只要包含必需的字段,就可以将自己的dictionary对象构建为配置。简单明了。



欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
具有x86javapath的x64机器上x86java上的java JNI未满足链接错误   java将Pixmap的一部分上传到GPU   图像Java位图RLE8格式   java Android studio谷歌广告崩溃应用程序   java如何创建包含未知数量对象的变量?   Java计算给定int数组的所有可能组合   java JDBC classnotfound异常   httpclient中的java将HttpEntity转换为字符串的最优雅/正确的方法是什么?   如何从Java程序运行nano?   java在安卓中调用自定义类/方法   调用方法和JOptionPane后,允许代码继续执行所需的java计时器或其他想法   关于侦听器的向量Java并发问题   线程池执行器Java线程池   java配置DTO上的Swagger javax验证约束   Java中用于按钮功能的swing操作命令   ServletOutputStream中的java设置状态代码   java打印输入数组的平均值