在 Python 中调用 CosmosClient(url, key, proxy_config=d) 时遇到错误

2024-04-27 03:29:47 发布

您现在位置:Python中文网/ 问答频道 /正文

在使用客户端配置并向 Azure Cosmos DB 服务发送请求时,我使用了以下代码:

```python
d=ProxyConfiguration()
d.Host='string'
d.Port=int
client = CosmosClient(url,key,proxy_config = d)
```

我遇到了如下错误:

azure.core.exceptions.ServiceRequestError:<urllib3.connection.HTTPSConnection object at 0x0000020EC3BCCD60>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed

将值传递给 proxy_config 参数的正确方法是什么?

CosmosClient


Tags: clientconfighosturl客户端dbstringport
1条回答
网友
1楼 · 发布于 2024-04-27 03:29:47

您可以下载官方SDK进行测试,我认为您应该使用CosmosClientConnection而不是CosmosClient

proxy_tests.py

#The MIT License (MIT)
#Copyright (c) 2014 Microsoft Corporation

#Permission is hereby granted, free of charge, to any person obtaining a copy
#of this software and associated documentation files (the "Software"), to deal
#in the Software without restriction, including without limitation the rights
#to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#copies of the Software, and to permit persons to whom the Software is
#furnished to do so, subject to the following conditions:

#The above copyright notice and this permission notice shall be included in all
#copies or substantial portions of the Software.

#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#SOFTWARE.

import unittest
import pytest
import platform
import azure.cosmos.documents as documents
import azure.cosmos._cosmos_client_connection as cosmos_client_connection
import test_config
import six
if six.PY2:
    from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
else:
    from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
from azure.core.exceptions import ServiceRequestError

# Module-level pytest mark: applies the ``cosmosEmulator`` marker to every
# test collected from this file.
pytestmark = pytest.mark.cosmosEmulator

# NOTE(review): this ``usefixtures`` mark is attached to the HTTP handler, not
# to a test class -- confirm whether it was meant for ``ProxyTests``.
@pytest.mark.usefixtures("teardown")
class CustomRequestHandler(BaseHTTPRequestHandler):
    """Mock HTTP handler that answers every GET and POST with one JSON document.

    The document has the shape ``{"id": <database_name>, "_self": "self_link"}``.
    ``database_name`` is a class attribute because the HTTP server creates the
    handler instances itself; ``Server.__init__`` assigns it before serving.
    """

    database_name = None

    def _set_headers(self):
        """Send a 200 status line and a JSON content-type header."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

    def _send_payload(self):
        """Write the response headers followed by the JSON database document."""
        # Local import keeps this block self-contained (the file's import
        # section does not pull in ``json``).
        import json

        self._set_headers()
        # json.dumps escapes quotes and backslashes in the database name;
        # plain string concatenation produced invalid JSON for such names.
        payload = json.dumps({"id": self.database_name, "_self": "self_link"})
        if six.PY2:
            # On Python 2 the socket file accepts the native ``str`` directly.
            self.wfile.write(payload)
        else:
            self.wfile.write(payload.encode("utf-8"))

    def do_GET(self):
        """Handle GET by returning the canned database document."""
        self._send_payload()

    def do_POST(self):
        """Handle POST by returning the canned database document."""
        self._send_payload()

class Server(Thread):
    """Thread that runs the mock HTTP server used as the proxy endpoint."""

    def __init__(self, database_name, PORT):
        """Bind the mock server without starting to serve.

        :param database_name: id that the handler echoes in every response.
        :param PORT: TCP port to listen on (bound on all interfaces).
        """
        Thread.__init__(self)
        server_address = ('', PORT)
        # HTTPServer instantiates the handler per request, so the database
        # name is handed over through a class attribute.
        CustomRequestHandler.database_name = database_name
        self.httpd = HTTPServer(server_address, CustomRequestHandler)

    def run(self):
        """Thread body: serve requests until ``shutdown`` is called."""
        self.httpd.serve_forever()

    def shutdown(self):
        """Stop the serve loop and release the listening socket."""
        self.httpd.shutdown()
        # shutdown() only stops serve_forever(); without server_close() the
        # listening socket stays open and the port remains bound.
        self.httpd.server_close()

class ProxyTests(unittest.TestCase):
    """Proxy Tests.

    Starts a local mock HTTP server on ``serverPort`` and configures it as the
    proxy of a ``ConnectionPolicy``. One test expects a request through the
    correct proxy port to succeed; the other expects client construction to
    fail when the proxy port is wrong.
    """
    host = 'http://localhost:8081'
    masterKey = test_config._test_config.masterKey
    testDbName = 'sample database'
    serverPort = 8089

    @classmethod
    def setUpClass(cls):
        """Start the mock proxy server and build the shared connection policy."""
        # Kept as module globals (as before) so that anything else relying on
        # these names keeps working.
        global server
        global connection_policy
        server = Server(cls.testDbName, cls.serverPort)
        server.start()
        connection_policy = documents.ConnectionPolicy()
        connection_policy.ProxyConfiguration = documents.ProxyConfiguration()
        connection_policy.ProxyConfiguration.Host = 'http://127.0.0.1'

    @classmethod
    def tearDownClass(cls):
        """Stop the mock proxy server."""
        server.shutdown()

    def test_success_with_correct_proxy(self):
        """A request routed through the mock proxy returns the canned database."""
        if platform.system() == 'Darwin':
            pytest.skip("TODO: Connection error raised on OSX")
        connection_policy.ProxyConfiguration.Port = self.serverPort
        client = cosmos_client_connection.CosmosClientConnection(
            self.host, {'masterKey': self.masterKey}, connection_policy)
        created_db = client.CreateDatabase({'id': self.testDbName})
        self.assertEqual(created_db['id'], self.testDbName, msg="Database id is incorrect")

    def test_failure_with_wrong_proxy(self):
        """Pointing the proxy at an unused port makes client creation fail."""
        connection_policy.ProxyConfiguration.Port = self.serverPort + 1
        # client does a getDatabaseAccount on initialization, which fails.
        # assertRaises replaces the old try/self.fail()/except Exception
        # pattern, where the AssertionError from self.fail() was itself caught
        # and reported with a misleading "not a ServiceRequestError" message.
        with self.assertRaises(ServiceRequestError):
            cosmos_client_connection.CosmosClientConnection(
                self.host, {'masterKey': self.masterKey}, connection_policy)

if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()

相关问题 更多 >