rabbitmq kafka redis上构建任务队列的统一框架
cocotask的Python项目详细描述
#在rabbitmq、kafka或redis上构建任务队列!很简单!容易的!快点!!!!
**为什么要创建这个框架?**
*大多数使用rabbitmq/kafka/redis的任务队列都在做同样的事情,但是没有统一的包装器。如果您只需要一个任务队列来分发作业,那么为不同的mq编写代码是浪费时间
*无需担心使用rabbitmq、kafka或redis。选一个就走!如果你想切换到一个不同的底层系统,只需几行配置更改就可以了
*隐藏连接/订阅/发布等所有细节。你只需对任何kafka/rabbitmq/redis使用相同的api!
*尽量减少团队成员手工编写用于Exchange/队列处理的代码的工作量
*团队成员应关注如何处理消息
*缺少良好的管理工具/lib来处理创建多个消费者(尽管很简单)
*最初的想法是使用芹菜,但它不支持Kafka和Windows平台。它还与python紧密耦合。
<;hr>;
在本地计算机上安装rabbitmq或kafka(docker或pure rabbitmq)。
-rabbitmq:https://www.rabbitmq.com/
-kafka:https://kafka.apache.org/quickstart(对于kafka,必须手动创建名为“test\u topic\u 1”的主题才能运行测试。为了尝试多个使用者,您需要将分区设置为2或以上,而不是1)
-redis:https://redis.io/download
2。确保它在上面运行python 3.5
3。pip安装pika kafka python redis jsmin
4。pip安装cocotask
5。现在您的机器上已经安装了cocotask。要测试,可以转到./test文件夹
-run:`python producer_test.py`(这将发布一个字符串。代码非常简单)
-在另一个窗口的./test下,运行:**`cocotask./config.json userworkers test worker 4`**
您将看到worker启动并处理我们刚刚发布的1条消息。
6。所以关键是我们如何使用cocotask命令工具。参数为:
-config-path:path to config file
-module-name:module of your customized worker class
-class-name:your own worker class的类名(在上面的示例中,它是userworkers.testworker,因此模块名是userworkers,类名是testworker)
-number of worker s:customer workers
-logging level(可选):python日志级别信息/调试/etc.
-module path(可选):查找您定义的模块的相对路径。默认值为“.”
7。开发自己的工人阶级,并尝试从Cocotask Import CocobaseWorker创建自己的工人阶级:
def process(self,正文:
打印(正文)
```
检查userworkers/test_worker.py以供参考。
**从cocotask import cococoproducermanager以pm身份发布消息**
```
“r”)作为f:
config=json.load(f)
producer=pm.create_instance(config)
producer.send('12345678')
producer.close()
`````
**在test/config.json中从rabbitmq切换到kafka或反向**
```
{{
“mq类型”:“rmq”,#如果您的底层mq是kafka
“rmq”:{
…
},
“kafka”:{
…
},
“redis”:{
…
}
`````
**我们确实支持kafka的sasl明文和redis中的简单身份验证,如配置文件的注释所示。查看他们的网站以了解如何设置身份验证**
只要包含必需的字段,就可以将自己的dictionary对象构建为配置。简单明了。
**为什么要创建这个框架?**
*大多数使用rabbitmq/kafka/redis的任务队列都在做同样的事情,但是没有统一的包装器。如果您只需要一个任务队列来分发作业,那么为不同的mq编写代码是浪费时间
*无需担心使用rabbitmq、kafka或redis。选一个就走!如果你想切换到一个不同的底层系统,只需几行配置更改就可以了
*隐藏连接/订阅/发布等所有细节。你只需对任何kafka/rabbitmq/redis使用相同的api!
*尽量减少团队成员手工编写用于Exchange/队列处理的代码的工作量
*团队成员应关注如何处理消息
*缺少良好的管理工具/lib来处理创建多个消费者(尽管很简单)
*最初的想法是使用芹菜,但它不支持Kafka和Windows平台。它还与python紧密耦合。
<;hr>;
在本地计算机上安装rabbitmq或kafka(docker或pure rabbitmq)。
-rabbitmq:https://www.rabbitmq.com/
-kafka:https://kafka.apache.org/quickstart(对于kafka,必须手动创建名为“test\u topic\u 1”的主题才能运行测试。为了尝试多个使用者,您需要将分区设置为2或以上,而不是1)
-redis:https://redis.io/download
2。确保它在上面运行python 3.5
3。pip安装pika kafka python redis jsmin
4。pip安装cocotask
5。现在您的机器上已经安装了cocotask。要测试,可以转到./test文件夹
-run:`python producer_test.py`(这将发布一个字符串。代码非常简单)
-在另一个窗口的./test下,运行:**`cocotask./config.json userworkers test worker 4`**
您将看到worker启动并处理我们刚刚发布的1条消息。
6。所以关键是我们如何使用cocotask命令工具。参数为:
-config-path:path to config file
-module-name:module of your customized worker class
-class-name:your own worker class的类名(在上面的示例中,它是userworkers.testworker,因此模块名是userworkers,类名是testworker)
-number of worker s:customer workers
-logging level(可选):python日志级别信息/调试/etc.
-module path(可选):查找您定义的模块的相对路径。默认值为“.”
7。开发自己的工人阶级,并尝试从Cocotask Import CocobaseWorker创建自己的工人阶级:
def process(self,正文:
打印(正文)
```
检查userworkers/test_worker.py以供参考。
**从cocotask import cococoproducermanager以pm身份发布消息**
```
“r”)作为f:
config=json.load(f)
producer=pm.create_instance(config)
producer.close()
`````
**在test/config.json中从rabbitmq切换到kafka或反向**
```
{{
“mq类型”:“rmq”,#如果您的底层mq是kafka
“rmq”:{
…
},
“kafka”:{
…
},
“redis”:{
…
}
`````
**我们确实支持kafka的sasl明文和redis中的简单身份验证,如配置文件的注释所示。查看他们的网站以了解如何设置身份验证**
只要包含必需的字段,就可以将自己的dictionary对象构建为配置。简单明了。