Python 协程与异步

像烟花也是过一生,像樱花也是过一生,只要亮过和盛开过不就好了吗?

协程

协程定义协程的底层架构是在pep342 中定义,并在python2.5 实现的

优点

  1. 无需线程上下文切换的开销

  2. 无需原子操作锁定及同步的开销

  3. 方便切换控制流,简化编程模型

  4. 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理

缺点

  1. 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。

  2. 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

协程 2.7

协程:单线程里面不断切换这个单线程中的微进程,即通过代码来实现让一个线程中的更小进程来回切换,相对于多线程多进程可以节省线程切换的时间。

代码实现

协程在Python中使用yield生成器实现,每次执行到yield位置代码停止,返回一个数据,随后在别的地方可以接手这个数据后,代码恢复继续执行

# -*- coding: utf-8 -*-
# @Time    : 2018/6/23 0023 10:19
# @Author  : Langzi
# @Blog    : www.langzi.fun
# @File    : 协程.py
# @Software: PyCharm
import sys
import time
reload(sys)
sys.setdefaultencoding('utf-8')

def fun_1():
    while 1:
        n = yield 'FUN_1 执行完毕,切换到FUN_2'
        # 函数运行到yield会暂停函数执行,存储这个值。并且有next():调用这个值,与send():外部传入一个值
        if not n:
            return
        time.sleep(1)
        print 'FUN_1 函数开始执行'

def fun_2(t):
    t.next()
    while 1:
        print '-'*20
        print 'FUN_2 函数开始执行'
        time.sleep(1)
        ret = t.send('over')
        print ret
    t.close()

if __name__ == '__main__':
    n = fun_1()
    fun_2(n)

可以看到,没有使用多线程处理,依然在两个函数中不断切换循环。

总结一下:

1. 第一个生产者函数中,使用yield,后面的代码暂时不会执行
2. 第一个函数执行到yield后,程序执行第二个函数,首先接受参数t,调用yield的下一个值,t.next()
3. 然后第二个函数继续执行,执行完后给第一个函数发送一些数据,ret=t.send(None),其中ret就是第一个函数中yield的值
4. 最后关闭,t.close()
5. 把第一个函数的运行结果(其实就是当执行到yield的值)传递给第二个函数,第二个函数继续执行,然后把返回值继续传递给第一个函数。

协程 3.5

在Python3中新增asyncio库,在 3.5+ 版本中, asyncio 有两样语法非常重要, async, await 弄懂了它们是如何协同工作的, 我们就完全能发挥出这个库的功能了。

基本用法

我们要时刻记住,asyncio 不是多进程, 也不是多线程, 单单是一个线程, 但是是在 Python 的功能间切换着执行. 切换的点用 await 来标记, 使用async关键词将其变成协程方法, 比如 async def function():。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

概念

  1. event_loop事件循环:程序开启一个无限的循环,当把一些函数注册到事件循环上时,满足事件发生条件即调用相应的函数。
  2. coroutine协程对象:指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象,协程对象需要注册到事件循环,由事件循环调用。
  3. task任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  4. future:代表将来执行或没有执行的任务的结果,它和task上没有本质的区别
  5. async/await关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

代码演示

先看看不是异步的

import time
def job(t):
    print('Start job ', t)
    time.sleep(t)               # wait for "t" seconds
    print('Job ', t, ' takes ', t, ' s')
def main():
    [job(t) for t in range(1, 3)]
t1 = time.time()
main()
print("NO async total time : ", time.time() - t1)

返回结果:

Start job  1
Job  1  takes  1  s
Start job  2
Job  2  takes  2  s
NO async total time :  3.008603096008301

从上面可以看出, 我们的 job 是按顺序执行的, 必须执行完 job 1 才能开始执行 job 2, 而且 job 1 需要1秒的执行时间, 而 job 2 需要2秒. 所以总时间是 3 秒多. 而如果我们使用 asyncio 的形式, job 1 在等待 time.sleep(t) 结束的时候, 比如是等待一个网页的下载成功, 在这个地方是可以切换给 job 2, 让它开始执行.

