Python 3.8异步并发编程指南( 四 )


回调处理任务(实际上是未来)时,一旦 Future 在任务上设置了值,就可以执行"回调"函数 。
【Python 3.8异步并发编程指南】以下示例通过修改上一create_task示例代码来演示这一点:
import asyncioasync def foo():await asyncio.sleep(10)return "Foo!"def got_result(future):print(f"got the result! {future.result()}")async def hello_cc():task = asyncio.create_task(foo())task.add_done_callback(got_result)print(task)await asyncio.sleep(5)print("Hello Chongchong!")await asyncio.sleep(10)print(task)asyncio.run(hello_cc())请注意,在上面的程序中,我们增加了一个函数,该got_result
函数期望接收"未来"类型,从而调用.result()"未来" 。另请注意,要调用此函数,我们会将其传递给.add_done_callback()在create_task返回的任务上调用它 。
此程序的输出是:
<Task pending coro=<foo() running at gather.py:4> cb=[got_result() at gather.py:9]>Hello Chongchong!got the result! Foo!<Task finished coro=<foo() done, defined at gather.py:4> result='Foo!'>任务池在处理大量并发操作时,最好利用线程(和/或子进程)的"池"来防止耗尽应用程序的主机资源 。
这也是concurrent.futures模块的来历 。它提供了一个称为执行器的概念来帮助它,它可以独立运行或集成到现有的异步事件循环中 。
执行器有两种类型的执行器:ThreadPoolExecutor和ProcessPoolExecutor
让我们看一下在其中一个执行器中执行代码的第一种方法,方法是使用异步事件循环来计划执行器的运行 。
为此,需要调用事件循环的.run_in_executor()函数,并将执行器类型作为第一个参数传递 。如果None被提供,则使用默认执行器(即 ThreadPoolExecutor)下面的实例来自Python官方文档:
import asyncioimport concurrent.futuresdef blocking_io():with open("/dev/urandom", "rb") as f:return f.read(100)def cpu_bound():return sum(i * i for i in range(10 ** 7))async def main():loop = asyncio.get_running_loop()result = await loop.run_in_executor(None, blocking_io)print("default thread pool", result)with concurrent.futures.ThreadPoolExecutor() as pool:result = await loop.run_in_executor(pool, blocking_io)print("custom thread pool", result)with concurrent.futures.ProcessPoolExecutor() as pool:result = await loop.run_in_executor(pool, cpu_bound)print("custom process pool", result)asyncio.run(main())在其中一个执行器中执行代码的第二种方法是将要执行的代码直接发送到池 。这意味着我们不必获取当前事件循环将池传递到其中(如前面的示例所示),但它附带了一个警告,即父程序不会等待任务完成,除非您明确告诉它(我接下来将演示) 。考虑到这一点,让我们来看看这个替代方法 。它涉及调用执行者的方法submit():
import concurrent.futuresimport timedef slow_op(*args):print(f"arguments: {args}")time.sleep(5)print("slow operation complete")return 123def do_something():with concurrent.futures.ProcessPoolExecutor() as pool:future = pool.submit(slow_op, "a", "b", "c")for fut in concurrent.futures.as_completed([future]):assert future.done() and not future.cancelled()print(f"got the result from slow_op: {fut.result()}")if __name__ == "__main__":print("program started")do_something()print("program complete")这里值得注意的一点是,如果我们没有使用with语句(如上例中所示),则意味着一旦池完成其工作,将不会关闭它,因此(取决于程序是否继续运行),可能会发现资源没有被清理 。
要解决此问题,可以调用.shutdown()通过其父类向两种类型的执行器公开的方法 。concurrent.futures.Executor
下面是一个这样做的示例,但现在使用线程池执行器:
import concurrent.futuresTHREAD_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=5)def slow_op(*args):print(f"arguments: {args}")print("some kind of slow operation")return 123def do_something():future = THREAD_POOL.submit(slow_op, "a", "b", "c")THREAD_POOL.shutdown()assert future.done() and not future.cancelled()print(f"got the result from slow_op: {future.result()}")if __name__ == "__main__":print("program started")do_something()print("program complete")现在,我尚未在示例中还未使用time.sleep(),因为我们使用的是线程池,并且time.sleep()是 CPU 绑定操作,如果直接使用将阻止线程完成 。
这意味着我们的示例可能总是导致slow_op()函数完成之前,我们开始检查 future.done() 。所以,是的,这不是最好的例子 。通过合并一个不阻止的真正缓慢的操作,可以更现实地测试它 。


推荐阅读