这个项目有一个发送者和一个接收者,发送者通过工人(接收者)队列上的rabbitmq发送命令,接收者使用os或python2.7执行命令。

cronio的Python项目详细描述


克罗尼奥

简介

This project has a sender and a receiver, the sender sends commands through RabbitMQ on the queue of a worker (receiver), the receiver executes them either with OS or Python2.7

Following a test I did, I discovered many issues maintaining the code, I started recode the whole thing because it was not an easy thing to understand, my plan is to finish it as soon as possible and be able to make as easy as possible to understand.

目标

  • [X]prototype-在操作系统或python中发送一些命令并执行它们,如果有日志或错误,则返回日志
  • [X]包装结构
  • [X]依赖命令,即依赖性:[1,2,3200,命令ID]
  • [X]具有一个发件人的多个工作进程,依赖于不同的工作进程来运行具有否定的任务。 即,在工人1上完成任务A,完成后在工人2上完成任务B。如果未完成,请对工作线程3执行任务C。 检查示例/sender_complex_multi_worker.py
  • []执行时间,即使用python crontab将是一件好事
  • []需要用Docker测试envs,它可以设置并从这个应用程序中读取。py

要求

  1. stomp python库
  2. sqlalchemy(使用sqlite)
  3. rabbitmq服务器(有关使用它提升容器的详细信息,请参阅myrabbitmq\文件夹。-欢迎使用!) 你可以使用我们的docker图片-默认用户名和密码是guest。 如果需要dockerfile,可以转到文件夹的存储库myrabbitmq。

安装所需的软件包(1,2)

pip install -r requirements.txt 

安装

PYPI

pip install cronio

示例

有关代码,请参见examples\directory

工人:

python worker1.py # this will start the worker process, see inline comments

发件人:

python sender_complex_multi_workers_example2.py # this has everything in it

发件人的自定义侦听器

根据您当前在监听员工消息方面的需要进行相应的修改。

WORKER_ID_1 = "worker_1"
CS_1 = CronioSender({
# To Enable Viewer Log, uncomment the below in worker and sender:
# 'CRONIO_SENDER_EXCHANGE_LOG_INFO': "cronio_log_info",
# 'CRONIO_SENDER_EXCHANGE_LOG_ERROR': "cronio_log_error",
'CRONIO_SENDER_AMQP_USERNAME': "sender1",
'CRONIO_SENDER_AMQP_PASSWORD': "somepass",
'CRONIO_SENDER_WORKER_QUEUE': "cronio_queue"+WORKER_ID_1,
'CRONIO_SENDER_AMQP_HOST': 'localhost',
'CRONIO_SENDER_AMQP_VHOST': '/',
'CRONIO_SENDER_AMQP_PORT': 61613,
'CRONIO_SENDER_AMQP_USE_SSL': False,
'CRONIO_SENDER_LOGGER_LEVEL':  logging.DEBUG, #logging.DEBUG
'CRONIO_SENDER_LOGGER_FORMATTER': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
'CRONIO_SENDER_RUNTIME_SECONDS': 5})

class CronioSenderListener1(stomp.ConnectionListener):
	def on_error(self, headers, message):
		pprint.pprint(headers)
		pprint.pprint(message)
		CS_1.logger_sender.debug('Cronio Sender Listener received an error "%s"' % message)
	def on_message(self, headers, message):
		# Use the below to your discretion on receiving new messages
		# CS_1.logger_sender.debug(' %s' % str(headers['message-id']))
		# CS_1.logger_sender.debug(' Log %s' % str(message))
		# CS_1.logger_sender.debug('ACK Log - Remove from Queue %s' % str(headers['message-id']))
		# MESSAGE IS IN JSON
		message_obj = json.loads(message)
		pprint.pprint(message_obj)
		# if headers['subscription'] == "api_log":
		# 	pprint.pprint(message_obj)
		# if headers['subscription'] == "api_log":
			# This here is where the magic happens
			# print "API LOG =================================="
			# pprint.pprint(message_obj)
			# print "API LOG ENDS ============================="
		# else:
			# a bunch of other messages
			# print "view_log - or error"

		# remove from queue
		CS_1.conn.ack(headers['message-id'],1)

CS_1.conn.disconnect()
CS_1.conn.remove_listener('default')
CS_1.cronio_sender_listener = CronioSenderListener1() 
CS_1.initConnectSenderSTOMP()
CS_1.conn.subscribe(destination=CS_1.CRONIO_SENDER_API_LOG, id="api_log1", ack='client')