然后是异步的

import asyncio
async def job(t):                   # async 形式的功能
    print('Start job ', t)
    await asyncio.sleep(t)          # 等待 "t" 秒, 期间切换其他任务
    print('Job ', t, ' takes ', t, ' s')
async def main(loop):                       # async 形式的功能
    tasks = [
    loop.create_task(job(t)) for t in range(1, 3)
    ]                                       # 创建任务, 但是不执行
    await asyncio.wait(tasks)               # 执行并等待所有任务完成
t1 = time.time()
loop = asyncio.get_event_loop()             # 建立 loop
loop.run_until_complete(main(loop))         # 执行 loop,并且等待所有任务结束
loop.close()                                # 关闭 loop
print("Async total time : ", time.time() - t1)

返回结果:

Start job  1
Start job  2
Job  1  takes  1  s
Job  2  takes  2  s
Async total time :  2.001495838165283

从结果可以看出, 我们没有等待 job 1 的结束才开始 job 2, 而是 job 1 触发了 await 的时候就切换到了 job 2 了. 这时, job 1 和 job 2 同时在等待 await asyncio.sleep(t), 所以最终的程序完成时间, 取决于等待最长的 t, 也就是 2秒. 这和上面用普通形式的代码相比(3秒), 的确快了很多.由于协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果。

简单的例子:

import asyncio
import requests
async def scan(url):
    r = requests.get(url).status_code
    return r

task = asyncio.ensure_future(scan('http://www.langzi.fun'))
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(task.result())

调用协程有好几种方法,这里就只看我这种即可,主要是后面三行。把任务赋值给task,然后loop为申请调度(这么理解),然后执行。因为requests这个库是同步堵塞的,所以没办法变成异步执行,这个时候学学aiohttp,一个唯一有可能在异步中取代requests的库。

绑定回调

就是让第一个函数执行后,执行的结果传递给第二个函数继续执行

例子:

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

def callback(task):
    print('Status:', task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)

在这里我们定义了一个 request() 方法,请求了百度,返回状态码,但是这个方法里面我们没有任何 print() 语句。随后我们定义了一个 callback() 方法,这个方法接收一个参数,是 task 对象,然后调用 print() 方法打印了 task 对象的结果。这样我们就定义好了一个 coroutine 对象和一个回调方法,我们现在希望的效果是,当 coroutine 对象执行完毕之后,就去执行声明的 callback() 方法。

那么它们二者怎样关联起来呢?很简单,只需要调用 add_done_callback() 方法即可,我们将 callback() 方法传递给了封装好的 task 对象,这样当 task 执行完毕之后就可以调用 callback() 方法了,同时 task 对象还会作为参数传递给 callback() 方法,调用 task 对象的 result() 方法就可以获取返回结果了。

多任务协程

就是把所有的任务加载到一个列表中,然后依次执行

上面的例子我们只执行了一次请求,如果我们想执行多次请求应该怎么办呢?我们可以定义一个 task 列表,然后使用 asyncio 的 wait() 方法即可执行,看下面的例子:

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task Result:', task.result())

这里我们使用一个 for 循环创建了五个 task,组成了一个列表,然后把这个列表首先传递给了 asyncio 的 wait() 方法,然后再将其注册到时间循环中,就可以发起五个任务了。最后我们再将任务的运行结果输出出来,运行结果如下:

