Threads that poll SQS and add messages to a Python queue for processing die
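I have a multi-threaded piece of code in which 3 threads fetch data from SQS and put it on a Python queue. Another 5 threads take messages off that Python queue, process them, and send them to a backend system.
Here is the code: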
import Queue
import threading
import time

from urllib3 import PoolManager  # assumed source of PoolManager

# sqs_queue (a boto SQS queue) and backend_endpoint are defined elsewhere
python_queue = Queue.Queue()


class GetDataFromSQS(threading.Thread):
    """Polls SQS and puts each message on the Python queue."""

    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue

    def run(self):
        while True:
            time.sleep(0.5)  # pause briefly before querying again
            try:
                msgs = sqs_queue.get_messages(10)
                if not msgs:
                    print "sqs is empty now"
                    continue
                for msg in msgs:
                    # place each message from sqs into the python queue for processing
                    self.python_queue.put(msg)
                    print "Adding a new message to Queue. Queue size is now %d" % self.python_queue.qsize()
                    # delete from sqs
                    sqs_queue.delete_message(msg)
            except Exception as e:
                # format with %s: "..." + e would raise TypeError and end the thread
                print "Exception in GetDataFromSQS :: %s" % e

class ProcessSQSMsgs(threading.Thread):
    """Takes messages off the Python queue and sends them to the backend."""

    def __init__(self, python_queue):
        threading.Thread.__init__(self)
        self.python_queue = python_queue
        self.pool_manager = PoolManager(num_pools=6)

    def run(self):
        while True:
            # grab the next message to be parsed from the python queue
            python_queue_msg = self.python_queue.get()
            try:
                processMsgAndSendToBackend(python_queue_msg, self.pool_manager)
            except Exception as e:
                print "Error parsing:: %s" % e
            finally:
                self.python_queue.task_done()

def processMsgAndSendToBackend(msg, pool_manager):
    if msg != "":
        ###### All the code related to processing the msg
        for individualValue in processedMsg:
            try:
                response = pool_manager.urlopen('POST', backend_endpoint, body=individualValue)
                if response is None:
                    print "Error"
                else:
                    response.release_conn()
            except Exception as e:
                print "Exception! Post data to backend: %s" % e

def startMyPython():
    # spawn a pool of threads, and pass them the queue instance
    for i in range(3):
        sqsThread = GetDataFromSQS(python_queue)
        sqsThread.start()
    for j in range(5):
        parseThread = ProcessSQSMsgs(python_queue)
        #parseThread.setDaemon(True)
        parseThread.start()
    # wait on the queue until everything has been processed
    python_queue.join()
    # python_queue.close() -- should i do this?


startMyPython()
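Problem: every few days, 3 of the Python worker threads randomly stop running (I monitor them with top -p -H), and things improve if I kill the process and restart the script. I suspect the threads that disappear are the 3 GetDataFromSQS threads. Because the GetDataFromSQS threads have stopped, the other 5 threads are still running but always sleeping, since there is no data in the Python queue. I'm not sure what I'm doing wrong, as I'm quite new to Python and followed this tutorial to create the queue logic and threads - http://www.ibm.com/developerworks/aix/library/au-threadingpython/
Thanks in advance for your help. I hope I have explained my problem clearly.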
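One general way a polling thread can disappear without any useful output is an exception raised inside its own except block, for example when a log line concatenates a string with the exception object. That raises a TypeError, which escapes the loop and ends the thread. The following is only a minimal sketch of that failure mode using the standard library; flaky_fetch, fragile_log, safe_log, run_poller and completed_rounds are hypothetical names for illustration, not part of the code in the question.

```python
import threading


def flaky_fetch():
    # Hypothetical stand-in for a poll that fails, e.g. on a credentials error.
    raise RuntimeError("could not fetch messages")


def fragile_log(exc):
    # str + exception object raises TypeError.
    return "Exception in poller :: " + exc


def safe_log(exc):
    # %s formatting converts the exception to text safely.
    return "Exception in poller :: %s" % exc


def run_poller(format_error, rounds, completed):
    for _ in range(rounds):
        try:
            flaky_fetch()
        except Exception as exc:
            # If format_error itself raises, the loop is abandoned
            # and the thread terminates.
            format_error(exc)
        completed.append(1)


def completed_rounds(format_error, rounds=3):
    """Run the poller in a thread and report how many rounds it survived."""
    completed = []
    worker = threading.Thread(target=run_poller,
                              args=(format_error, rounds, completed))
    worker.start()
    worker.join()
    return len(completed)
```

With fragile_log the thread dies during its first round, while with safe_log it completes every round. The traceback from the dying thread goes to stderr, so it is easy to miss when the script runs unattended.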
1 Answer
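The threads were hanging because of a problem obtaining the connection to the SQS queue. I use IAM to manage credentials and the boto package to connect to SQS.
The root cause is that boto sometimes fails when it reads the AWS credentials.
The fix is to change the boto configuration (for example in ~/.boto or /etc/boto.cfg) to increase the number of attempts it makes: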
[Boto]
metadata_service_num_attempts = 5
( https://groups.google.com/forum/#!topic/boto-users/1yX24WG3g1E )
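The same idea, retrying a flaky call a bounded number of times, could also be applied inside the polling loop itself, so that a transient failure does not end the thread. This is a minimal sketch under that assumption, not something boto provides: call_with_retries and its parameters are hypothetical names, and fetch stands for whatever call is being retried.

```python
import time


def call_with_retries(fetch, attempts=5, base_delay=0.5, sleep=time.sleep):
    """Call fetch() up to `attempts` times, doubling the delay after each failure.

    Returns the first successful result; re-raises the last error if
    every attempt fails.
    """
    last_error = None
    for attempt in range(attempts):
        try:
            return fetch()
        except Exception as exc:
            last_error = exc
            # Wait before the next attempt, but not after the final one.
            if attempt < attempts - 1:
                sleep(base_delay * (2 ** attempt))
    raise last_error
```

In a polling thread this might be used as msgs = call_with_retries(lambda: sqs_queue.get_messages(10)), with the surrounding try/except still in place to catch the case where every attempt fails.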