Why doesn't run_in_executor increase the number of tasks in the loop?

Posted 2024-04-19 09:34:23


I am learning Python asyncio.
I can understand what an event loop is and what coroutines are, and, to a lesser extent, what futures and tasks mean.
My picture is that the event loop works by some methods that take tasks made from coroutines, schedule them in some kind of queue, and then execute those tasks one by one.
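
As a minimal sketch of that mental model (made-up coroutine, Python 3.5-compatible syntax, since the code below targets 3.5):

    import asyncio

    async def work(n):
        # a coroutine hands control back to the loop at every await
        await asyncio.sleep(0.1)
        return n * 2

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # ensure_future wraps each coroutine in a Task and schedules it on the loop
    tasks = [asyncio.ensure_future(work(i)) for i in range(3)]
    # the loop interleaves the tasks, switching at each await point
    loop.run_until_complete(asyncio.wait(tasks))
    print([t.result() for t in tasks])  # [0, 2, 4]
    loop.close()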
My question is about the method run_in_executor.
I am trying to understand some Python code in order to port it to C++.
As I understand this code:

    futures.append(executor.submit(do_work, symbol, day, files[symbol]))

then this new thread creates an event loop when it reaches:

    csv.append(day, decompress(day, fetch_day(symbol, day)))

    def fetch_day(symbol, day):
        local_data = threading.local()
        loop = getattr(local_data, 'loop', asyncio.new_event_loop())
        asyncio.set_event_loop(loop)
        loop = asyncio.get_event_loop()  # first event loop
        loop.set_debug(True)

It schedules 24 tasks, then:

    def create_tasks(symbol, day):
        ...
        tasks = [asyncio.ensure_future(get(URL.format(**url_info, hour=i))) for i in range(0, 24)]
        return tasks

and each task:

    async def get(url):  # each task, 24 get tasks in total
        loop = asyncio.get_event_loop()  # I don't know if this is the same loop or a new one
        buffer = BytesIO()
        id = url[35:].replace('/', " ")
        start = time.time()
        Logger.info("Fetching {0}".format(id))
        for i in range(ATTEMPTS):
            try:
                # this loop: if it is the same loop, why doesn't the number of tasks
                # increase with each run of the 24 gets? We should have 48 futures in total.
                res = await loop.run_in_executor(None, lambda: requests.get(url, stream=True))

Here I don't know whether it makes a new loop or uses the same loop created in the first thread. run_in_executor uses the loop to run, in the executor, a function that spawns a new thread.

I want to know whether it is making new loops, or just using the same loop from the extra first thread.

If it uses the same loop, then why doesn't the number of task handles increase after the run_in_executor coroutine runs?

My understanding is that the run_in_executor coroutine adds a new task to the loop, so why doesn't it increase the loop's task count?

Another thought: maybe there is a set of futures, independent of the loop's tasks, used only for the threads.

How can I find out, at any moment, the number of futures pending in the loop?
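
For what it's worth, on the Python 3.5/3.6 asyncio this code targets, the Tasks attached to a loop can be counted with asyncio.Task.all_tasks(). A minimal sketch (the probe coroutine is made up) also shows why the count never grows: run_in_executor returns an asyncio.Future that wraps a concurrent.futures.Future, and that wrapper is not a Task:

    import asyncio

    async def probe():
        loop = asyncio.get_event_loop()
        # all Tasks known to this loop: just the probe task itself
        print("tasks:", len(asyncio.Task.all_tasks(loop)))  # -> 1
        fut = loop.run_in_executor(None, lambda: 42)
        # the executor future is not a Task, so the count is unchanged
        print("tasks:", len(asyncio.Task.all_tasks(loop)))  # -> 1
        print("result:", await fut)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(probe())
    loop.close()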

The code is duka (master branch) from GitHub.

Here is the code:

In main.py:

#!/usr/bin/env python3.5
import sys
import logging
import argparse
from datetime import date, timedelta

#from duka.app import app
#from duka.core import valid_date, set_up_signals
#from duka.core.utils import valid_timeframe, TimeFrame
from app import app
from core import valid_date, set_up_signals
from core.utils import valid_timeframe, TimeFrame
VERSION = '0.2.1'


def main():
    parser = argparse.ArgumentParser(prog='duka', usage='%(prog)s [options]')
    parser.add_argument('-v', '--version', action='version',
                        version='Version: %(prog)s-{version}'.format(version=VERSION))
    parser.add_argument('symbols', metavar='SYMBOLS', type=str, nargs='?',
                        help='symbol list using format EURUSD EURGBP', default=["GBPJPY"])
    parser.add_argument('-d', '--day', type=valid_date, help='specific day format YYYY-MM-DD (default today)',
                        default=date.today() - timedelta(1))
    parser.add_argument('-s', '--startdate', type=valid_date, help='start date format YYYY-MM-DD (default today)')
    parser.add_argument('-e', '--enddate', type=valid_date, help='end date format YYYY-MM-DD (default today)')
    parser.add_argument('-t', '--thread', type=int, help='number of threads (default 20)', default=5)
    parser.add_argument('-f', '--folder', type=str, help='destination folder (default .)', default='.')
    parser.add_argument('-c', '--candle', type=valid_timeframe,
                        help='use candles instead of ticks. Accepted values M1 M2 M5 M10 M15 M30 H1 H4',
                        default=TimeFrame.TICK)
    parser.add_argument('--header', action='store_true', help='include CSV header (default false)', default=False)
    args = parser.parse_args()

    if args.startdate is not None:
        start = args.startdate
    else:
        start = args.day

    if args.enddate is not None:
        end = args.enddate
    else:
        end = args.day

    # Configure logging to show the name of the thread
    # where the log message originates.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )
    #logging.basicConfig(level=logging.DEBUG)

    set_up_signals()
    app(args.symbols, start, end, args.thread, args.candle, args.folder, args.header)


if __name__ == '__main__':
    main()

In app.py:

import concurrent
import threading
import time
from collections import deque
from datetime import timedelta, date

#from ..core import decompress, fetch_day, Logger
#from ..core.csv_dumper import CSVDumper
#from ..core.utils import is_debug_mode, TimeFrame
from core import decompress, fetch_day, Logger
from core.csv_dumper import CSVDumper
from core.utils import is_debug_mode, TimeFrame

SATURDAY = 5
day_counter = 0


def days(start, end):
    if start > end:
        return
    end = end + timedelta(days=1)
    today = date.today()
    while start != end:
        if start.weekday() != SATURDAY and start != today:
            yield start
        start = start + timedelta(days=1)


def format_left_time(seconds):
    if seconds < 0:
        return "--:--:--"
    m, s = divmod(seconds, 60)
    h, m = divmod(m, 60)
    return "%d:%02d:%02d" % (h, m, s)


def update_progress(done, total, avg_time_per_job, threads):
    progress = 1 if total == 0 else done / total
    progress = int((1.0 if progress > 1.0 else progress) * 100)
    remainder = 100 - progress
    estimation = (avg_time_per_job * (total - done) / threads)
    if not is_debug_mode():
        print('\r[{0}] {1}%  Left : {2}  '.format('#' * progress + '-' * remainder, progress,
                                                  format_left_time(estimation)), end='')


def how_many_days(start, end):
    return sum(1 for _ in days(start, end))


def avg(fetch_times):
    if len(fetch_times) != 0:
        return sum(fetch_times) / len(fetch_times)
    else:
        return -1


def name(symbol, timeframe, start, end):
    ext = ".csv"

    for x in dir(TimeFrame):
        if getattr(TimeFrame, x) == timeframe:
            ts_str = x

    name = symbol + "_" + ts_str + "_" + str(start)

    if start != end:
        name += "_" + str(end)

    return name + ext


def app(symbols, start, end, threads, timeframe, folder, header):
    if start > end:
        return
    lock = threading.Lock()
    global day_counter
    total_days = how_many_days(start, end)

    if total_days == 0:
        return

    last_fetch = deque([], maxlen=5)
    update_progress(day_counter, total_days, -1, threads)

    def do_work(symbol, day, csv):
        global day_counter
        start_time = time.time()
        Logger.info("Fetching day {0}".format(day))
        try:
            csv.append(day, decompress(day, fetch_day(symbol, day)))
        except Exception as e:
            print("ERROR for {0}, {1} Exception : {2}".format(day, symbol, str(e)))
        elapsed_time = time.time() - start_time
        last_fetch.append(elapsed_time)
        with lock:
            day_counter += 1
        Logger.info("Day {0} fetched in {1}s".format(day, elapsed_time))

    futures = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:

        files = {symbol: CSVDumper(symbol, timeframe, start, end, folder, header) for symbol in symbols}

        for symbol in symbols:
            for day in days(start, end):
                futures.append(executor.submit(do_work, symbol, day, files[symbol]))  # >>> first extra thread

        for future in concurrent.futures.as_completed(futures):
            if future.exception() is None:
                update_progress(day_counter, total_days, avg(last_fetch), threads)
            else:
                Logger.error("An error happened when fetching data: {0}".format(future.exception()))

        Logger.info("Fetching data terminated")
        for file in files.values():
            file.dump()

    update_progress(day_counter, total_days, avg(last_fetch), threads)
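
As a side note, the futures list here holds concurrent.futures.Future objects returned by executor.submit; they belong to the thread pool, not to any asyncio loop. A minimal sketch of just that API, with a made-up work function:

    import concurrent.futures

    def work(n):
        return n * n

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # submit() immediately returns a concurrent.futures.Future;
        # the call runs on a pool thread, no event loop involved
        futs = [executor.submit(work, i) for i in range(4)]
        for f in concurrent.futures.as_completed(futs):
            print(f.result())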

In fetch.py:

import asyncio
import datetime
import threading
import time
from functools import reduce
from io import BytesIO, DEFAULT_BUFFER_SIZE

import requests

#from ..core.utils import Logger, is_dst
from core.utils import Logger, is_dst

URL = "https://www.dukascopy.com/datafeed/{currency}/{year}/{month:02d}/{day:02d}/{hour:02d}h_ticks.bi5"
ATTEMPTS = 5


async def get(url):  # each task, 24 get tasks in total
    loop = asyncio.get_event_loop()  # I don't know if this is the same loop or a new one
    buffer = BytesIO()
    id = url[35:].replace('/', " ")
    start = time.time()
    Logger.info("Fetching {0}".format(id))
    for i in range(ATTEMPTS):
        try:
            # z = asyncio.Task.all_tasks()  # debugging attempt
            # this loop: if it is the same loop, why doesn't the number of tasks
            # increase with each run of the 24 gets? We should have 48 futures in total.
            res = await loop.run_in_executor(None, lambda: requests.get(url, stream=True))
            if res.status_code == 200:
                for chunk in res.iter_content(DEFAULT_BUFFER_SIZE):
                    buffer.write(chunk)
                Logger.info("Fetched {0} completed in {1}s".format(id, time.time() - start))
                if len(buffer.getbuffer()) <= 0:
                    Logger.info("Buffer for {0} is empty ".format(id))
                return buffer.getbuffer()
            else:
                Logger.warn("Request to {0} failed with error code : {1} ".format(url, str(res.status_code)))
        except Exception as e:
            Logger.warn("Request {0} failed with exception : {1}".format(id, str(e)))
            time.sleep(0.5 * i)

    raise Exception("Request failed for {0} after ATTEMPTS attempts".format(url))


def create_tasks(symbol, day):

    start = 0

    if is_dst(day):
        start = 1

    url_info = {
        'currency': symbol,
        'year': day.year,
        'month': day.month - 1,
        'day': day.day
    }
    tasks = [asyncio.ensure_future(get(URL.format(**url_info, hour=i))) for i in range(0, 24)]

    # if is_dst(day):
    #     next_day = day + datetime.timedelta(days=1)
    #     url_info = {
    #         'currency': symbol,
    #         'year': next_day.year,
    #         'month': next_day.month - 1,
    #         'day': next_day.day
    #     }
    #     tasks.append(asyncio.ensure_future(get(URL.format(**url_info, hour=0))))
    return tasks


def fetch_day(symbol, day):
    local_data = threading.local()
    loop = getattr(local_data, 'loop', asyncio.new_event_loop())
    asyncio.set_event_loop(loop)
    loop = asyncio.get_event_loop()  # first event loop
    loop.set_debug(True)
    tasks = create_tasks(symbol, day)
    # z = asyncio.Task.all_tasks()  # debugging attempt
    loop.run_until_complete(asyncio.wait(tasks))
    # y = asyncio.Task.all_tasks()  # debugging attempt

    def add(acc, task):
        acc.write(task.result())
        return acc

    return reduce(add, tasks, BytesIO()).getbuffer()
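
As an aside: local_data is created fresh on every call to fetch_day and the loop is never stored back on it, so the getattr default always fires and every call builds a brand-new event loop. If per-thread reuse was the intent, a sketch of that fix (my assumption, not the repo's code) could look like:

    import asyncio
    import threading

    _local = threading.local()  # module level, so state survives across calls

    def get_thread_loop():
        # reuse this thread's loop if one already exists, else create and cache it
        loop = getattr(_local, 'loop', None)
        if loop is None:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            _local.loop = loop
        return loop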

Other code:

In this code, run_in_executor produces the futures collected in blocking_tasks.
So where are those tasks in the previous code?

import asyncio
import concurrent.futures
import logging
import sys
import time


def blocks(n):
    log = logging.getLogger('blocks({})'.format(n))
    log.info('running')
    time.sleep(0.1)
    log.info('done')
    return n ** 2


async def run_blocking_tasks(executor):
    log = logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('waiting for executor tasks')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('results: {!r}'.format(results))

    log.info('exiting')


if __name__ == '__main__':
    # Configure logging to show the name of the thread
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a limited thread pool.
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()
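
A quick probe (a made-up minimal script, separate from the example above) shows where those "tasks" actually live: run_in_executor returns an asyncio.Future, not an asyncio.Task, so it never shows up in the loop's task set:

    import asyncio
    import concurrent.futures

    async def probe(executor):
        loop = asyncio.get_event_loop()
        fut = loop.run_in_executor(executor, lambda: 42)
        print(isinstance(fut, asyncio.Future))  # True: an asyncio wrapper future
        print(isinstance(fut, asyncio.Task))    # False: not a Task
        await fut

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(probe(executor))
    executor.shutdown()
    loop.close()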



1 Answer

Answered 2024-04-19 09:34:23

requests is blocking and is not compatible with asyncio. Use aiohttp instead:

import aiohttp
import asyncio
import async_timeout

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            return await response.text()

async def main(loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        html = await fetch(session, 'http://python.org')
        print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
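
Applied to the duka code above, get() can then await the response on the loop itself, so the 24 downloads need no thread pool at all. A rough sketch (untested; it reuses the retry idea and ATTEMPTS from fetch.py, and the session would be created once and passed in):

    import asyncio
    from io import BytesIO

    import aiohttp

    ATTEMPTS = 5

    async def get(session, url):
        # each retry awaits the response on the event loop; no run_in_executor needed
        for i in range(ATTEMPTS):
            try:
                async with session.get(url) as res:
                    if res.status == 200:
                        return BytesIO(await res.read()).getbuffer()
            except aiohttp.ClientError as e:
                print("Request {0} failed with exception: {1}".format(url, e))
            await asyncio.sleep(0.5 * i)
        raise Exception("Request failed for {0} after {1} attempts".format(url, ATTEMPTS))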