Tasks: [<Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>

多任务协程实现

上面的多任务协程执行了,但是是依次执行的

举例子测试,访问博客测试速度

import asyncio
import requests
import time

start = time.time()

async def request():
    url = 'http://www.langzi.fun'
    print('Waiting for', url)
    response = requests.get(url)
    print('Get response from', url, 'Result:', response.text)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

这个和上面一样,只是把所有的任务量加载一个tasks列表中罢了,并没有异步执行,但是不要慌,继续看代码

import asyncio
import aiohttp
import time

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result

async def request():
    url = 'http://www.langzi.fun'
    print('Waiting for', url)
    result = await get(url)
    print('Get response from', url, 'Result:', result)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

为什么使用aiohttp呢?在之前就说过requests这个库是堵塞的,并不支持异步,而aiohttp是支持异步的网络请求的库。

协程嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。

import asyncio

async def myfun(i):
    print('start {}th'.format(i))
    await asyncio.sleep(1)
    print('finish {}th'.format(i))

loop = asyncio.get_event_loop()
myfun_list = [asyncio.ensure_future(myfun(i)) for i in range(10)]
loop.run_until_complete(asyncio.wait(myfun_list))

这种用法和上面一种的不同在于后面调用的是asyncio.gather还是asyncio.wait,当前看成完全等价即可,所以平时使用用上面哪种都可以。

上面是最常看到的两种使用方式,这里列出来保证读者在看其他文章时不会发蒙。

另外,二者其实是有细微差别的

  • gather更擅长于将函数聚合在一起
  • wait更擅长筛选运行状况
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)
async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
        print('Task ret: ', task.result())

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)

如果使用的是 asyncio.gather创建协程对象,那么await的返回值就是协程运行的结果。

results = await asyncio.gather(*tasks)

for result in results:
    print('Task ret: ', result)

不在main协程函数里处理结果,直接返回await的内容,那么最外层的run_until_complete将会返回main协程的结果。

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.gather(*tasks)

start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())

for result in results:
    print('Task ret: ', result)

或者返回使用asyncio.wait方式挂起协程。

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.wait(tasks)

start = now()

loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(main())

for task in done:
    print('Task ret: ', task.result())

也可以使用asyncio的as_completed方法

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))

start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print('TIME: ', now() - start)

由此可见,协程的调用和组合十分灵活,尤其是对于结果的处理,如何返回,如何挂起,需要逐渐积累经验和前瞻的设计。

协程停止

上面见识了协程的几种常用的用法,都是协程围绕着事件循环进行的操作。future对象有几个状态:

  • Pending
  • Running
  • Done
  • Cancelled

创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消。可以使用asyncio.Task获取事件循环的task

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print('TIME: ', now() - start)

启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。可以看到输出如下:

Waiting:  1
Waiting:  2
Waiting:  2
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}
True
True
True
True
TIME:  0.8858370780944824

True表示cannel成功,loop stop之后还需要再次开启事件循环,最后在close,不然还会抛出异常:

Task was destroyed but it is pending!
task: <Task pending coro=<do_some_work() done,

循环task,逐个cancel是一种方案,可是正如上面我们把task的列表封装在main函数中,main函数外进行事件循环的调用。这个时候,main相当于最外出的一个task,那么处理包装的main函数即可。

import asyncio
import time
now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)
async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    done, pending = await asyncio.wait(tasks)
    for task in done:
        print('Task ret: ', task.result())

start = now()

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main())
try:
    loop.run_until_complete(task)
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

不同线程的事件循环

很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被block。

from threading import Thread

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)

启动上述代码之后,当前线程不会被block,新线程中会按照顺序执行call_soon_threadsafe方法注册的more_work方法,后者因为time.sleep操作是同步阻塞的,因此运行完毕more_work需要大致6 + 3

简单案例

import asyncio
import aiohttp
import time

start = time.time()

async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url, timeout=5) as resp:
            result = (await resp.read())
            return result

async def request(url):
    result = await get(url)
    print('Get response from', url, 'Result:', result)


urls = [
'http://www.php.cn',
'http://www.phphtm.com',
'http://www.thinkphp.cn',
'http://www.phpboke.com',
'https://www.52pojie.cn',
'http://www.douco.com'
]

tasks = [asyncio.ensure_future(request(url=_)) for _ in urls]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

新线程协程

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主线程中创建一个new_loop,然后在另外的子线程中开启一个无限事件循环。主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被block。一共执行的时间大概在6s左右。

