通过线程并行化缓慢的api调用

2024-04-19 13:12:22 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在运行一个python脚本,它调用一个依赖于慢api的函数,而慢api又调用另一个同样依赖于同一慢api的函数。我想加快速度。你知道吗

最好的办法是什么?线程模块?如果是,请举例说明。关于线程,我注意到的一点是,你似乎无法从线程中检索返回值。。。我的大部分脚本都是用来打印函数的返回值的。。你知道吗

下面是我试图提高I/O性能的代码

def get_price_eq(currency, rate):

    if isAlt(currency) == False:
        currency = currency.upper()
        price_eq = 'btc_in_usd*USD_in_'+str(currency)+'*'+str(rate)
        #print price_eq
        return price_eq 
    else:
        currency = currency.lower()
        price_eq = 'poloniex'+ str(currency) + '_close' + '*' + str(rate)
        print(price_eq)
        return price_eq

def get_btcprice_fiat(price_eq):

    query = '/api/equation/'+price_eq
    try:
        conn = api.hmac(hmac_key, hmac_secret)
        btcpricefiat = conn.call('GET', query).json()
    except requests.exceptions.RequestException as e:  # This is the correct syntax
            print(e)
    return float(btcpricefiat['data'])

usdbal = float(bal) * get_btcprice_fiat(get_price_eq('USD', 1))
egpbal = float(bal) * get_btcprice_fiat(get_price_eq('EGP', 1))
rsdbal = float(bal) * get_btcprice_fiat(get_price_eq('RSD', 1))
eurbal = float(bal) * get_btcprice_fiat(get_price_eq('EUR', 1))

如您所见,我调用get\u btc\u price,它从一个数据供应商那里调用一个缓慢的api,并传入另一个函数的结果,这个函数使用另一个api调用,我做了4次以上,我在寻找提高这个函数性能的方法?另外,我读到的一件事是,你不能从线程返回值?我的大多数代码返回值,然后打印给用户,我如何处理这个?你知道吗


Tags: 函数apigetratefloat线程pricecurrency
1条回答
网友
1楼 · 发布于 2024-04-19 13:12:22

python3具有Launching parallel tasks功能。这使我们的工作更容易。你知道吗

它对thread poolingProcess pooling有。你知道吗

下面给出了一个见解:

线程池执行器示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

处理池执行器

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

对于Python2.7,它将如下所示:

import thread
import time

# Define a function for the thread
def print_time( threadName, delay):
   count = 0
   while count < 5:
      time.sleep(delay)
      count += 1
      print "%s: %s" % ( threadName, time.ctime(time.time()) )

# Create two threads as follows
try:
   thread.start_new_thread( print_time, ("Thread-1", 2, ) )
   thread.start_new_thread( print_time, ("Thread-2", 4, ) )
except:
   print "Error: unable to start thread"

输出:

Thread-1: Thu Jan 22 15:42:17 2009
Thread-1: Thu Jan 22 15:42:19 2009
Thread-2: Thu Jan 22 15:42:19 2009
Thread-1: Thu Jan 22 15:42:21 2009
Thread-2: Thu Jan 22 15:42:23 2009
Thread-1: Thu Jan 22 15:42:23 2009
Thread-1: Thu Jan 22 15:42:25 2009
Thread-2: Thu Jan 22 15:42:27 2009
Thread-2: Thu Jan 22 15:42:31 2009
Thread-2: Thu Jan 22 15:42:35 2009

因此,在您的例子中,Python 3将如下所示:

data = ['USD', 'EGP', 'RSD', 'EUR']
def helper_func(price_eq):
    return float(bal) * get_btcprice_fiat(get_price_eq(price_eq))


def main():
    res_dict = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for vals, res in zip(PRIMES, executor.map(helper_func, data)):
            res_dict[vals] = res

if __name__ == '__main__':
    main()

因此,在您的情况下,Python2.7将如下所示:

data = ['USD', 'EGP', 'RSD', 'EUR']
final_dict = {}
def helper_func(price_eq):
    final_dict[price_eq] = float(bal) * get_btcprice_fiat(get_price_eq(price_eq))

for val in data:
    try:
        thread.start_new_thread(helper_func, (val))
    except:
        print "Error: unable to start thread for %s" % (val)

相关问题 更多 >