Python requests: threads/processes vs. I/O

Published 2024-04-18 18:43:59


I am connecting to a local server (OSRM) via HTTP to submit routes and get back drive-times. I notice that I/O is slower than threading because it seems that the wait time for a calculation is smaller than the time it takes to send the request and process the JSON output (I think I/O is better when the server takes some time to process your request -> you don't want it to be blocking because you have to wait; this isn't my case). Threading suffers from the Global Interpreter Lock, so it appears (and evidence below) that my fastest option is to use multiprocessing.
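(As a toy illustration of the GIL point - my own example, separate from the routing code: a pure-Python CPU-bound task gains nothing from a thread pool but scales with a process pool.)

import time
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool  # same API, but thread-backed

def burn(n):
    # pure-Python CPU work: holds the GIL the whole time
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == '__main__':
    jobs = [2000000] * 8
    for name, PoolCls in [('threads', ThreadPool), ('processes', Pool)]:
        start = time.time()
        with PoolCls(4) as p:
            p.map(burn, jobs)
        print(name, round(time.time() - start, 2), 's')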

The issue with multiprocessing is that it is so quick that it exhausts my sockets and I get an error (requests issues a new connection every time). I can (serially) use a requests.Session() object to keep a connection alive, however I can't get this to work in parallel (each process has its own session).

The closest I currently have to working is this multiprocessing code:

import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())

def ReqOsrm(url_input):
    ul, qid = url_input
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()

However, I can't get the HTTPConnectionPool to work properly: it creates new sockets each time (I think) and then gives me the error:

HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))


My goal is to get distance calculations from an OSRM-routing server that I'm running locally, as quickly as possible.

I have a question in two parts - essentially I am trying to convert some code using multiprocessing.Pool() to better code (proper asynchronous functions - so that execution never breaks and it runs as fast as possible).

The issue I am having is that everything I try seems slower than multiprocessing (I present several examples of what I have tried below).

Some potential approaches are: gevent, grequests, tornado, requests-futures, asyncio, etc.

A - multiprocessing.Pool()

I initially started off with something like this:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)

Where I connect to a local server (localhost, port: 5005), which is launched on 8 threads and supports parallel execution.

After a bit of searching I realised the error I was getting was because requests was opening a new connection/socket for each request. So this was actually too fast and exhausted the sockets after a while. It seems the way to address this is to use a requests.Session() - however, I haven't been able to get this working with multiprocessing (where each process has its own session).
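For what it's worth, the per-process-session wiring I had in mind looks roughly like this (a sketch, untested; fetch and init_worker are placeholder names, and url_routes is the same list of (url, qid) tuples as above):

import requests
from multiprocessing import Pool, cpu_count

session = None

def init_worker():
    global session
    session = requests.Session()  # built once per worker process, then reused

def fetch(url_input):
    url, qid = url_input
    try:
        response = session.get(url)
        return [qid, response.status_code, response.json()['route_summary']['total_time']]
    except Exception:
        return [qid, 999, 0]

if __name__ == '__main__':
    with Pool(cpu_count(), initializer=init_worker) as pool:
        calc_routes = pool.map(fetch, url_routes)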

Question 1.

On some runs it works fine, e.g.:

(screenshot - for comparison with later: 45% server usage and 1700 requests per second)

However, in some cases - and I don't fully understand why - I get:

HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))

My guess would be that, since requests locks the socket while it is in use, sometimes the server is too slow to respond to the old request and a new one is spawned. The server supports queueing, however requests does not - so instead of adding to the queue I get the error?

Question 2.

I found:

Blocking Or Non-Blocking?

With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.

If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python’s asynchronicity frameworks.

Two excellent examples are grequests and requests-futures.
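For completeness, the grequests variant mentioned in that quote would look roughly like this (a sketch; I have not benchmarked it, and it assumes the same url_routes list of (url, qid) tuples):

import grequests

reqs = (grequests.get(url) for url, qid in url_routes)
# size caps the number of in-flight requests, like a semaphore
responses = grequests.map(reqs, size=100)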

B - requests-futures

To address this, I needed to rewrite the code to use asynchronous requests, so I tried the below using:

from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed

(By the way, I launch my server with the option to use all threads.)

The main code is:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        calc_routes.append(row)

where my function (ReqOsrm) is now rewritten as:

def ReqOsrm(sess, resp):
    json_geocode = resp.json()
    status = int(json_geocode['status'])
    # Found route between points
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    # Cannot find route between points (code errors as 999)
    else:
        out = [999, 0, 0, 0, 0, 0, 0]
    resp.data = out

However, this code is much slower than the multiprocessing code! Before I was getting around 1700 requests a second, now I am getting 600. I guess this is because I don't have full CPU utilisation, however I'm not sure how to increase it?


C - Threading

I tried another method (creating threads) - however, again I wasn't sure how to get this to maximise CPU usage (ideally I want to see my server at 50% usage, no?):

import requests
from queue import Queue
from threading import Thread

def doWork():
    while True:
        url, qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = resp.json()
        # Found route between points
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from = json_geocode['via_points'][0]
            used_to = json_geocode['via_points'][1]
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        else:
            print("Done but no route")
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("Error: %s" % err)
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    qres.put(out)
    return

#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)

for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in url_routes:
        q.put(url)
    q.join()
except Exception:
    pass

# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

This method is faster than requests_futures, I think, but I don't know how many threads to set to maximise this -
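One way to find that number is to just sweep it (a sketch: run_with_threads is a hypothetical wrapper that runs the queue/worker code above with n threads over url_routes and returns once q.join() completes):

import time

for n in [50, 100, 250, 500, 1000]:
    start = time.time()
    run_with_threads(n)
    elapsed = time.time() - start
    print("threads=%4d  RPS=%.0f" % (n, len(url_routes) / elapsed))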


D - tornado (not working)

I am now trying tornado - however, I can't quite get it working. It crashes with exit code -1073741819 if I use curl_httpclient; if I use simple_httpclient it works, but then I get timeout errors:

ERROR:tornado.application:Multiple exceptions in yield list
Traceback (most recent call last):
  File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback
    result_list.append(f.result())
  File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
tornado.httpclient.HTTPError: HTTP 599: Timeout

from functools import partial
from tornado import gen, ioloop
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient

def handle_req(r):
    try:
        json_geocode = json_decode(r)
        status = int(json_geocode['status'])
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        print(out)
    except Exception as err:
        print(err)
        out = [999, 0, 0, 0, 0, 0, 0]
    return out

# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    responses = yield [http_client.fetch(url) for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)

# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
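One suspicion about the HTTP 599s: tornado's default request_timeout of 20 seconds appears to also cover the time a request spends queued behind max_clients, so with thousands of URLs queued the tail of the queue times out. A sketch of the same coroutine with the timeouts raised (untested):

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    # request_timeout/connect_timeout are passed through to HTTPRequest
    responses = yield [http_client.fetch(url, connect_timeout=600, request_timeout=600)
                       for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)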

E - asyncio / aiohttp

Decided to try another approach (although it would be great to get tornado working) using asyncio and aiohttp.

import json
import asyncio
import aiohttp

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    status = int(json_geocode['status'])
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    else:
        print("Done, but not route for {0} - status: {1}".format(qid, status))
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    return out

def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    # Return a coroutine that will download urls asynchronously and
    # respect the locking of the semaphore
    @asyncio.coroutine
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
        return body, qid
    return http_get

@asyncio.coroutine
def run_experiment(urls):
    http_client = chunked_http_client(500)
    # http_client returns futures
    # save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

This works, but it is still slower than multiprocessing!



3 Answers

Looking at the multiprocessing code at the top of the question: it seems that a HTTPConnectionPool() is being created around each call to ReqOsrm, so a new pool is made for each url. Instead, use the initializer and initargs parameters of Pool to create a single pool per process:

import json
from multiprocessing import Pool
from urllib3 import HTTPConnectionPool

conn_pool = None

def makePool(host, port):
    global conn_pool
    conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)

def ReqOsrm(url_input):
    ul, qid = url_input

    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out

        else:
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]

    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

if __name__ == "__main__":
    # run:
    pool = Pool(initializer=makePool, initargs=('127.0.0.1', 5005))
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()

The requests-futures version appears to have an indentation error. The loop for future in as_completed(futures): is indented under the outer loop for i in range(len(url_routes)):. So a request is made in the outer loop and then the inner loop waits for that future to return before the next iteration of the outer loop. This makes the requests run serially rather than in parallel.

I think the code should be as follows:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit all requests first
    for url_in, qid in url_routes:
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Then process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        calc_routes.append(row)

Question 1

You get the error because this approach:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes)

creates a new TCP connection for every requested URL, and at some point it fails simply because the system runs out of free local ports. To confirm this, you can run netstat while your code is executing, e.g. on Windows:

netstat -an | findstr 5005

This will show you a large number of connections to the server.

Also, reaching 1700 RPS looks unrealistic for this approach, since requests.get is quite an expensive operation and it's unlikely you can get even 50 RPS this way. So you probably need to double-check your RPS calculations.
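A simple way to sanity-check the figure is to time the whole batch and divide (using the pool/ReqOsrm from the question):

import time

start = time.time()
calc_routes = pool.map(ReqOsrm, url_routes)
elapsed = time.time() - start
print("%d requests in %.1fs -> %.0f RPS" % (len(url_routes), elapsed, len(url_routes) / elapsed))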

To avoid the error, you need to use a session instead of creating a connection from scratch:

import multiprocessing
import requests
import time


class Worker(multiprocessing.Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout

    def run(self):
        s = requests.session()
        while not self.qin.empty():
            result = s.get(self.qin.get())
            self.qout.put(result)
            self.qin.task_done()

if __name__ == '__main__':
    start = time.time()

    qin = multiprocessing.JoinableQueue()
    [qin.put('http://localhost:8080/') for _ in range(10000)]

    qout = multiprocessing.Queue()

    [Worker(qin, qout).start() for _ in range(multiprocessing.cpu_count())]

    qin.join()

    result = []
    while not qout.empty():
        result.append(qout.get())

    print(time.time() - start)
    print(result)

Question 2

Threading or async approaches will not give you higher RPS unless I/O takes more time than the calculations (e.g. high network latency, big responses), because threads are affected by the GIL (since they run in the same Python process) and async libs can be blocked by long-running calculations.

Though threads or async libs can improve performance, running the same threaded or async code in multiple processes will give you even more performance.
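A sketch of that combination (my own wiring, untested): split url_routes across processes, and let each process run a small thread pool sharing one keep-alive session. Note that sharing a requests.Session across threads is common practice but not officially guaranteed to be thread-safe.

import requests
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool, cpu_count

def fetch_chunk(chunk):
    session = requests.Session()  # one keep-alive session per process
    def fetch(item):
        url, qid = item
        try:
            return [qid, session.get(url).status_code]
        except Exception:
            return [qid, 999]
    with ThreadPoolExecutor(max_workers=50) as ex:  # threads cover the I/O waits
        return list(ex.map(fetch, chunk))

if __name__ == '__main__':
    n = cpu_count()
    chunks = [url_routes[i::n] for i in range(n)]  # round-robin split of (url, qid) tuples
    with Pool(n) as pool:
        calc_routes = [row for part in pool.map(fetch_chunk, chunks) for row in part]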

Thanks everyone for the help. I wanted to post my conclusions:

Since my HTTP requests go to a local server which processes each request instantly, it does not make much sense for me to use async approaches (compared to most cases when requests are sent over the internet). The costly factor, for me, is actually sending the request and processing the feedback, which means I get much better speeds using multiple processes (threads suffer from the GIL). I should also use a session to increase the speed (no need to close and re-open a connection to the SAME server) and to help prevent port exhaustion.

Here are all of the methods I tried that worked, with example RPS:

Serial

S1. Serial GET requests (no session) -> 215 RPS

import json
import requests

def ReqOsrm(data):
    url, qid = data
    try:
        response = requests.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

S2. Serial GET requests (requests.Session()) -> 335 RPS

session = requests.Session()

def ReqOsrm(data):
    url, qid = data
    try:
        response = session.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]

S3. Serial GET requests (urllib3.HTTPConnectionPool) -> 545 RPS

from urllib3 import HTTPConnectionPool

# ghost/gport: the OSRM host and port used throughout
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

Async I/O

A4. Async with aiohttp -> 450 RPS

import json
import asyncio
import aiohttp
concurrent = 100
def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    tot_time_s = json_geocode['paths'][0]['time']
    tot_dist_m = json_geocode['paths'][0]['distance']
    return [qid, 200, tot_time_s, tot_dist_m]
def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    # Return a coroutine that will download urls asynchronously and
    # respect the locking of the semaphore
    @asyncio.coroutine
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            with aiohttp.ClientSession() as session:
                response = yield from session.get(url)
                body = yield from response.content.read()
                yield from response.wait_for_close()
        return body, qid
    return http_get
@asyncio.coroutine
def run_experiment(urls):
    http_client = chunked_http_client(num_chunks=concurrent)
    # http_client returns futures, save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0]
        response.append(out)
    return response
# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

A5. Threading without a session -> 330 RPS

import json
import requests
from threading import Thread
from queue import Queue

concurrent = 100
def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A6. Threading with HTTPConnectionPool -> 1550 RPS

import json
from threading import Thread
from queue import Queue
from urllib3 import HTTPConnectionPool

concurrent = 100
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)
def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = conn_pool.request('GET', url)
        return resp.status, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A7. requests-futures -> 520 RPS

from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
concurrent = 100
def ReqOsrm(sess, resp):
    try:
        json_geocode = resp.json()
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err)
        out = [999, 0, 0]
    resp.data = out
#Run:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=concurrent)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0]
        calc_routes.append(row)

Multiple processes

P8. multiprocessing Worker + queue + requests.Session() -> 1058 RPS

import json
import requests
from multiprocessing import Process, JoinableQueue, Queue, cpu_count
class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout
    def run(self):
        s = requests.session()
        while not self.qin.empty():
            url, qid = self.qin.get()
            data = s.get(url)
            self.qout.put(ReqOsrm(data, qid))
            self.qin.task_done()
def ReqOsrm(resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid)
        return [qid, 999, 0, 0]
# Run:
qout = Queue()
qin = JoinableQueue()
[qin.put(url_q) for url_q in url_routes]
[Worker(qin, qout).start() for _ in range(cpu_count())]
qin.join()
calc_routes = []
while not qout.empty():
    calc_routes.append(qout.get())

P9. multiprocessing Worker + queue + HTTPConnectionPool() -> 1230 RPS

P10. multiprocessing v2 (not sure how this is different) -> 1350 RPS

import json
from multiprocessing import Pool
from urllib3 import HTTPConnectionPool

conn_pool = None
def makePool(host, port):
    global conn_pool
    conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        return [qid, 999, 0, 0]
# Run:
pool = Pool(initializer=makePool, initargs=(ghost, gport))
calc_routes = pool.map(ReqOsrm, url_routes)

In conclusion, the best methods for me are P10 (and, surprisingly, A6).
