如何读取持续被追加行更新的文件?
在我的终端上,我正在运行:
curl --user dhelm:12345 \https://stream.twitter.com/1.1/statuses/sample.json > raw-data.txt
curl的输出是实时流式传输的Twitter数据,这些数据被写入到一个文件raw-data.txt中。
在Python中,
import json
posts = []
for line in open("/Users/me/raw-data.txt"):
try:
posts.append(json.loads(line))
except:
pass
我正在用Python读取这个文件,并使用json解码器把结果添加到posts中。
现在,问题是我不希望我的程序在Python脚本读取到文件末尾时就结束。相反,我希望在终端上运行的curl继续向文件raw-data.txt添加更多内容时,我的程序能够继续读取。
2 个回答
我不知道这个在语言规范中是否有保证,但我知道在Unix系统上,至少在CPython 2.x和3.3以上的版本中是有效的。所以如果你不在乎3.0到3.2的版本(或者可以自己测试一下),也不在乎Windows系统(或者可以自己测试一下)……
当你到达文件的结束(EOF)时,你的for line in f
循环会结束。但这并不会关闭文件,或者做其他事情;它只是让文件指针停在了EOF的位置。如果你再尝试循环,而此时有新的数据被写入,你会得到这些新数据。
所以,你可以这样做:
with open("/Users/me/raw-data.txt") as f:
while True:
for line in f:
try:
posts.append(json.loads(line))
except:
pass
这样做的问题是,当你到达EOF时,它会尽可能快地循环,检查自己是否仍然在EOF位置。所以你真正想要做的是在有新数据到来之前让程序等待。你可以在一些Unix平台上使用select
来实现,但并不是所有平台都支持。你也可以使用特定平台的文件通知API,或者使用一个跨平台的API封装。
如果你使用的是Python 3.4及以上版本,你可以使用标准库中的selectors
模块,这样可以在Solaris、Linux、OS X以及其他支持kqueue
的*BSD系统上工作,而在一些只支持select
的Unix平台上也可以使用……但在Windows上会失败,并且在某些Unix系统上会尽可能快地循环。你可以通过在找不到合适的选择器时拒绝启动来解决这个问题。
或者,如果情况最糟糕,你可以在EOF时稍微等待一下(可能采用指数退避的方式,但只到一个合理的短时间限制)。这就是tail -f
在没有通知检测机制的平台上所做的。
所以:
import selectors
import time
if selectors.DefaultSelector in (selectors.SelectSelector, selectors.PollSelector):
def init(f): pass
def wait(): time.sleep(1)
else:
def init(f):
sel = selectors.DefaultSelector()
sel.register(f, selectors.EVENT_READ, None)
def wait():
sel.select()
with open("/Users/me/raw-data.txt") as f:
init(f)
while True:
for line in f:
try:
posts.append(json.loads(line))
except:
pass
wait()
我觉得这就是一个XY问题。因为你想不到怎么在Python里逐行读取HTTP请求,所以你决定用curl
来下载文件,然后再在Python里读取这个文件。这样做的话,你就得面对一个问题:在请求还没结束的时候,可能会遇到文件结束符(EOF),因为你已经追上curl
了。所以,你自己让事情变得更复杂了,完全没有必要。
虽然用标准库(stdlib)也能做到流式下载,但过程有点麻烦;不过使用requests
库就简单多了。所以,我们就用这个:
import json
import requests
from requests.auth import HTTPBasicAuth
posts = []
url = 'https://stream.twitter.com/1.1/statuses/sample.json'
r = requests.get(url, auth=('dhelm', '12345'), stream=True)
for line in r.iter_lines():
try:
posts.append(json.loads(line))
except:
pass
这就是整个程序。