如何读取持续被追加行更新的文件?

2 投票
2 回答
3461 浏览
提问于 2025-04-18 18:15

在我的终端上,我正在运行:

curl --user dhelm:12345 \https://stream.twitter.com/1.1/statuses/sample.json > raw-data.txt

curl的输出是实时流式传输的Twitter数据,这些数据被写入到一个文件raw-data.txt中。

在Python中,

 import json
 posts = []

 for line in open("/Users/me/raw-data.txt"):
    try:
        posts.append(json.loads(line))
    except:
        pass

我正在用Python读取这个文件,并使用json解码器把结果添加到posts中。

现在,问题是我不希望我的程序在Python脚本读取到文件末尾时就结束。相反,我希望在终端上运行的curl继续向文件raw-data.txt添加更多内容时,我的程序能够继续读取。

2 个回答

3

我不知道这个在语言规范中是否有保证,但我知道在Unix系统上,至少在CPython 2.x和3.3以上的版本中是有效的。所以如果你不在乎3.0到3.2的版本(或者可以自己测试一下),也不在乎Windows系统(或者可以自己测试一下)……

当你到达文件的结束(EOF)时,你的for line in f循环会结束。但这并不会关闭文件,或者做其他事情;它只是让文件指针停在了EOF的位置。如果你再尝试循环,而此时有新的数据被写入,你会得到这些新数据。

所以,你可以这样做:

with open("/Users/me/raw-data.txt") as f:
    while True:
        for line in f:
            try:
                posts.append(json.loads(line))
            except:
                pass

这样做的问题是,当你到达EOF时,它会尽可能快地循环,检查自己是否仍然在EOF位置。所以你真正想要做的是在有新数据到来之前让程序等待。你可以在一些Unix平台上使用select来实现,但并不是所有平台都支持。你也可以使用特定平台的文件通知API,或者使用一个跨平台的API封装。

如果你使用的是Python 3.4及以上版本,你可以使用标准库中的selectors模块,这样可以在Solaris、Linux、OS X以及其他支持kqueue的*BSD系统上工作,而在一些只支持select的Unix平台上也可以使用……但在Windows上会失败,并且在某些Unix系统上会尽可能快地循环。你可以通过在找不到合适的选择器时拒绝启动来解决这个问题。

或者,如果情况最糟糕,你可以在EOF时稍微等待一下(可能采用指数退避的方式,但只到一个合理的短时间限制)。这就是tail -f在没有通知检测机制的平台上所做的。

所以:

import selectors
import time

if selectors.DefaultSelector in (selectors.SelectSelector, selectors.PollSelector):
    def init(f): pass
    def wait(): time.sleep(1)
else:
    def init(f):
        sel = selectors.DefaultSelector()
        sel.register(f, selectors.EVENT_READ, None)
    def wait():
        sel.select()

with open("/Users/me/raw-data.txt") as f:
    init(f)
    while True:
        for line in f:
            try:
                posts.append(json.loads(line))
            except:
                pass
         wait()
5

我觉得这就是一个XY问题。因为你想不到怎么在Python里逐行读取HTTP请求,所以你决定用curl来下载文件,然后再在Python里读取这个文件。这样做的话,你就得面对一个问题:在请求还没结束的时候,可能会遇到文件结束符(EOF),因为你已经追上curl了。所以,你自己让事情变得更复杂了,完全没有必要。

虽然用标准库(stdlib)也能做到流式下载,但过程有点麻烦;不过使用requests库就简单多了。所以,我们就用这个:

import json
import requests
from requests.auth import HTTPBasicAuth

posts = []
url = 'https://stream.twitter.com/1.1/statuses/sample.json'
r = requests.get(url, auth=('dhelm', '12345'), stream=True)
for line in r.iter_lines():
    try:
        posts.append(json.loads(line))
    except:
        pass

这就是整个程序。

撰写回答