Python - 在HTTP响应流中查找

11 投票
3 回答
10394 浏览
提问于 2025-04-16 13:05

使用 urllibs(或者 urllibs2)来实现我想要的功能,感觉完全没希望。有没有什么解决办法?

3 个回答

1

我没有找到现成的可以对HTTP网址使用seek()的文件接口,所以我自己做了一个简单的版本:https://github.com/valgur/pyhttpio。这个版本依赖于urllib.request,不过如果需要的话,可能很容易改成用requests

完整的代码如下:

import cgi
import time
import urllib.request
from io import IOBase
from sys import stderr


class SeekableHTTPFile(IOBase):
    def __init__(self, url, name=None, repeat_time=-1, debug=False):
        """Allow a file accessible via HTTP to be used like a local file by utilities
         that use `seek()` to read arbitrary parts of the file, such as `ZipFile`.
        Seeking is done via the 'range: bytes=xx-yy' HTTP header.

        Parameters
        ----------
        url : str
            A HTTP or HTTPS URL
        name : str, optional
            The filename of the file.
            Will be filled from the Content-Disposition header if not provided.
        repeat_time : int, optional
            In case of HTTP errors wait `repeat_time` seconds before trying again.
            Negative value or `None` disables retrying and simply passes on the exception (the default).
        """
        super().__init__()
        self.url = url
        self.name = name
        self.repeat_time = repeat_time
        self.debug = debug
        self._pos = 0
        self._seekable = True
        with self._urlopen() as f:
            if self.debug:
                print(f.getheaders())
            self.content_length = int(f.getheader("Content-Length", -1))
            if self.content_length < 0:
                self._seekable = False
            if f.getheader("Accept-Ranges", "none").lower() != "bytes":
                self._seekable = False
            if name is None:
                header = f.getheader("Content-Disposition")
                if header:
                    value, params = cgi.parse_header(header)
                    self.name = params["filename"]

    def seek(self, offset, whence=0):
        if not self.seekable():
            raise OSError
        if whence == 0:
            self._pos = 0
        elif whence == 1:
            pass
        elif whence == 2:
            self._pos = self.content_length
        self._pos += offset
        return self._pos

    def seekable(self, *args, **kwargs):
        return self._seekable

    def readable(self, *args, **kwargs):
        return not self.closed

    def writable(self, *args, **kwargs):
        return False

    def read(self, amt=-1):
        if self._pos >= self.content_length:
            return b""
        if amt < 0:
            end = self.content_length - 1
        else:
            end = min(self._pos + amt - 1, self.content_length - 1)
        byte_range = (self._pos, end)
        self._pos = end + 1
        with self._urlopen(byte_range) as f:
            return f.read()

    def readall(self):
        return self.read(-1)

    def tell(self):
        return self._pos

    def __getattribute__(self, item):
        attr = object.__getattribute__(self, item)
        if not object.__getattribute__(self, "debug"):
            return attr

        if hasattr(attr, '__call__'):
            def trace(*args, **kwargs):
                a = ", ".join(map(str, args))
                if kwargs:
                    a += ", ".join(["{}={}".format(k, v) for k, v in kwargs.items()])
                print("Calling: {}({})".format(item, a))
                return attr(*args, **kwargs)

            return trace
        else:
            return attr

    def _urlopen(self, byte_range=None):
        header = {}
        if byte_range:
            header = {"range": "bytes={}-{}".format(*byte_range)}
        while True:
            try:
                r = urllib.request.Request(self.url, headers=header)
                return urllib.request.urlopen(r)
            except urllib.error.HTTPError as e:
                if self.repeat_time is None or self.repeat_time < 0:
                    raise
                print("Server responded with " + str(e), file=stderr)
                print("Sleeping for {} seconds before trying again".format(self.repeat_time), file=stderr)
                time.sleep(self.repeat_time)

一个可能的使用示例:

url = "https://www.python.org/ftp/python/3.5.0/python-3.5.0-embed-amd64.zip"
f = SeekableHTTPFile(url, debug=True)
zf = ZipFile(f)
zf.printdir()
zf.extract("python.exe")

补充说明:其实在这个回答中有一个几乎完全相同,但稍微简化的实现:https://stackoverflow.com/a/7852229/2997179

2

