一个简单的python sqs实用程序包

awsPySqsListener的Python项目详细描述


这个包负责监听sqs所涉及的样板文件。 队列,以及将消息发送到队列。感谢@eligro91,这个包现在支持python 3.6!

安装

pip install awsPySqsListener

收听队列

使用侦听器非常简单-只需从 SqsListener类并实现handle_message()方法。 队列将在运行时创建,如果它还不存在。 您还可以指定一个错误队列来自动将任何错误推送到。

下面是一个基本代码示例:

标准侦听器

from sqs_listener import SqsListener

class MyListener(SqsListener):
    def handle_message(self, body, attributes, messages_attributes):
        run_my_function(body['param1'], body['param2'])

listener = MyListener('my-message-queue', error_queue='my-error-queue', region_name='us-east-1')
listener.listen()

错误侦听器

from sqs_listener import SqsListener
class MyErrorListener(SqsListener):
    def handle_message(self, body, attributes, messages_attributes):
        save_to_log(body['exception_type'], body['error_message']

error_listener = MyErrorListener('my-error-queue')
error_listener.listen()
可用的kwargs选项如下:
  • error_queue(str)-要推送错误的队列的名称。
  • force_delete(boolean)-删除从队列接收的消息,无论处理程序函数是否成功。默认情况下,只有在handler函数无异常返回时,消息才会被删除
  • interval(int)-两次轮询之间的秒数。默认设置为60
  • visibility_timeout(str)-消息被读取后不可见的秒数(“在飞行中”)。在这个时间间隔之后,如果没有同时删除它,它将重新出现在队列中。默认设置为“600”(10分钟)
  • 错误可见性超时(str)-与错误队列的上一个参数相同。仅当设置了{TT5} $参数且队列尚未存在时才适用。

作为守护进程运行

通常,在生产环境中,您需要监听带有守护进程的sqs队列。 这可以通过继承包的Daemon类并重写run()方法来轻松实现。

源根文件夹中的sample_daemon.py文件为实现这一点提供了一个清晰的示例。用这个例子, 可以使用命令python sample_daemon.py start将侦听器作为守护进程运行。同样,命令 python sample_daemon.py stop将停止进程。您很可能需要使用sudo运行启动脚本。

日志记录

侦听器和启动程序实例将其所有消息推送到一个名为“sqs\u listener”的logger实例。 为了查看消息,需要将记录器重定向到stdout或日志文件。

例如:
logger = logging.getLogger('sqs_listener')
logger.setLevel(logging.INFO)

sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.INFO)

formatstr = '[%(asctime)s - %(name)s - %(levelname)s]  %(message)s'
formatter = logging.Formatter(formatstr)

sh.setFormatter(formatter)
logger.addHandler(sh)

或日志文件:
logger = logging.getLogger('sqs_listener')
logger.setLevel(logging.INFO)

sh = logging.FileHandler('mylog.log')
sh.setLevel(logging.INFO)

formatstr = '[%(asctime)s - %(name)s - %(levelname)s]  %(message)s'
formatter = logging.Formatter(formatstr)

sh.setFormatter(formatter)
logger.addHandler(sh)

发送消息

为了发送消息,用队列的名称实例化一个SqsLauncher。默认情况下,异常将 如果队列不存在,就进行提升,但如果参数为$,则可以自动创建它。 设置为true。在这种情况下,还可以通过 第三个参数。

实例化后,使用launch_message()方法发送消息。消息体应该是dict, 并且可以按照SQS docs中的说明指定额外的kwarg。 该方法返回来自sqs的响应。

启动程序示例

from sqs_launcher import SqsLauncher

launcher = SqsLauncher('my-queue')
response = launcher.launch_message({'param1': 'hello', 'param2': 'world'})

重要注意事项

  • 此外,还必须设置环境变量AWS_ACCOUNT_ID。 到具有有效aws凭据的环境(通过环境变量或凭据文件)
  • 对于主队列和错误队列,如果队列没有 存在(在指定区域中),它将在运行时创建。
  • 错误队列只接收消息正文中的两个值:exception_typeerror_message。两者都是str
  • 如果侦听器执行的函数涉及到连接到数据库,则应在函数末尾显式关闭连接。否则,可能会出现如下错误:OperationalError(2006, 'MySQL server has gone away')

贡献

离开回购协议并提出请求。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
ArrayList中实体对象上的JAVA泛型   带Redis的爪哇芹菜vs单用Redis   java在设备面向横向时隐藏标题栏/通知栏   java JXTreeTable:如何使用ComponentProvider为一列设置渲染器   java创建异常的成本与记录异常的成本相比   java在方法参数中使用setter传递新对象   java在一个类中的方法与另一个类中的方法交互时遇到问题   java如何迭代2个大小相等的ArrayList   Java getDesktop()。open在Windows中工作,但在Mac中不工作   从tomcat切换到jetty后的java“无法启动嵌入式容器”,Spring引导   java如何使用void方法   java如何在解组时在JAXB的ValidationEventHandler中获取节点值?   如何使用Akka Java API创建不响应的TCP接收器   JavaScriptjQuery在java中将记录上传到数据库时的进度条   如何在重新绘制时在java小程序中显示上一个图像