Acquiring the first available lock/semaphore in Python asyncio

6 votes
1 answer
3536 views
Asked 2025-04-18 17:32

Using the new asyncio in Python 3.4, how do I acquire the first available lock or semaphore from a set of locks or semaphores?

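The approach I tried is to use wait(return_when=FIRST_COMPLETED) and, once one lock has been acquired, cancel all the acquire() calls that are still pending. However, I'm concerned that this may cause subtle bugs or race conditions, and I have a feeling there is a more elegant way to do it.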

import asyncio as aio

@aio.coroutine
def run():
    sem1, sem2 = (aio.Semaphore(), aio.Semaphore())
    print('initial:', sem1, sem2)
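    # ensure_future (Python 3.4.4+) replaces aio.async, which is a syntax error on Python 3.7+.
    a = aio.ensure_future(sleep(sem1, 1))  # schedule a task that takes sem1 for 1 second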
    print('just after sleep:', sem1, sem2)
    done, pending = yield from aio.wait([sem1.acquire(), sem2.acquire()], return_when=aio.FIRST_COMPLETED)
    print('done:', done)
    print('pending:', pending)
    for task in pending:
        task.cancel()
    print('after cancel:', sem1, sem2)
    yield from aio.wait([a])
    print('after wait:', sem1, sem2)

@aio.coroutine
def sleep(sem, i):
    with (yield from sem):
        yield from aio.sleep(i)

if __name__ == "__main__":
    aio.get_event_loop().run_until_complete(run())

The code above produces the following output (memory addresses edited):

initial: <asyncio.locks.Semaphore object at 0x1 [unlocked,value:1]> <asyncio.locks.Semaphore object at 0x2 [unlocked,value:1]>
just after sleep: <asyncio.locks.Semaphore object at 0x1 [unlocked,value:1]> <asyncio.locks.Semaphore object at 0x2 [unlocked,value:1]>
done: {Task(<acquire>)<result=True>}
pending: {Task(<acquire>)<PENDING>}
after cancel: <asyncio.locks.Semaphore object at 0x1 [locked,waiters:1]> <asyncio.locks.Semaphore object at 0x2 [locked]>
after wait: <asyncio.locks.Semaphore object at 0x1 [unlocked,value:1]> <asyncio.locks.Semaphore object at 0x2 [locked]>
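
For readers on current Python versions (3.7+), the approach described in the question could be sketched roughly as follows. This is only an illustration under stated assumptions, not the asker's code: `acquire_first` and `demo` are hypothetical names. The sketch releases any semaphore that was acquired in addition to the first one, which is the kind of leak visible in the output above, where sem2 stays locked. It does not try to recover semaphores if the caller itself is cancelled while waiting.

```python
import asyncio


async def acquire_first(*sems):
    """Acquire exactly one of ``sems`` and return the semaphore that was taken."""
    # One task per semaphore, remembering which semaphore each task belongs to.
    tasks = {asyncio.create_task(sem.acquire()): sem for sem in sems}
    try:
        await asyncio.wait(list(tasks), return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Cancel whatever is still waiting; cancel() does nothing on finished
        # tasks. Then let every task settle before inspecting the results.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    # More than one acquire() may have succeeded before this coroutine resumed.
    # Keep the first one and hand the others back.
    acquired = [sem for task, sem in tasks.items() if not task.cancelled()]
    for extra in acquired[1:]:
        extra.release()
    return acquired[0]


async def demo():
    sem1, sem2 = asyncio.Semaphore(), asyncio.Semaphore()
    await sem1.acquire()  # sem1 is busy, so only sem2 is available
    got = await acquire_first(sem1, sem2)
    result = got is sem2
    got.release()
    sem1.release()
    return result


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The caller is responsible for releasing the returned semaphore when it is done with it.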

1 Answer

4

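If I understand your question correctly, you want two different pools of locks: one that allows X connections per proxy, and another that allows Y connections globally. This can be implemented quite simply with a Semaphore object, as the documentation describes: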

class asyncio.Semaphore(value=1, *, loop=None)

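A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().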

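So, rather than implementing the pool with a collection of Semaphore objects that are each initialized with the default value of 1, initialize a single Semaphore and set its value to the maximum number of tasks you want to run concurrently.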

import asyncio

proxy_sem = asyncio.Semaphore(value=5)  # up to 5 connections can hold this semaphore concurrently
global_sem = asyncio.Semaphore(value=15)  # up to 15 connections can hold this semaphore concurrently

Then, in your code, always acquire the proxy's semaphore before acquiring the global one.

with (yield from proxy_sem):
    with (yield from global_sem):
        ...  # do the work for this connection while holding both semaphores

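That way, you never hold a global lock while waiting for a proxy-specific lock, which would block connections through other proxies that could otherwise proceed if they were able to get a global lock.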
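
As a rough illustration of that ordering on newer Python versions, where `async with` replaces `with (yield from ...)`, the nesting might look like the sketch below. The names `fetch`, `PROXY_LIMIT`, `GLOBAL_LIMIT` and the `stats` dictionary are hypothetical and exist only to make the concurrency limits observable; the sleep stands in for real network work.

```python
import asyncio

PROXY_LIMIT = 2   # hypothetical per-proxy connection limit
GLOBAL_LIMIT = 3  # hypothetical global connection limit


async def fetch(item, proxy_sem, global_sem, stats):
    # Take the proxy-specific semaphore first, then the global one.
    async with proxy_sem:
        async with global_sem:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
            await asyncio.sleep(0.01)  # stand-in for the real request
            stats["active"] -= 1
    return item


async def main():
    global_sem = asyncio.Semaphore(GLOBAL_LIMIT)
    proxy_sems = [asyncio.Semaphore(PROXY_LIMIT) for _ in range(3)]
    stats = {"active": 0, "peak": 0}
    # Spread 12 work items round-robin over the 3 proxies.
    jobs = [fetch(i, proxy_sems[i % 3], global_sem, stats) for i in range(12)]
    results = await asyncio.gather(*jobs)
    return results, stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(main())
    print(len(results), "items done, peak concurrency", peak)
```

The recorded peak never exceeds the global limit, even though the three proxy semaphores together would allow more connections than that.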

Edit:

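Here is a complete example that shows how to do this without needing a proxy-specific lock at all. Instead, you run one coroutine per proxy, and all of them consume from the same queue. Each proxy coroutine limits how many tasks run concurrently by tracking the number of active tasks it has launched, and it starts a new task only when that number is below the limit. When a proxy coroutine launches a task, the task itself is responsible for acquiring the global semaphore. Here's the code: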

import asyncio
import random

PROXY_CONN_LIMIT = 5
GLOBAL_CONN_LIMIT = 20
PROXIES = ['1.2.3.4', '1.1.1.1', '2.2.2.2', '3.3.3.3', '4.4.4.4']

@asyncio.coroutine
def do_network_stuff(item, proxy_info):
    print("Got {}. Handling it with proxy {}".format(item, proxy_info))
    # Wait a random amount of time to simulate actual work being done.
    yield from asyncio.sleep(random.randint(1,7))

@asyncio.coroutine
def handle_item(item, proxy_info, global_sem):
    with (yield from global_sem):  # Get the global semaphore
        yield from do_network_stuff(item, proxy_info)

@asyncio.coroutine
def proxy_pool(proxy_info, queue, global_sem):
    tasks = []  # tasks currently running through this proxy
    while True:  # Loop infinitely. We'll return when we get a sentinel from main()
        while len(tasks) < PROXY_CONN_LIMIT: # Pull from the queue until we hit our proxy limit
            item = yield from queue.get()
            print(len(tasks))  # debug: number of tasks currently active for this proxy
            if item is None:  # Time to shut down
                if tasks:
                    # Make sure all pending tasks are finished first.
                    yield from asyncio.wait(tasks)
                print("Shutting down {}".format(proxy_info))
                return
            # Create a task for the work item, and add it to our list of
            # tasks.
            # ensure_future (Python 3.4.4+) replaces asyncio.async.
            task = asyncio.ensure_future(handle_item(item, proxy_info, global_sem))
            tasks.append(task)
        # We've hit our proxy limit. Now we wait for at least one task
        # to complete, then loop around to pull more from the queue.
        done, pending = yield from asyncio.wait(tasks,
                                                return_when=asyncio.FIRST_COMPLETED) 
        # Remove the completed tasks from the active tasks list.
        for d in done:
            tasks.remove(d)

@asyncio.coroutine
def main():
    global_sem = asyncio.Semaphore(GLOBAL_CONN_LIMIT)
    queue = asyncio.Queue()
    tasks = []
    # Start the proxy pools.
    for proxy in PROXIES:
        tasks.append(asyncio.ensure_future(proxy_pool(proxy, queue, global_sem)))

    # Send work to the proxy pools.
    for i in range(50):
        yield from queue.put(i)

    # Tell the proxy pools to shut down.
    for _ in PROXIES:
        yield from queue.put(None)

    # Wait for them to shut down.
    yield from asyncio.wait(tasks)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Sample output:

0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
Got 0. Handling it with proxy 1.2.3.4
Got 1. Handling it with proxy 1.2.3.4
Got 2. Handling it with proxy 1.2.3.4
Got 3. Handling it with proxy 1.2.3.4
Got 4. Handling it with proxy 1.2.3.4
Got 5. Handling it with proxy 1.1.1.1
Got 6. Handling it with proxy 1.1.1.1
Got 7. Handling it with proxy 1.1.1.1
Got 8. Handling it with proxy 1.1.1.1
Got 9. Handling it with proxy 1.1.1.1
Got 10. Handling it with proxy 2.2.2.2
Got 11. Handling it with proxy 2.2.2.2
Got 12. Handling it with proxy 2.2.2.2
Got 13. Handling it with proxy 2.2.2.2
Got 14. Handling it with proxy 2.2.2.2
Got 15. Handling it with proxy 3.3.3.3
Got 16. Handling it with proxy 3.3.3.3
Got 17. Handling it with proxy 3.3.3.3
Got 18. Handling it with proxy 3.3.3.3
Got 19. Handling it with proxy 3.3.3.3
Got 20. Handling it with proxy 4.4.4.4
Got 21. Handling it with proxy 4.4.4.4
Got 22. Handling it with proxy 4.4.4.4
Got 23. Handling it with proxy 4.4.4.4
4
4
4
4
Got 24. Handling it with proxy 4.4.4.4
Got 25. Handling it with proxy 1.2.3.4
Got 26. Handling it with proxy 2.2.2.2
Got 27. Handling it with proxy 1.1.1.1
Got 28. Handling it with proxy 3.3.3.3
3
4
4
4
4
4
Got 29. Handling it with proxy 4.4.4.4
Got 30. Handling it with proxy 4.4.4.4
Got 31. Handling it with proxy 2.2.2.2
Got 32. Handling it with proxy 1.1.1.1
4
4
4
Got 33. Handling it with proxy 1.2.3.4
Got 34. Handling it with proxy 3.3.3.3
Got 35. Handling it with proxy 1.1.1.1
Got 36. Handling it with proxy 2.2.2.2
Got 37. Handling it with proxy 3.3.3.3
3
4
4
4
4
Got 38. Handling it with proxy 1.2.3.4
4
Got 39. Handling it with proxy 1.2.3.4
Got 40. Handling it with proxy 2.2.2.2
Got 41. Handling it with proxy 1.1.1.1
Got 42. Handling it with proxy 3.3.3.3
Got 43. Handling it with proxy 4.4.4.4
2
3
4
4
4
4
Got 44. Handling it with proxy 1.2.3.4
Got 45. Handling it with proxy 1.2.3.4
Got 46. Handling it with proxy 1.2.3.4
Got 47. Handling it with proxy 1.1.1.1
Got 48. Handling it with proxy 4.4.4.4
Got 49. Handling it with proxy 2.2.2.2
3
4
4
4
Shutting down 3.3.3.3
4
Shutting down 2.2.2.2
Shutting down 1.1.1.1
Shutting down 4.4.4.4
Shutting down 1.2.3.4
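
The listing above relies on generator-based coroutines (`@asyncio.coroutine` and `yield from`), which recent Python versions no longer support. A sketch of the same design in `async`/`await` syntax might look like this. It assumes Python 3.7+, shortens the simulated delay, and records each handled item in a `handled` list (a hypothetical addition) instead of printing it.

```python
import asyncio
import random

PROXY_CONN_LIMIT = 5
GLOBAL_CONN_LIMIT = 20
PROXIES = ['1.2.3.4', '1.1.1.1', '2.2.2.2', '3.3.3.3', '4.4.4.4']


async def do_network_stuff(item, proxy_info, handled):
    handled.append((item, proxy_info))
    # Short random delay standing in for real network I/O.
    await asyncio.sleep(random.uniform(0.001, 0.01))


async def handle_item(item, proxy_info, global_sem, handled):
    async with global_sem:  # each task takes the global semaphore itself
        await do_network_stuff(item, proxy_info, handled)


async def proxy_pool(proxy_info, queue, global_sem, handled):
    tasks = set()  # tasks currently running through this proxy
    while True:
        # Pull from the queue until this proxy reaches its limit.
        while len(tasks) < PROXY_CONN_LIMIT:
            item = await queue.get()
            if item is None:  # sentinel: finish pending work and stop
                if tasks:
                    await asyncio.wait(tasks)
                return
            tasks.add(asyncio.create_task(
                handle_item(item, proxy_info, global_sem, handled)))
        # At the limit: wait for at least one task, keep only the pending ones.
        _done, tasks = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED)


async def main(n_items=50):
    global_sem = asyncio.Semaphore(GLOBAL_CONN_LIMIT)
    queue = asyncio.Queue()
    handled = []
    pools = [asyncio.create_task(proxy_pool(p, queue, global_sem, handled))
             for p in PROXIES]
    for i in range(n_items):
        await queue.put(i)
    for _ in PROXIES:  # one shutdown sentinel per proxy pool
        await queue.put(None)
    await asyncio.gather(*pools)
    return handled


if __name__ == "__main__":
    print(len(asyncio.run(main())), "items handled")
```

Because `asyncio.wait` returns the still-pending tasks as a set, reassigning `tasks` to that set takes the place of removing finished tasks one by one.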