骚方法

requests实现异步爬虫一

如同前面介绍如何在asyncio中使用requests模块一样,如果想在asyncio中使用其他阻塞函数,该怎么实现呢?虽然目前有异步函数支持asyncio,但实际问题是大部分IO模块还不支持asyncio。 阻塞函数(例如io读写,requests网络请求)阻塞了客户代码与asycio事件循环的唯一线程,因此在执行调用时,整个应用程序都会冻结。

解决方案:

这个问题的解决方法是使用事件循环对象的run_in_executor方法。asyncio的事件循环在背后维护着一个ThreadPoolExecutor对象,我们可以调用run_in_executor方法,把可调用对象发给它执行,即可以通过run_in_executor方法来新建一个线程来执行耗时函数。

run_in_executor方法

AbstractEventLoop.run_in_executor(executor, func, *args)
  • executor 参数应该是一个 Executor 实例。如果为 None,则使用默认 executor。
  • func 就是要执行的函数
  • args 就是传递给 func 的参数

实际例子(使用time.sleep()):

import asyncio
import time
async def run(url):
    print("start ",url)
    loop = asyncio.get_event_loop()
    try:
        await loop.run_in_executor(None,time.sleep,1)
    except Exception as e:
        print(e)
    print("stop ",url)
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

说明:有了run_in_executor方法,我们就可以使用之前熟悉的模块创建协程并发了,而不需要使用特定的模块进行IO异步开发。

requests实现异步爬虫二

与之前学过的多线程、多进程相比,asyncio模块有一个非常大的不同:传入的函数不是随心所欲

  • 比如我们把上面myfun函数中的sleep换成time.sleep(1),运行时则不是异步的,而是同步,共等待了10秒
  • 如果我换一个myfun,比如换成下面这个使用request抓取网页的函数

下面是代码案例:

import asyncio
import requests
from bs4 import BeautifulSoup

async def get_title(a):
    url = 'https://movie.douban.com/top250?start={}&filter='.format(a*25)
    r = requests.get(url)
    soup = BeautifulSoup(r.content, 'html.parser')
    lis = soup.find('ol', class_='grid_view').find_all('li')
    for li in lis:
        title = li.find('span', class_="title").text
        print(title)

loop = asyncio.get_event_loop()
fun_list = (get_title(i) for i in range(10))
loop.run_until_complete(asyncio.gather(*fun_list))

依然不会异步执行。

到这里我们就会想,是不是异步只对它自己定义的sleep(await asyncio.sleep(1))才能触发异步?

对于上述函数,asyncio库只能通过添加线程的方式实现异步,下面我们实现time.sleep时的异步

import asyncio
import time

def myfun(i):
    print('start {}th'.format(i))
    time.sleep(1)
    print('finish {}th'.format(i))

async def main():
    loop = asyncio.get_event_loop()
    futures = (
        loop.run_in_executor(
            None,
            myfun, 
            i)
        for i in range(10)
        )
    for result in await asyncio.gather(*futures):
        pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

上面run_in_executor其实开启了新的线程,再协调各个线程。调用过程比较复杂,只要当模板一样套用即可。

上面10次循环仍然不是一次性打印出来的,而是像分批次一样打印出来的。这是因为开启的线程不够多,如果想要实现一次打印,可以开启10个线程,代码如下

import concurrent.futures as cf # 多加一个模块
import asyncio
import time

def myfun(i):
    print('start {}th'.format(i))
    time.sleep(1)
    print('finish {}th'.format(i))

async def main():
    with cf.ThreadPoolExecutor(max_workers = 10) as executor: # 设置10个线程
        loop = asyncio.get_event_loop()
        futures = (
            loop.run_in_executor(
                executor, # 按照10个线程来执行
                myfun, 
                i)
            for i in range(10)
            )
        for result in await asyncio.gather(*futures):
            pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

用这种方法实现requests异步爬虫代码如下

