一个简单的python sqs实用程序包
awsPySqsListener的Python项目详细描述
这个包负责监听sqs所涉及的样板文件。 队列,以及将消息发送到队列。感谢@eligro91,这个包现在支持python 3.6!
安装
pip install awsPySqsListener
收听队列
使用侦听器非常简单-只需从
SqsListener类并实现handle_message()方法。
队列将在运行时创建,如果它还不存在。
您还可以指定一个错误队列来自动将任何错误推送到。
下面是一个基本代码示例:
标准侦听器
from sqs_listener import SqsListener class MyListener(SqsListener): def handle_message(self, body, attributes, messages_attributes): run_my_function(body['param1'], body['param2']) listener = MyListener('my-message-queue', error_queue='my-error-queue', region_name='us-east-1') listener.listen()
错误侦听器
from sqs_listener import SqsListener class MyErrorListener(SqsListener): def handle_message(self, body, attributes, messages_attributes): save_to_log(body['exception_type'], body['error_message'] error_listener = MyErrorListener('my-error-queue') error_listener.listen()
可用的kwargs选项如下:
- error_queue(str)-要推送错误的队列的名称。
- force_delete(boolean)-删除从队列接收的消息,无论处理程序函数是否成功。默认情况下,只有在handler函数无异常返回时,消息才会被删除
- interval(int)-两次轮询之间的秒数。默认设置为60
- visibility_timeout(str)-消息被读取后不可见的秒数(“在飞行中”)。在这个时间间隔之后,如果没有同时删除它,它将重新出现在队列中。默认设置为“600”(10分钟)
- 错误可见性超时(str)-与错误队列的上一个参数相同。仅当设置了{TT5} $参数且队列尚未存在时才适用。
作为守护进程运行
通常,在生产环境中,您需要监听带有守护进程的sqs队列。
这可以通过继承包的Daemon类并重写run()方法来轻松实现。
源根文件夹中的sample_daemon.py文件为实现这一点提供了一个清晰的示例。用这个例子,
可以使用命令python sample_daemon.py start将侦听器作为守护进程运行。同样,命令
python sample_daemon.py stop将停止进程。您很可能需要使用sudo运行启动脚本。
日志记录
侦听器和启动程序实例将其所有消息推送到一个名为“sqs\u listener”的logger实例。
为了查看消息,需要将记录器重定向到stdout或日志文件。
例如:
logger = logging.getLogger('sqs_listener') logger.setLevel(logging.INFO) sh = logging.StreamHandler(sys.stdout) sh.setLevel(logging.INFO) formatstr = '[%(asctime)s - %(name)s - %(levelname)s] %(message)s' formatter = logging.Formatter(formatstr) sh.setFormatter(formatter) logger.addHandler(sh)
或日志文件:
logger = logging.getLogger('sqs_listener') logger.setLevel(logging.INFO) sh = logging.FileHandler('mylog.log') sh.setLevel(logging.INFO) formatstr = '[%(asctime)s - %(name)s - %(levelname)s] %(message)s' formatter = logging.Formatter(formatstr) sh.setFormatter(formatter) logger.addHandler(sh)
发送消息
为了发送消息,用队列的名称实例化一个SqsLauncher。默认情况下,异常将
如果队列不存在,就进行提升,但如果参数为$,则可以自动创建它。
设置为true。在这种情况下,还可以通过
第三个参数。
实例化后,使用launch_message()方法发送消息。消息体应该是dict,
并且可以按照SQS docs中的说明指定额外的kwarg。
该方法返回来自sqs的响应。
启动程序示例
from sqs_launcher import SqsLauncher launcher = SqsLauncher('my-queue') response = launcher.launch_message({'param1': 'hello', 'param2': 'world'})
重要注意事项
- 此外,还必须设置环境变量AWS_ACCOUNT_ID。 到具有有效aws凭据的环境(通过环境变量或凭据文件)
- 对于主队列和错误队列,如果队列没有 存在(在指定区域中),它将在运行时创建。
- 错误队列只接收消息正文中的两个值:exception_type和error_message。两者都是str
- 如果侦听器执行的函数涉及到连接到数据库,则应在函数末尾显式关闭连接。否则,可能会出现如下错误:OperationalError(2006, 'MySQL server has gone away')
贡献
离开回购协议并提出请求。