asyncio 并發(fā)計算如何收集每個任務的返回值?
有 10 個獨立的任務,每個任務都有返回值,且要執(zhí)行 1-3 秒才能結束。
如果順序執(zhí)行,那么就需要 10-30 秒才能全部完成。能不能在 3 秒內(nèi)完成,且獲得每個任務的返回值??
使用 asyncio.gather() 函數(shù)
用 asyncio.create() 迅速產(chǎn)生任務(這個幾乎不花時間),把每個任務放到一個叫做 aws 的列表中,最后將 aws 作為參數(shù)傳入 asyncio.gather() 。 asyncio.gather() 會收集每個任務的結果,做成一個列表的形式,存儲在 result 中。

result = asyncio.run(many_jobs_using_gather(10)) 的結果顯示如下:
[(1, 'Job (#0): 1 seconds'), (2, 'Job (#1): 2 seconds'), (2, 'Job (#2): 2 seconds'), (3, 'Job (#3): 3 seconds'), (2, 'Job (#4): 2 seconds'), (2, 'Job (#5): 2 seconds'), (2, 'Job (#6): 2 seconds'), (3, 'Job (#7): 3 seconds'), (1, 'Job (#8): 1 seconds'), (1, 'Job (#9): 1 seconds')]

可以看到,結果 result 是一個元祖的列表:每個元祖的第一個元素代表 match() 函數(shù)的返回值?delay, 第二個元素代表返回值?what。注意,結果是按照任務產(chǎn)生的順序存儲的,Job(#0), Job(#1), ..., 符合預期。

使用 asyn with 與 Task Group
在 Python 3.11 中, 引入了 Task Group, 也可以使用 Task Group, 代碼如下。這里使用 task.result() 來獲得每個任務完成后的結果。

代碼清單
import asyncio
import random
import time
async def match(delay, what):
? ? await asyncio.sleep(delay)
? ? print(what)
? ? return delay, what
async def many_jobs_using_task_group():
? ? tasks = []
? ? async with asyncio.TaskGroup() as tg: # work for Python 3.11
? ? ? ? for i in range(10):
? ? ? ? ? ? wait_seconds = random.randint(1,3)
? ? ? ? ? ? task = tg.create_task(match(wait_seconds, f'Job (#{i}): {wait_seconds} seconds'))
? ? ? ? ? ? tasks.append(task)
? ? ? ? ? ? print(f"Started at {time.strftime('%X')}")
? ? # The await is implicit when the context manager exits.
? ? print(f"Finished at {time.strftime('%X')}")?
? ? result = []
? ? for task in tasks:
? ? ? ? result.append(task.result()) # get the returned result from each task
? ? return result
async def many_jobs_using_gather(n):
? ? aws = [] # awaitables
? ? for i in range(n):
? ? ? ? wait_seconds = random.randint(1,3)
? ? ? ? task = asyncio.create_task(match(wait_seconds, f'Job (#{i}): {wait_seconds} seconds')) ? ? ? ?
? ? ? ? aws.append(task)
? ? ? ? print(f"Started at {time.strftime('%X')}")
? ? print('Gather results ...')
? ? result = await asyncio.gather(*aws) # a list
? ? print(f"Finished at {time.strftime('%X')}")?
? ? return result
#result = asyncio.run(many_jobs_using_task_group())
result = asyncio.run(many_jobs_using_gather(10))
print(result)