import concurrent.futures as cf
import asyncio
import requests
from bs4 import BeautifulSoup

def get_title(i):
    url = 'https://movie.douban.com/top250?start={}&filter='.format(i*25)
    r = requests.get(url)
    soup = BeautifulSoup(r.content, 'html.parser')
    lis = soup.find('ol', class_='grid_view').find_all('li')
    for li in lis:
        title = li.find('span', class_="title").text
        print(title)

async def main():
    with cf.ThreadPoolExecutor(max_workers = 10) as executor:
        loop = asyncio.get_event_loop()
        futures = (
            loop.run_in_executor(
                executor,
                get_title, 
                i)
            for i in range(10)
            )
        for result in await asyncio.gather(*futures):
            pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

深度解析

理论和目标

常规生成器(在 PEP 255 中引入)的实现,使得编写复杂数据变得更优雅,它们的行为类似于迭代器。
当时没有提供async for使用的异步生成器。 编写异步数据生成器变得非常复杂,因为必须定义一个实现 aiteranext 的方法,才能在 async for 语句中使用它。
为了说明异步生成器的重要性,专门做了性能测试,测试结果表明使用异步生成器要比使用异步迭代器快 2 倍多。
下面的代码是演示了在迭代的过程中等待几秒:

class Ticker:
    """Yield numbers from 0 to `to` every `delay` seconds."""

    def __init__(self, delay, to):
        self.delay = delay
        self.i = 0
        self.to = to

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= self.to:
            raise StopAsyncIteration
        self.i += 1
        if i:
            await asyncio.sleep(self.delay)
        return i

我们那可以使用下面的代码实现同样的功能:

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

详细说明

异步生成器

我们直到在函数中使用一个或多个 yield 该函数将变成一个生成器。

def func():            # 方法
    return

def genfunc():         # 生成器方法
    yield

我们提议使用类似的功能实现下面异步生成器:

async def coro():      # 一个协程方法
    await smth()

async def asyncgen():  # 一个异步生成器方法
    await smth()
    yield 42

调用异步生成器函数的结果是异步生成器对象,它实现了 PEP 492 中定义的异步迭代协议。
注意:在异步生成器中使用非空 return 语句会引发 SyntaxError 错误。

对异步迭代协议的支持

该协议需要实现两种特殊方法:
__aiter__ 方法返回一个异步迭代器。
__anext__ 方法返回一个 awaitable 对象,它使用 StopIteration 异常来捕获 yield 的值,使用 StopAsyncIteration 异常来表示迭代结束。
异步生成器定义了这两种方法。 让我们实现一个一个简单的异步生成器:

import asyncio
async def genfunc():
    yield 1
    yield 2

gen = genfunc()

async def start():
    assert gen.__aiter__() is gen
    assert await gen.__anext__() == 1
    assert await gen.__anext__() == 2
    await gen.__anext__()  # This line will raise StopAsyncIteration.

if __name__ == '__main__':
    asyncio.run(start())

终止

PEP 492 提到需要使用事件循环或调度程序来运行协程。 因为异步生成器是在协程使用的,所以还需要创建一个事件循环来运行。
异步生成器可以有 try..finally 块,也可以用 async with 异步上下文管理代码快。 重要的是提供一种保证,即使在部分迭代时,也可以进行垃圾收集,生成器可以安全终止。

async def square_series(con, to):
    async with con.transaction():
        cursor = con.cursor(
            'SELECT generate_series(0, $1) AS i', to)
        async for row in cursor:
            yield row['i'] ** 2

async for i in square_series(con, 1000):
    if i == 100:
        break

