Fetching mail with asyncio in Python 3

8 votes
2 answers
4909 views
Asked 2025-04-18 15:20

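I'm experimenting with the asyncio module and would like some hints or advice on how to fetch large emails asynchronously.

I have a list of mailbox usernames and passwords.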

data = [
    {'usern': 'foo@bar.de', 'passw': 'x'},
    {'usern': 'foo2@bar.de', 'passw': 'y'},
    {'usern': 'foo3@bar.de', 'passw': 'z'} (...)
]

I was thinking of something like this:

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([get_attachment(d) for d in data]))
loop.close()

However, downloading the mail attachments takes a long time.

The mail code:

@asyncio.coroutine
def get_attachment(d):
    username = d['usern']
    password = d['passw']

    connection = imaplib.IMAP4_SSL('imap.bar.de')
    connection.login(username, password)
    connection.select()

    # list all available mails
    typ, data = connection.search(None, 'ALL')

    for num in data[0].split():
        # fetching each mail
        typ, data = connection.fetch(num, '(RFC822)')
        raw_string = data[0][1].decode('utf-8')
        msg = email.message_from_string(raw_string)
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            if part.get('Content-Disposition') is None:
                continue

            if part.get_filename():
                body = part.get_payload(decode=True)
                # do something with the body, async?

    connection.close()
    connection.logout()

How can I process all the mails (download the attachments) asynchronously?

2 Answers

6

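I had the same need: fetching mail fully asynchronously with Python 3. In case anyone else here is interested, I have published an asyncio IMAP library: https://github.com/bamthomas/aioimaplib

You can use it like this: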

import asyncio
import email

from aioimaplib import aioimaplib


# The aioimaplib calls are kept as in the original answer; check them
# against the release you install, since the API may have changed.
async def wait_for_new_message(host, user, password):
    imap_client = aioimaplib.IMAP4(host=host)
    await imap_client.wait_hello_from_server()

    await imap_client.login(user, password)
    await imap_client.select()

    # asyncio.async() was renamed asyncio.ensure_future(); "async" is now a keyword
    asyncio.ensure_future(imap_client.idle())
    msg_id = 0
    while True:
        msg = await imap_client.wait_server_push()
        print('--> received from server: %s' % msg)
        if 'EXISTS' in msg:
            msg_id = msg.split()[0]
            imap_client.idle_done()
            break

    result, data = await imap_client.fetch(msg_id, '(RFC822)')
    email_message = email.message_from_bytes(data[0])

    attachments = []
    body = ''
    for part in email_message.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        if part.get_content_maintype() == 'text' and 'attachment' not in part.get('Content-Disposition', ''):
            body = part.get_payload(decode=True).decode(part.get_param('charset', 'ascii')).strip()
        else:
            attachments.append(
                {'type': part.get_content_type(), 'filename': part.get_filename(), 'size': len(part.as_bytes())})

    print('attachments : %s' % attachments)
    print('body : %s' % body)
    await imap_client.logout()


if __name__ == '__main__':
    asyncio.run(wait_for_new_message('my.imap.server', 'user', 'pass'))

Large emails and attachments are downloaded through asyncio as well.
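
To cover the question's case of several mailboxes, one such coroutine per account can be run concurrently. The following is only a minimal sketch and not part of aioimaplib: `fetch_account` is a hypothetical stand-in that sleeps where the real IMAP awaits would go, and `MAX_CONCURRENT` is an assumed limit on simultaneous sessions.

```python
import asyncio

MAX_CONCURRENT = 2  # assumed cap on simultaneous IMAP sessions


async def fetch_account(account, limiter):
    """Hypothetical stand-in for a per-account coroutine such as
    wait_for_new_message(); the sleep marks where the IMAP awaits would go."""
    async with limiter:
        await asyncio.sleep(0.01)
        return account['usern']


async def fetch_all(accounts):
    limiter = asyncio.Semaphore(MAX_CONCURRENT)
    # gather() runs the coroutines concurrently and returns results in input order
    return await asyncio.gather(*(fetch_account(a, limiter) for a in accounts))


accounts = [
    {'usern': 'foo@bar.de', 'passw': 'x'},
    {'usern': 'foo2@bar.de', 'passw': 'y'},
]
results = asyncio.run(fetch_all(accounts))
print(results)
```

The semaphore is optional; it is there because many IMAP servers limit the number of parallel connections.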

7

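If you don't have an IMAP library based on asynchronous I/O, you can use concurrent.futures.ThreadPoolExecutor to do the I/O in multiple threads. Python releases the global interpreter lock (GIL) while doing I/O, so you get real concurrency: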

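import asyncio
import email
import imaplib
import threading
from concurrent.futures import ThreadPoolExecutor


def init_connection(d):
    username = d['usern']
    password = d['passw']

    connection = imaplib.IMAP4_SSL('imap.bar.de')
    connection.login(username, password)
    connection.select()
    return connection

local = threading.local() # We use this to get a different connection per thread
def do_fetch(num, d, rfc):
    # Reuse this thread's connection for the account, creating it on first use.
    # Keyed by username so a pool shared between accounts never mixes mailboxes.
    try:
        connections = local.connections
    except AttributeError:
        connections = local.connections = {}
    if d['usern'] not in connections:
        connections[d['usern']] = init_connection(d)
    return connections[d['usern']].fetch(num, rfc)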

async def get_attachment(d, pool):
    connection = init_connection(d)
    # list all available mails
    typ, data = connection.search(None, 'ALL')

    # Kick off asynchronous tasks for all the fetches
    loop = asyncio.get_running_loop()
    futs = [asyncio.ensure_future(loop.run_in_executor(pool, do_fetch, num, d, '(RFC822)'))
            for num in data[0].split()]

    # Process each fetch as it completes
    for fut in asyncio.as_completed(futs):
        typ, data = await fut
        # Parse the raw bytes directly; not every message is valid UTF-8
        msg = email.message_from_bytes(data[0][1])
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            if part.get('Content-Disposition') is None:
                continue

            if part.get_filename():
                body = part.get_payload(decode=True)
                # do something with the body, async?

    connection.close()
    connection.logout()


async def main():
    # You can probably increase max_workers, because the threads are almost exclusively doing I/O.
    pool = ThreadPoolExecutor(max_workers=5)
    await asyncio.gather(*(get_attachment(d, pool) for d in data))

asyncio.run(main())

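That said, this approach is not as good as a truly asynchronous I/O solution: creating threads has some overhead, which limits scalability and increases memory use. You also get some GIL slowdown from all the code that wraps the actual I/O calls. Still, if you are dealing with no more than a few thousand mails, it should perform acceptably.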

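We use run_in_executor to plug the ThreadPoolExecutor into the asyncio event loop, asyncio.ensure_future (named asyncio.async in early Python 3.4 releases) to make sure the returned object is an asyncio.Future, and as_completed to iterate over the futures in the order in which they complete.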
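
The interplay of run_in_executor and as_completed can be tried without a mail server. This is a minimal sketch under stated assumptions: `blocking_fetch` is a hypothetical stand-in for a blocking call such as connection.fetch(), and the delays are made up to force a known completion order.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_fetch(num, delay):
    """Hypothetical stand-in for a blocking call such as connection.fetch()."""
    time.sleep(delay)  # sleeping releases the GIL, as blocking socket I/O does
    return num


async def fetch_in_threads(jobs):
    loop = asyncio.get_running_loop()
    finished = []
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        # Each call returns an asyncio future backed by a worker thread
        futs = [loop.run_in_executor(pool, blocking_fetch, num, delay)
                for num, delay in jobs]
        # as_completed yields awaitables in completion order, not submission order
        for fut in asyncio.as_completed(futs):
            finished.append(await fut)
    return finished


start = time.monotonic()
order = asyncio.run(fetch_in_threads([(1, 0.5), (2, 0.1), (3, 0.3)]))
elapsed = time.monotonic() - start
print(order, round(elapsed, 2))
```

Because the three calls overlap, the total time should be close to the longest delay rather than the sum of all three.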

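Edit

It looks like imaplib is not thread-safe. I have changed my answer to use thread-local storage via threading.local, so that each thread gets its own connection object, which is reused for the whole lifetime of the thread (that is, the number of connections is bounded by num_workers per mailbox, rather than growing with the number of fetches).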
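
The effect of threading.local can be checked in isolation. In this minimal sketch a plain object() is a hypothetical stand-in for an imaplib connection, and the `created` list exists only to count how many were built.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()
created = []                     # thread ids that built a "connection"
created_lock = threading.Lock()


def get_connection():
    """Return this thread's cached object, creating it on first use.
    object() is a hypothetical stand-in for an imaplib connection."""
    conn = getattr(local, 'connection', None)
    if conn is None:
        conn = local.connection = object()
        with created_lock:
            created.append(threading.get_ident())
    return conn


def task(_):
    return id(get_connection())


with ThreadPoolExecutor(max_workers=3) as pool:
    ids = list(pool.map(task, range(20)))

print(len(created), len(set(ids)))
```

Twenty tasks run, but at most three objects are created, one per worker thread that actually picked up work.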
