如何在Python requests中合并多个适配器?

1 投票
2 回答
60 浏览
提问于 2025-04-13 12:52

我需要多次调用一个不太可靠的网络接口,所以我准备了一个超时重试的策略,并把它应用到一个 requests.Session() 对象上,作为适配器。

不过,我还需要用一个客户端的PKCS12证书和一个公共证书机构(用于验证)来挂载这个接口,这个我可以通过 requests_pkcs12 这个包轻松实现,它也会创建一个适配器。

所以,第一个适配器是这样的,(是我在网上找到的东西拼凑起来的):

# timeout.py

from requests.adapters import HTTPAdapter

DEFAULT_TIMEOUT = 30

class TimeoutAdapter(HTTPAdapter):
    def __init__(self, *args, **kwargs):
        self.timeout = DEFAULT_TIMEOUT
        if "timeout" in kwargs:
            self.timeout = kwargs["timeout"]
        super().__init__(*args, **kwargs)
    
    def send(self, request, **kwargs):
        timeout = kwargs.get("timeout")
        if timeout is None:
            kwargs["timeout"] = self.timeout
        return super().send(request, **kwargs)

## set up adapter
if __name__ == "__main__":
    
    from urllib3.util.retry import Retry
    
    u = 'https://some-webservice.com'
    s = requests.Session()
    retry_strategy = Retry(
        total=10,
        status_forcelist=[429, 500, 502, 503, 504],
        backoff_factor=1)
    s.mount(u, adapter=TimeoutAdapter(max_retries=retry_strategy)

然后,第二个适配器是这样的:

from requests import Session
from requests_pkcs12 import Pkcs12Adapter

pki_adapter = Pkcs12Adapter(
    pkcs12_filename='path/to/cert.p12',
    pkcs12_password='cert-password')

u = 'https://some-webservice.com'
s = requests.Session()
s.mount(u, adapter=pki_adapter)

这两个适配器单独使用都很好,但我该如何把它们合并成一个适配器,这样会话就可以用PKI进行认证,同时也能遵循超时重试的策略呢?

2 个回答

2

你有没有考虑过这样做呢?

from requests.adapters import HTTPAdapter
from requests_pkcs12 import Pkcs12Adapter
from urllib3.util.retry import Retry
import requests
import logging

DEFAULT_TIMEOUT = 30

class TimeoutPkcs12Adapter(HTTPAdapter):
    def __init__(self, pkcs12_filename, pkcs12_password, timeout=DEFAULT_TIMEOUT, **kwargs):
        self.timeout = timeout
        self.pkcs12_adapter = Pkcs12Adapter(pkcs12_filename=pkcs12_filename, pkcs12_password=pkcs12_password)
        super().__init__(**kwargs)

    def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
        # Load certificate details into pool manager
        pool_kwargs['cert_file'] = self.pkcs12_adapter.cert_file
        pool_kwargs['key_file'] = self.pkcs12_adapter.key_file
        super().init_poolmanager(connections, maxsize, block, **pool_kwargs)

    def send(self, request, **kwargs):
        kwargs.setdefault('timeout', self.timeout)
        try:
            return super().send(request, **kwargs)
        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed: {e}")
            raise

if __name__ == "__main__":
    url = 'https://some-webservice.com'
    session = requests.Session()
    retry_strategy = Retry(total=10, status_forcelist=[429, 500, 502, 503, 504], backoff_factor=1)
    adapter = TimeoutPkcs12Adapter(
        pkcs12_filename='path/to/cert.p12',
        pkcs12_password='cert-password',
        max_retries=retry_strategy
    )
    session.mount(url, adapter)

这样可以通过继承来把两个适配器结合起来。

更新:之前的代码有个“钻石问题”。

2

假设你想要TimeoutAdapter的默认超时行为,那么因为TimeoutAdapterPkcs12Adapter都是从HTTPAdapter这个类派生出来的,你可以让前者从后者继承(也就是单继承)。比如:

# timeout.py

from requests import Session
from requests_pkcs12 import Pkcs12Adapter

DEFAULT_TIMEOUT = 30

class TimeoutAdapter(Pkcs12Adapter):
    def __init__(self, *args, **kwargs):
        self.timeout = DEFAULT_TIMEOUT
        if "timeout" in kwargs:
            self.timeout = kwargs["timeout"]
        super().__init__(*args, **kwargs)
    
    def send(self, request, **kwargs):
        timeout = kwargs.get("timeout")
        if timeout is None:
            kwargs["timeout"] = self.timeout
        return super().send(request, **kwargs)

## set up adapter
if __name__ == "__main__":
    
    from urllib3.util.retry import Retry
    
    retry_strategy = Retry(
        total=10,
        status_forcelist=[429, 500, 502, 503, 504],
        backoff_factor=1)

    timeout_adapter = TimeoutAdapter(
        max_retries=retry_strategy,
        pkcs12_filename='path/to/cert.p12',
        pkcs12_password='cert-password'
    )

    u = 'https://some-webservice.com'
    s = requests.Session()
    s.mount(u, adapter=timeout_adapter)

撰写回答