上面代码演示了异步生成器在 async with中 使用,然后使用 async for 对异步生成器对象进行迭代处理,同时我们也可以设置一个中断条件。
square_series() 生成器将被垃圾收集,并没有异步关闭生成器的机制,Python 解释器将无法执行任何操作。
为了解决这个问题,这里提出以下改进建议:

  1. 在异步生成器上实现一个 aclose 方法,返回一个特殊 awaittable 对象。 当 awaitable 抛出 GeneratorExit 异常的时候,抛出到挂起的生成器中并对其进行迭代,直到发生 GeneratorExit 或 StopAsyncIteration。这就是在常规函数中使用 close 方法关闭对象一样,只不过 aclose 需要一个事件循环去执行。
  2. 不要在异步生成器中使用 yield 语句,只能用 await。
  3. 在sys模块中加两个方法:set_asyncgen_hooks() and get_asyncgen_hooks().
    sys.set_asyncgen_hooks() 背后的思想是允许事件循环拦截异步生成器的迭代和终结,这样最终用户就不需要关心终结问题了,一切正常。sys.set_asyncgen_hooks() 可以结束两个参数

firstiter:一个可调用的,当第一次迭代异步生成器时将调用它。
finalizer:一个可调用的,当异步生成器即将被 GC 时将被调用。

当第一迭代异步生成器时,它会引用到当前的 finalizer。
当异步生成器即将被垃圾收集时,它会调用其缓存的 finalizer。假想在事件循环激活异步生成器开始迭代的时候, finalizer 将调用一个 aclose() 方法.

例如,以下是如何修改 asyncio 以允许安全地完成异步生成器:

class BaseEventLoop:

    def run_forever(self):
        ...
        old_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
        try:
            ...
        finally:
            sys.set_asyncgen_hooks(*old_hooks)
            ...

    def _finalize_asyncgen(self, gen):
        self.create_task(gen.aclose())

第二个参数 firstiter,允许事件循环维护在其控制下实例化的弱异步生成器集。这使得可以实现“shutdown”机制,来安全地打开的生成器并关闭事件循环。
sys.set_asyncgen_hooks() 是特定线程,因此在多个事件循环并行的时候是安全的。
sys.get_asyncgen_hooks() 返回一个带有 firstiter 和 finalizer 字段的类似于类的结构。

asyncio

asyncio 事件循环将使用 sys.set_asyncgen_hooks() API 来维护所有被调度的弱异步生成器,并在生成器被垃圾回收时侯调度它们的 aclose() 方法。

为了确保 asyncio 程序可以可靠地完成所有被调度的异步生成器,我们建议添加一个新的事件循环协程方法 loop.shutdown_asyncgens()。 该方法将使用 aclose() 调用关闭所有当前打开的异步生成器。

在调用loop.shutdown_asyncgens() 方法之后,首次迭代新的异步生成器,事件循环就会发出警告。 我们的想法是,在请求关闭所有异步生成器之后,程序不应该执行迭代新异步生成器的代码。

下面是一个关于如何使用 Ashutdown_asyncgens 的例子:

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())#关闭所有异步迭代器
    loop.close()

异步生成器对象

该对象以标准 Python 生成器对象为模型。 本质上异步生成器的行为复制了同步生成器的行为,唯一的区别在于 API 是异步的。

定义了以下方法和属性:

  1. agen.__aiter__(): 返回 agen.
  2. agen.__anext__(): 返回一个 awaitable 对象, 调用一次异步生成器的元素。
  3. agen.asend(val): 返回一个 awaitable 对象,它在 agen 生成器中推送 val对象。 当 agen 还没迭代时,val 必须为 None。

上面的方法类似同步生成器的使用。
代码例子:

import asyncio


async def gen():
    await asyncio.sleep(0.1)
    v = yield 42
    print(v)
    await asyncio.sleep(0.2)



async def start():
    g = gen()

    await g.asend(None)  # Will return 42 after sleeping
    # for 0.1 seconds.

    await g.asend('hello')  # Will print 'hello' and
    # raise StopAsyncIteration
    # (after sleeping for 0.2 seconds.)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

4.agen.athrow(typ, [val, [tb]]): 返回一个 awaitable 对象, 这会向 agen 生成器抛出一个异常。

代码如下:

import asyncio

