Threads that poll SQS and add messages to a Python queue for processing crash

Asked 2025-04-19 04:04

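I have multithreaded code in which 3 threads fetch data from SQS and put it onto a Python queue. Another 5 threads take messages off that Python queue, process them, and send them to a backend system.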

Here is the code:

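import threading
import time
import Queue  # Python 2 module name (it is "queue" in Python 3)

from urllib3 import PoolManager

# sqs_queue (a boto SQS queue object) and backend_endpoint are defined elsewhere.
python_queue = Queue.Queue()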

class GetDataFromSQS(threading.Thread):
    """Polls SQS and moves each message onto the shared Python queue."""
    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue

    def run(self):
        while True:
            time.sleep(0.5)  # sleep briefly before polling again
            try:
                msgs = sqs_queue.get_messages(10)
                if not msgs:
                    print "SQS is empty now!"
                    continue
                for msg in msgs:
                    # place each message from SQS onto the Python queue for processing
                    self.python_queue.put(msg)
                    print "Adding a new message to Queue. Queue size is now %d" % self.python_queue.qsize()
                    # delete from SQS
                    sqs_queue.delete_message(msg)
            except Exception as e:
                # format with %s: "..." + e raises TypeError inside this handler
                print "Exception in GetDataFromSQS :: %s" % e


class ProcessSQSMsgs(threading.Thread):
    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue
        self.pool_manager = PoolManager(num_pools=6)

    def run(self):
        while True:
            # block until a message is available on the Python queue
            python_queue_msg = self.python_queue.get()
            try:
                processMsgAndSendToBackend(python_queue_msg, self.pool_manager)
            except Exception as e:
                print "Error parsing :: %s" % e
            finally:
                self.python_queue.task_done()

def processMsgAndSendToBackend(msg, pool_manager):
    if msg != "":
        ###### All the code related to processing the msg
        for individualValue in processedMsg:
            try:
                response = pool_manager.urlopen('POST', backend_endpoint, body=individualValue)
                if response is None:
                    print "Error"
                else:
                    response.release_conn()
            except Exception as e:
                print "Exception! Post data to backend: %s" % e


def startMyPython():
    #spawn a pool of threads, and pass them queue instance
    for i in range(3):
        sqsThread = GetDataFromSQS(python_queue)
        sqsThread.start()

    for j in range(5):
        parseThread = ProcessSQSMsgs(python_queue)
        #parseThread.setDaemon(True)
        parseThread.start()

    #wait on the queue until everything has been processed
    python_queue.join()
    # python_queue.close() -- should i do this?

startMyPython()
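The same producer/consumer arrangement (producer threads filling a shared queue, five consumer threads draining it) can be reduced to a small offline sketch, which is handy for checking the threading logic without SQS or a backend. It is a Python 3 sketch under stated assumptions: the in-memory `batches` stand in for SQS results, `.upper()` stands in for `processMsgAndSendToBackend`, and the `STOP` sentinel is a hypothetical addition so the demo can shut down cleanly, which the original threads never do.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel telling a consumer to exit


def producer(source, work_q):
    """Stands in for GetDataFromSQS: move each fetched message onto the queue."""
    for msg in source:
        work_q.put(msg)


def consumer(work_q, results, lock):
    """Stands in for ProcessSQSMsgs: take messages off the queue and process them."""
    while True:
        msg = work_q.get()
        try:
            if msg is STOP:
                return
            with lock:
                results.append(msg.upper())  # placeholder processing step
        finally:
            work_q.task_done()  # runs for real messages and for the sentinel


def run(batches, num_consumers=5):
    work_q = queue.Queue()
    results, lock = [], threading.Lock()
    producers = [threading.Thread(target=producer, args=(b, work_q)) for b in batches]
    consumers = [threading.Thread(target=consumer, args=(work_q, results, lock))
                 for _ in range(num_consumers)]
    for t in producers + consumers:
        t.start()
    for t in producers:
        t.join()          # every message is now on the queue
    for _ in consumers:
        work_q.put(STOP)  # one sentinel per consumer, queued after all messages
    work_q.join()         # returns once every put() has a matching task_done()
    for t in consumers:
        t.join()
    return sorted(results)


print(run([["a", "b"], ["c"], ["d", "e"]]))  # ['A', 'B', 'C', 'D', 'E']
```

Because every `get()` is paired with `task_done()` in a `finally` block, `work_q.join()` returns only after all messages and sentinels have been handled.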

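Problem: every few days, 3 of the Python worker threads stop running at random (I monitor them with top -p -H). If I kill the process and restart the script, everything works again. I suspect the threads that disappear are the 3 GetDataFromSQS threads: once they stop, the other 5 threads are still alive but sit idle permanently, because nothing arrives on the Python queue. I'm new to Python and not sure what I'm doing wrong; I built the queue logic and threads by following this tutorial: http://www.ibm.com/developerworks/aix/library/au-threadingpython/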

Thanks in advance for your help. I hope I've explained the problem clearly.

1 Answer

The threads were getting stuck because of a problem connecting to the SQS queue. I use IAM for credentials and the boto package to connect to SQS.

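The root cause was that boto occasionally failed to read the AWS credentials (with IAM roles, boto fetches them from the EC2 instance metadata service).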
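Since `run()` catches `Exception`, a credential failure by itself should only produce a log line; it can still end the thread if the `except` handler itself raises. For example, building the message with `"..." + e`, where `e` is an exception object, raises `TypeError` inside the handler, and that second exception escapes `run()`. The Python 3 sketch below assumes a hypothetical `failing_fetch()` standing in for a `get_messages` call that cannot load credentials; it shows a poller written with `+ e` dying on the first error, while one that formats with `%s` keeps going.

```python
import threading
import time

escaped = []  # names of exceptions that escaped a thread's run()


def record_escape(args):
    # threading.excepthook (Python 3.8+) is called when a thread's run() raises
    escaped.append(args.exc_type.__name__)


threading.excepthook = record_escape


def failing_fetch():
    """Hypothetical stand-in for an SQS call that cannot load credentials."""
    raise IOError("unable to load AWS credentials")


def buggy_poller(log):
    while True:
        try:
            failing_fetch()
        except Exception as e:
            # str + exception raises TypeError here, inside the handler,
            # so nothing catches it and the loop (and the thread) ends
            log.append("Exception in poller :: " + e)


def safe_poller(log, attempts):
    for _ in range(attempts):  # bounded only so the demo terminates
        try:
            failing_fetch()
        except Exception as e:
            # %s formatting works for any exception object
            log.append("Exception in poller :: %s" % e)
        time.sleep(0.01)


buggy_log, safe_log = [], []
t1 = threading.Thread(target=buggy_poller, args=(buggy_log,))
t2 = threading.Thread(target=safe_poller, args=(safe_log, 3))
t1.start()
t2.start()
t1.join()
t2.join()

print(escaped)        # ['TypeError']
print(len(safe_log))  # 3
```

In Python 2 the same concatenation also raises `TypeError`; the thread then prints an "Exception in thread ..." traceback to stderr and exits.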

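The fix was to change the boto configuration (for example in ~/.boto or /etc/boto.cfg) to increase the number of attempts boto makes when fetching credentials from AWS: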

[Boto]
metadata_service_num_attempts = 5

( https://groups.google.com/forum/#!topic/boto-users/1yX24WG3g1E )
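`metadata_service_num_attempts` makes boto retry the credential lookup several times before giving up. The general pattern, a bounded retry around a call that fails transiently, can be sketched as follows; this is not boto's implementation, and `call_with_attempts` and `FlakyMetadata` are hypothetical names used only for illustration.

```python
import time


def call_with_attempts(fn, num_attempts, delay=0.0):
    """Call fn() up to num_attempts times, re-raising the last error if every attempt fails."""
    if num_attempts < 1:
        raise ValueError("num_attempts must be at least 1")
    for attempt in range(1, num_attempts + 1):
        try:
            return fn()
        except IOError:  # assumption: I/O errors are treated as transient
            if attempt == num_attempts:
                raise  # out of attempts: let the caller see the failure
            time.sleep(delay)


class FlakyMetadata:
    """Hypothetical stand-in for a metadata endpoint that fails a few times, then works."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def fetch(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise IOError("metadata service timed out")
        return {"AccessKeyId": "EXAMPLE"}


# A single attempt gives up on the first transient error.
m1 = FlakyMetadata(failures=2)
try:
    call_with_attempts(m1.fetch, num_attempts=1)
except IOError as e:
    print("gave up after", m1.calls, "call:", e)

# Five attempts ride out the two failures.
m5 = FlakyMetadata(failures=2)
print(call_with_attempts(m5.fetch, num_attempts=5), "after", m5.calls, "calls")
```

Retrying only helps with transient failures; if every attempt fails, the error still reaches the caller, so the polling loop's own exception handling still matters.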
