How to send an HTTP request with HTTP Basic authentication using Twisted

0 votes
3 answers
2369 views
Asked 2025-04-18 07:41

I ran the code from http://twistedmatrix.com/documents/13.2.0/web/howto/client.html#auto4 and got a <HTML><HEAD><TITLE>401 Unauthorized</TITLE></HEAD> error, but I don't know how to add authentication information to the request.

Update

I changed the code to this:

from base64 import b64encode
authorization = b64encode(b"admin:admin")

d = agent.request(
    'GET',
    'http://172.19.1.76/',
    Headers(
        {
            'User-Agent': ['Twisted Web Client Example'],
            b"authorization": b"Basic " + authorization
        }        
    ),
    None)

Now I get the following error instead, and I don't know what should go in the list.

packages/twisted/web/http_headers.py", line 199, in setRawHeaders
    "instance of %r instead" % (name, type(values)))
TypeError: Header entry 'authorization' should be list but found instance of <type 'str'> instead

Update

The header value has to be wrapped in a list of its own, like this:

"authorization": ["Basic " + authorization]
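
A minimal sketch of the same fix, assuming Python 3 (the snippets in this thread are Python 2): there `b64encode` returns bytes, so the `Basic ` prefix has to be bytes as well, and each header value still has to sit inside a list. The helper name `basic_auth_headers` is made up for illustration.

```python
from base64 import b64encode


def basic_auth_headers(username, password):
    """Build a header mapping whose values are lists of bytes."""
    # Basic credentials are "user:password", base64-encoded.
    credentials = "{}:{}".format(username, password).encode("utf-8")
    token = b64encode(credentials)
    return {
        b"User-Agent": [b"Twisted Web Client Example"],
        # A one-element list, not a bare string: a bare string is what
        # triggered the "should be list" TypeError.
        b"Authorization": [b"Basic " + token],
    }


print(basic_auth_headers("admin", "admin"))
```

The returned dict can be passed to `Headers(...)` from `twisted.web.http_headers` in place of the literal dict in the question.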

3 Answers

0

Here is a complete example:

from twisted.trial import unittest   
from urllib import urlencode
from base64 import b64encode
from twisted.python.log import err
from twisted.web.client import Agent, readBody
from twisted.internet import reactor
from twisted.internet.ssl import ClientContextFactory
from twisted.web.http_headers import Headers
from zope.interface import implements
from twisted.internet.defer import succeed
from twisted.web.iweb import IBodyProducer


class StringProducer(object):
    implements(IBodyProducer)

    def __init__(self, body):
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        pass

    def stopProducing(self):
        pass

class WebClientContextFactory(ClientContextFactory):
    def getContext(self, hostname, port):
        return ClientContextFactory.getContext(self)

class HttpsClientTestCases(unittest.TestCase):
    def test_https_client(self):
        def cbRequest(response):
            print 'Response version:', response.version
            print 'Response code:', response.code
            print 'Response phrase:', response.phrase
            print 'Response headers:[{}]'.format(list(response.headers.getAllRawHeaders()))
            d = readBody(response)
            d.addCallback(cbBody)
            return d

        def cbBody(body):
            print 'Response body:'
            print body

        contextFactory = WebClientContextFactory()
        agent = Agent(reactor, contextFactory)

        authorization = b64encode(b"username:password")
        # The body must be an encoded string, not a dict, so urlencode it first.
        data = StringProducer(urlencode({'hello': 'world'}))
        d = agent.request(
            'POST',
            'https://....',
            headers = Headers(
                {
                    'Content-Type': ['application/x-www-form-urlencoded'],
                    'Authorization': ['Basic ' + authorization]
                }
            ),
            bodyProducer = data
        )

        d.addCallbacks(cbRequest, err)
        # trial starts and stops the reactor itself, so the test only returns
        # the Deferred and must not call reactor.stop().
        return d
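
The body producer writes `self.body` straight to the transport, so the body needs to be an already encoded string rather than a dict. A minimal sketch of that encoding step, assuming Python 3 (where `urlencode` lives in `urllib.parse`); `encode_form` is a hypothetical helper name, not part of Twisted.

```python
from urllib.parse import urlencode


def encode_form(fields):
    """Encode a dict as an application/x-www-form-urlencoded body (bytes)."""
    # urlencode percent-escapes non-ASCII characters, so ASCII is sufficient.
    return urlencode(fields).encode("ascii")


body = encode_form({"hello": "world"})
print(body, len(body))
```

The resulting bytes can be handed to `StringProducer`, or wrapped as `FileBodyProducer(BytesIO(body))` using `FileBodyProducer` from `twisted.web.client`.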
2

You can use treq's built-in authentication support, like this:

d = treq.get(
    'http://...',
    auth=('username', 'password')
)
4

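You can add headers when you send a request with Agent (see line 29 of the example you linked).

For example, for Basic authentication or authorization you can try something like this: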

from base64 import b64encode
authorization = b64encode(b"username:password")
getting = agent.request(..., Headers({b"authorization": [b"Basic " + authorization]}))
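
If the server still answers 401, it can help to check that the header value decodes back to the credentials you expect. A small sketch of that check, assuming Python 3 and a bytes header value like the one built above; `parse_basic_auth` is a hypothetical helper, not a Twisted API.

```python
from base64 import b64decode


def parse_basic_auth(header_value):
    """Split a b"Basic <token>" header value into (username, password)."""
    scheme, _, encoded = header_value.partition(b" ")
    if scheme.lower() != b"basic":
        raise ValueError("not a Basic authorization header")
    # Only the first colon separates the user name from the password.
    username, _, password = b64decode(encoded).partition(b":")
    return username, password


print(parse_basic_auth(b"Basic dXNlcm5hbWU6cGFzc3dvcmQ="))
```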