async def gen():
    try:
        await asyncio.sleep(0.1)
        yield 'hello'
    except IndexError:
        await asyncio.sleep(0.2)
        yield 'world'


async def start():
    g = gen()
    v = await g.asend(None)
    print(v)  # Will print 'hello' after
    # sleeping for 0.1 seconds.

    v = await g.athrow(IndexError)
    print(v)  # Will print 'world' after
    # $ sleeping 0.2 seconds.


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

5.agen.aclose(): 返回一个 awaitable 对象, 调用该方法会抛出一个异常给生成器。

import asyncio


async def gen():
    try:
        await asyncio.sleep(0.1)
        v = yield 42
        print(v)
        await asyncio.sleep(0.2)
    except:
         print("运行结束") 


async def start():
    g = gen()
    v=await g.asend(None)
    print(v)
    await g.aclose() #不做异常处理会报错


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

6.agen.__name__ and agen.__qualname__:可以返回异步生成器函数的名字。

async def gen():
    try:
        await asyncio.sleep(0.1)
        v = yield 42
        print(v)
        await asyncio.sleep(0.2)
    except:
         print("运行结束")

async def start():
    g = gen()
    print(g.__aiter__())#输出async_generator对象
    print(g.__name__)#输出gen
    print(g.__qualname__)#输出gen

其他的方法

agen.ag_await: 正等待的对象(None). 类似当前可用的 gi_yieldfrom for generators and cr_await for coroutines.
agen.ag_frame, agen.ag_running, and agen.ag_code: 同生成器一样

StopIteration and StopAsyncIteration 被替换为 RuntimeError,并且不上抛。

源码实现细节

异步生成器对象(PyAsyncGenObject)与 PyGenObject 共享结构布局。 除此之外,参考实现还引入了三个新对象:

PyAsyncGenASend:实现 __anext__ 和 asend() 方法的等待对象。
PyAsyncGenAThrow:实现 athrow() 和 aclose() 方法的等待对象。
PyAsyncGenWrappedValue:来自异步生成器的每个直接生成的对象都隐式地装入此结构中。 这就是生成器实现如何使用常规迭代协议从使用异步迭代协议生成的对象中分离出的对象。

PyAsyncGenASend和 PyAsyncGenAThrow 是 awaitable 对象(它们有 __await__ 方法返回 self)类似于 coroutine 的对象(实现__iter__,__ next__,send() 和 throw() 方法)。 本质上,它们控制异步生成器的迭代方式。

PyAsyncGenASend and PyAsyncGenAThrow

PyAsyncGenASend 类似生成器对象驱动 anext and asend() 方法,实装了异步迭代协议。

agen.asend(val) 和 agen.anext() 返回一个 PyAsyncGenASend 对象的一个引用。 (它将引用保存回父类 agen 对象。)

数据流定义如下:

1. 首次调用 PyAsyncGenASend.send(val) 时, val将 推入到父类 agen 对象 (PyGenObject 利用现有对象。)
对 PyAsyncGenASend 对象进行后续迭代,将 None 推送到 agen。
2. 首次调用 _PyAsyncGenWrappedValue 对象时,它将被拆箱,并且以未被装饰的值作为参数会引发 StopIteration 异常。
3. 异步生成器中的 return 语句引发 StopAsyncIteration 异常,该异常通过 PyAsyncGenASend.send() 和 PyAsyncGenASend.throw() 方法传播。
4. PyAsyncGenAThrow与PyAsyncGenASend非常相似。 唯一的区别是PyAsyncGenAThrow.send() 在第一次调用时会向父类 agen 对象抛出异常(而不是将值推入其中。)

新的标准库方法和Types

  1. types.AsyncGeneratorType – 判断是否是异步生成器对象
  2. sys.set_asyncgen_hooks() 和 sys.get_asyncgen_hooks()–
    在事件循环中设置异步生成器终结器和迭代拦截器。
  3. inspect.isasyncgen() 和 inspect.isasyncgenfunction() :方法内省。
  4. asyncio 加入新方法: loop.shutdown_asyncgens().
  5. collections.abc.AsyncGenerator: 抽象基类的添加。