工作队列

After version 1.1.0, the worker queues are modified in a more standardized way to enable the multiworker dependancy, if you want to do such a thing!: ie.

self.CRONIO_WORKER_ID = 'worker_1'
self.CRONIO_WORKER_PREFIX = '/queue/cronio/'
self.CRONIO_WORKER_QUEUE =  self.CRONIO_WORKER_PREFIX + self.CRONIO_WORKER_ID

Hence that, the CRONIO_WORKER_QUEUE param in class and settings needs to be avoided if you want to have the multiworker dependancy to work. Otherwise we will need to add namespaces for it. Which are going a bit off topic. Ensure that you set CRONIO_WORKER_ID and CRONIO_WORKER_PREFIX on each worker and have the same CRONIO_WORKER_PREFIX in all workers. Avoid using CRONIO_WORKER_ID in the format: worker1 and worker11 otherwise you might end up having difficulty setting permissions for specific workers.

相关性检查

每个工人(即工人1)都有一个特定的日志,该日志在一个工人工作(即工人2)完成时接收通知,这是对另一个工人(即工人1)的依赖。 即

queue_cronio_workers_dependency_worker_1

执行os命令并传递一个命令id(id)

Generate cmd_ids to use for each:

import pprint
cmd_ids = [str(uuid.uuid4()),str(uuid.uuid4()),str(uuid.uuid4()),str(uuid.uuid4()),str(uuid.uuid4()),str(uuid.uuid4())]
pprint.pprint(cmd_ids)

Execute a git command and get the result in the listener

# Use those on the following commands:

# clear database of worker
CS_1.sendCMD('cleardb',WORKER_ID_1,'operation',0)

# git clone a repo
CS_1.sendCMD("git clone https://gitlab.com/doctormo/python-crontab.git", WORKER_ID_1, "os", cmd_ids[1])

or just a simple listing

#execute ls command on the current folder
CS_1.sendCMD("ls", WORKER_ID_1, "os", cmd_ids[2])

Can send files if you want to execute those:

# Absolute Path only
PythonFile = "/opt/cronio/examples/test.py"
CmdFile = "/opt/cronio/examples/test.sh"
CS_1.sendPythonFile(PythonFile,WORKER_ID_1,1)
CS_1.sendCmdFile(CmdFile,WORKER_ID_1,2)


# Clear Database of its commands
CS_1.sendCMD('cleardb',WORKER_ID_1,'operation',cmd_ids[4])

Use workflow to run on the worker.

# Workflow Example - Set of commands related with each other.
commands = [ {"cmd": "ls", "worker_id": "worker_1", type": "os", "cmd_id": 1, "dependencies": None}, {"cmd": "mkdir test_1", "worker_id": "worker_1", type": "os", "cmd_id": 2, "dependencies": None}, {"cmd": "cd test_1", "worker_id": "worker_1", type": "os", "cmd_id": 3, "dependencies": [2]},{"cmd": "print \"hello cronio\"", "worker_id": "worker_1", type": "python", "cmd_id": 4,"dependencies" : None}]
CS_1.sendWorkflow(commands)

Just add the python commands and add \n after each line

CS_1.sendCMD("iter2=[2,3,4,5,6,7]\nfor item2 in iter2:\n\tprint item2",WORKER_ID_1,"python",2)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Docker&SeleniumJava:无法在Docker容器上运行的chrome浏览器中上载图像/文件   在python中运行java命令   Java垃圾收集器异常行为   java java是否根据底层操作系统执行字节码级优化?   java是否可以休眠自定义查询返回映射而不是列表?   java Spring引导RabbitMQ接收器Jackson反序列化到POJO   apache flex在ActionScript3中创建对象相等“HashMap”作为java HashMap   java如何在Eclipse集成中切换JProfiler启动器   缓存JSP页面结果的java最佳实践?   java集成jaxb绑定文件,使用CXF生成基于WSDL的客户端   java为什么在上传操作结束之前,客户端没有检测到HttpServletResponse的PrintWriter内容?   java在接口内创建类和在类内创建接口有什么用   java向文件写入错误Android Studio   java合并多个RealmList并对结果列表排序?   谷歌API视觉java。lang.NoSuchMethodError   java如何使用逗号分别存储每个值,然后将它们存储到单独的数组中?