将文件尾部内容发送到消息队列

2 投票
1 回答
1722 浏览
提问于 2025-04-16 16:04

我在一台Linux机器上通过Python的subprocess启动了一个进程(具体是在AWS EC2上),这个进程会生成一些文件。我需要实时监控这些文件的变化,并把每次生成的JSON格式的输出发送到相应的AWS SQS队列。我该怎么做呢?

编辑

根据这个回答的建议,asyncprocPEP3145,我可以用以下方法来实现:

from asyncproc import Process
import Queue
import os
import time

# Substitute AWS SQS for Queue
sta_queue = Queue.Queue()
msg_queue = Queue.Queue()
running_procs = {'status':(Process(['/usr/bin/tail', '--retry', '-f','test.sta']),sta_queue),'message':(Process(['/usr/bin/tail', '--retry', '-f', 'test.msg' ]),msg_queue)}

def handle_proc(p,q):
    latest = p.read()
    if latest:
        # If nothing new, latest will be an empty string
        q.put(latest)
    retcode = p.wait(flags=os.WNOHANG)
    return retcode

while len(running_procs):
    proc_names = running_procs.keys()
    for proc_name in proc_names:
        proc, q = running_procs[proc_name]
        retcode = handle_proc(proc, q)
        if retcode is not None: # Process finished.
            del running_procs[proc_name]
    time.sleep(1.0)
print("Status queue")
while not sta_queue.empty():
    print(sta_queue.get())
print("Message queue")
while not msg_queue.empty():
    print(msg_queue.get())

我觉得这样应该就够了,除非有人能提供更好的解决方案。

更多编辑

我可能想得太复杂了。虽然上面的方案很好,但我觉得最简单的解决办法是: - 检查文件是否存在 - 如果文件存在,就把它们复制到AWS S3的一个存储桶里,并通过AWS SQS发送一条消息,告诉大家文件已经被复制。每60秒重复一次 - 消费者应用程序会定期检查SQS,最终会收到文件已被复制的消息 - 消费者应用程序从S3下载文件,并用最新的内容替换之前的内容。这个过程会一直重复,直到任务完成

不过,关于在子进程中处理异步输入输出的问题仍然存在。

1 个回答

0

你可以使用 subprocess.Popen 这个类来运行 tail 命令,并读取它的输出。

try:
    process = subprocess.Popen(['tail', '-f', filename], stdout=PIPE)
except (OSError, ValueError):
    pass    # TODO: handle errors
output = process.stdout.read()

如果你想要更简单的方式,可以用 subprocess.check_output 这个函数,它可以用一行代码实现这个功能。这个功能是在 Python 2.7 版本中新增的。

try:
    output = subprocess.check_output(['tail', '-f', filename], stdout=PIPE)
except CalledProcessError:
    pass    # TODO: handle errors

如果你想了解非阻塞的输入输出,可以查看 这个问题

撰写回答