性能展示

常规生成器

import time


def gen():
    i = 0
    while i < 100000000:
        yield i
        i += 1


if __name__ == '__main__':
    start = time.time()
    list(gen())
    end = time.time()
    print("totals time", end - start)

输出

totals time 14.837260007858276

异步迭代器的改进

import time
import asyncio

N = 10 ** 7
class AIter:
    def __init__(self):
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= N:
            raise StopAsyncIteration
        self.i += 1
        return i

async def start():
    [_ async for _ in AIter()]
if __name__ == '__main__':
    s=time.time()
    loop=asyncio.get_event_loop()
    try:
     loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    e=time.time()
    print("total time",e-s)

输出

total time 5.441649913787842

很明显迭代异步生成器的速度比迭代普通生成器不只是快了两倍。

我们可以做一个更简单的异步生成器

import time
import asyncio

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

async def start():
    async for item in ticker(0.000001,100):
        print(item)
if __name__ == '__main__':
    s=time.time()
    loop=asyncio.get_event_loop()
    try:
     loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    e=time.time()
    print("total time",e-s)

设计中要注意的事项

内建函数: aiter() and anext()

最初,PEP 492 将 __aiter__ 定义为应返回等待对象的方法,从而产生异步迭代器。
但是,在CPython 3.5.2中,重新定义了 __aiter__ 可以直接返回异步迭代器。
为了避免破坏向后兼容性,决定 Python 3.6 将支持两种方式:__aiter__ 仍然可以在发出 DeprecationWarning 时返回等待状态。由于 Python 3.6 中 __aiter__ 的这种双重性质,我们无法添加内置的 aiter() 的同步实现。 因此,建议等到 Python 3.7。

异步list/dict/set 推导式

将放在单独的 pep 中也就是后来的 pep530.

异步 yield from

对于异步生成器,yield from 也不那么重要,因为不需要提供在协程之上实现另一个协同程序协议的机制。为了组合异步生成器,可以使用 async for简化这个过程:

async def g1():
    yield 1
    yield 2

async def g2():
    async for v in g1():
        yield v

为了 asend() 和 athrow() 是必须的

它们可以使用异步生成器实现类似于 contextlib.contextmanager 的概念。 例如,可以实现以下模式:

@async_context_manager
async def ctx():
    await open()
    try:
        yield
    finally:
        await close()

async with ctx():
    await ...

另一个原因是从 __anext__ 对象返回的对象来推送数据并将异常抛出到异步生成器中,很难正确地执行此操作。 添加显式的asend()和athrow()更获取异常后的数据。
在实现方面,asend() 是 __anext__ 更通用的版本,而 athrow() 与 aclose() 非常相似。 因此,为异步生成器定义这些方法不会增加任何额外的复杂性。

代码示例

async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


async def run():
    async for i in ticker(1, 10):
        print(i)


import asyncio
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(run())
finally:
    loop.close()

这代码将打出 0-9,每个数字之间的间隔为 1s。

参考资料

Python 进阶

Python协程

Python3.5协程学习研究

asyncio:高性能异步模块使用介绍

爬虫速度太慢?来试试用异步协程提速吧!

对Python并发编程的思考

aiohttp 简易使用教程

aiohttp 中文文档

python异步asyncio模块的使用

Asyncio 使用经验

Python-aiohttp百万并发

坚持原创技术分享,您的支持将鼓励我继续创作!
------ 本文结束 ------

版权声明

LangZi_Blog's by Jy Xie is licensed under a Creative Commons BY-NC-ND 4.0 International License
由浪子LangZi创作并维护的Langzi_Blog's博客采用创作共用保留署名-非商业-禁止演绎4.0国际许可证
本文首发于Langzi_Blog's 博客( http://langzi.fun ),版权所有,侵权必究。

0%