可能最好的办法就是把数据写到一个文件里(或者用一个字符串来存,使用StringIO),然后在这个文件(或字符串)里查找你需要的内容。

24

我不太确定C#的实现是怎么工作的,但因为网络流一般是不能随意跳转的,我猜它可能会把所有数据下载到一个本地文件或者内存对象中,然后再在里面进行跳转。在Python中,可以按照Abafei的建议,把数据写入一个文件或者StringIO,然后再从那里进行跳转。

不过,如果你想像Abafei的回答中提到的那样,只获取文件的某个特定部分(而不是在返回的数据中来回跳转),还有另一种方法。urllib2可以用来获取网页的某个部分(在HTTP术语中称为“范围”),前提是服务器支持这种功能。

range头部

当你向服务器发送请求时,请求的参数会通过各种头部传递。其中一个就是Range头部,这在RFC2616的第14.35节中定义(这是定义HTTP/1.1的规范)。这个头部允许你做一些事情,比如从第10,000个字节开始获取所有数据,或者获取第1,000到第1,500个字节之间的数据。

服务器支持

服务器并不一定要支持范围获取。有些服务器会在响应中返回Accept-Ranges头部(在RFC2616的第14.5节中),以报告它们是否支持范围获取。可以通过HEAD请求来检查这一点。不过,其实没有必要这么做;如果服务器不支持范围,它会返回整个页面,我们可以像之前一样在Python中提取所需的数据部分。

检查是否返回了范围

如果服务器返回了范围,它必须在响应中发送Content-Range头部(在RFC2616的第14.16节中)。如果这个头部出现在响应的头部中,我们就知道返回了范围;如果没有这个头部,则表示返回了整个页面。

使用urllib2的实现

urllib2允许我们在请求中添加头部,这样我们就可以请求服务器返回一个范围,而不是整个页面。下面的脚本接受一个URL、一个起始位置和(可选的)长度作为命令行参数,并尝试获取页面的指定部分。

import sys
import urllib2

# Check command line arguments.
if len(sys.argv) < 3:
    sys.stderr.write("Usage: %s url start [length]\n" % sys.argv[0])
    sys.exit(1)

# Create a request for the given URL.
request = urllib2.Request(sys.argv[1])

# Add the header to specify the range to download.
if len(sys.argv) > 3:
    start, length = map(int, sys.argv[2:])
    request.add_header("range", "bytes=%d-%d" % (start, start + length - 1))
else:
    request.add_header("range", "bytes=%s-" % sys.argv[2])

# Try to get the response. This will raise a urllib2.URLError if there is a
# problem (e.g., invalid URL).
response = urllib2.urlopen(request)

# If a content-range header is present, partial retrieval worked.
if "content-range" in response.headers:
    print "Partial retrieval successful."

    # The header contains the string 'bytes', followed by a space, then the
    # range in the format 'start-end', followed by a slash and then the total
    # size of the page (or an asterix if the total size is unknown). Lets get
    # the range and total size from this.
    range, total = response.headers['content-range'].split(' ')[-1].split('/')

    # Print a message giving the range information.
    if total == '*':
        print "Bytes %s of an unknown total were retrieved." % range
    else:
        print "Bytes %s of a total of %s were retrieved." % (range, total)

# No header, so partial retrieval was unsuccessful.
else:
    print "Unable to use partial retrieval."

# And for good measure, lets check how much data we downloaded.
data = response.read()
print "Retrieved data size: %d bytes" % len(data)

使用这个,我可以获取Python主页的最后2,000个字节:

blair@blair-eeepc:~$ python retrieverange.py http://www.python.org/ 17387
Partial retrieval successful.
Bytes 17387-19386 of a total of 19387 were retrieved.
Retrieved data size: 2000 bytes

或者获取主页中间的400个字节:

blair@blair-eeepc:~$ python retrieverange.py http://www.python.org/ 6000 400
Partial retrieval successful.
Bytes 6000-6399 of a total of 19387 were retrieved.
Retrieved data size: 400 bytes

不过,Google主页不支持范围获取:

blair@blair-eeepc:~$ python retrieverange.py http://www.google.com/ 1000 500
Unable to use partial retrieval.
Retrieved data size: 9621 bytes

在这种情况下,就需要在进一步处理之前,在Python中提取感兴趣的数据。

撰写回答