将文件尾部内容发送到消息队列
我在一台Linux机器上通过Python的subprocess启动了一个进程(具体是在AWS EC2上),这个进程会生成一些文件。我需要实时监控这些文件的变化,并把每次生成的JSON格式的输出发送到相应的AWS SQS队列。我该怎么做呢?
编辑
根据这个回答的建议,asyncproc和PEP3145,我可以用以下方法来实现:
from asyncproc import Process
import Queue
import os
import time
# Substitute AWS SQS for Queue
sta_queue = Queue.Queue()
msg_queue = Queue.Queue()
running_procs = {'status':(Process(['/usr/bin/tail', '--retry', '-f','test.sta']),sta_queue),'message':(Process(['/usr/bin/tail', '--retry', '-f', 'test.msg' ]),msg_queue)}
def handle_proc(p,q):
latest = p.read()
if latest:
# If nothing new, latest will be an empty string
q.put(latest)
retcode = p.wait(flags=os.WNOHANG)
return retcode
while len(running_procs):
proc_names = running_procs.keys()
for proc_name in proc_names:
proc, q = running_procs[proc_name]
retcode = handle_proc(proc, q)
if retcode is not None: # Process finished.
del running_procs[proc_name]
time.sleep(1.0)
print("Status queue")
while not sta_queue.empty():
print(sta_queue.get())
print("Message queue")
while not msg_queue.empty():
print(msg_queue.get())
我觉得这样应该就够了,除非有人能提供更好的解决方案。
更多编辑
我可能想得太复杂了。虽然上面的方案很好,但我觉得最简单的解决办法是: - 检查文件是否存在 - 如果文件存在,就把它们复制到AWS S3的一个存储桶里,并通过AWS SQS发送一条消息,告诉大家文件已经被复制。每60秒重复一次 - 消费者应用程序会定期检查SQS,最终会收到文件已被复制的消息 - 消费者应用程序从S3下载文件,并用最新的内容替换之前的内容。这个过程会一直重复,直到任务完成
不过,关于在子进程中处理异步输入输出的问题仍然存在。
1 个回答
0
你可以使用 subprocess.Popen 这个类来运行 tail 命令,并读取它的输出。
try:
process = subprocess.Popen(['tail', '-f', filename], stdout=PIPE)
except (OSError, ValueError):
pass # TODO: handle errors
output = process.stdout.read()
如果你想要更简单的方式,可以用 subprocess.check_output 这个函数,它可以用一行代码实现这个功能。这个功能是在 Python 2.7 版本中新增的。
try:
output = subprocess.check_output(['tail', '-f', filename], stdout=PIPE)
except CalledProcessError:
pass # TODO: handle errors
如果你想了解非阻塞的输入输出,可以查看 这个问题。