如何使用httplib2进行双向证书认证
我正在使用httplib2库,从我的服务器向另一个网络服务发送请求。我们想要使用双向证书认证。我知道如何为发出的连接设置证书(h.set_certificate
),但是我该如何检查回应服务器使用的证书呢?
这个问题似乎表明httplib2本身并不支持这个功能,只给出了一些模糊的建议,告诉我可以去哪里查找。
这可能吗?我是不是得去更底层的地方动手呢?
2 个回答
2
也许自从你提问以来,情况已经有所变化。我现在可以使用 httplib2 版本0.7来实现相互认证,下面是我的做法:
import httplib2
h=httplib2.Http(ca_certs='ca.crt')
h.add_certificate(key='client_private_key.pem', cert='cert_client.pem', domain='')
try: resp, cont = h.request('https://mytest.com/cgi-bin/test.cgi')
except Exception as e: print e
注意这个 domain=''
参数,这是我唯一能让它正常工作的方式。
5
这是我同事Dave St. Germain写的代码,用来解决这个问题:
import ssl
import socket
from httplib2 import has_timeout
import httplib2
import socks
class CertificateValidationError(httplib2.HttpLib2Error):
pass
def validating_sever_factory(ca_cert_file):
# we need to define a closure here because we don't control
# the arguments this class is instantiated with
class ValidatingHTTPSConnection(httplib2.HTTPSConnectionWithTimeout):
def connect(self):
# begin copypasta from HTTPSConnectionWithTimeout
"Connect to a host on a given (SSL) port."
if self.proxy_info and self.proxy_info.isgood():
sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
sock.setproxy(*self.proxy_info.astuple())
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if has_timeout(self.timeout):
sock.settimeout(self.timeout)
sock.connect((self.host, self.port))
# end copypasta
try:
self.sock = ssl.wrap_socket(sock,
self.key_file,
self.cert_file,
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=ca_cert_file
)
except ssl.SSLError:
# we have to capture the exception here and raise later because
# httplib2 tries to ignore exceptions on connect
import sys
self._exc_info = sys.exc_info()
raise
else:
self._exc_info = None
# this might be redundant
server_cert = self.sock.getpeercert()
if not server_cert:
raise CertificateValidationError(repr(server_cert))
def getresponse(self):
if not self._exc_info:
return httplib2.HTTPSConnectionWithTimeout.getresponse(self)
else:
raise self._exc_info[1], None, self._exc_info[2]
return ValidatingHTTPSConnection
def do_request(url,
method='GET',
body=None,
headers=None,
keyfile=None,
certfile=None,
ca_certs=None,
proxy_info=None,
timeout=30):
"""
makes an http/https request, with optional client certificate and server
certificate verification.
returns response, content
"""
kwargs = {}
h = httplib2.Http(proxy_info=proxy_info, timeout=timeout)
is_ssl = url.startswith('https')
if is_ssl and ca_certs:
kwargs['connection_type'] = validating_sever_factory(ca_certs)
if is_ssl and keyfile and certfile:
h.add_certificate(keyfile, certfile, '')
return h.request(url, method=method, body=body, headers=headers, **kwargs)