python异步框架中协程之间的并行

前言

python中的异步协程框架有很多,比如 tornado, gevent, asyncio, twisted 等。协程带来的是低消耗的并发,在等待IO事件的时候可以把控制权交给其它的协程,这个是它并发能力的保障。但是光有并发还是不够的,高并发并不能保证低延迟,因为一个业务逻辑的流程可能包含多个异步IO的请求,如果这些异步IO的请求是一个一个逐步执行的,虽然server的吞吐量还是很高,但是每个请求的延迟就会很大。为了解决这类问题,每个框架都有各自不同的方式,下面我们就来分别看看,它们都是怎么管理互不相关协程之间的并行的。

如果对于异步IO不了解的朋友,可以先去了解下相关资料:epoll, select, aio等。

asyncio

python3.4及以上
在我的博客里有一篇关于asyncio库的译文,里面最后一部分就有介绍它是如何管理互不相关的协程的。这里我们还是引用它,并给他增加了计时的功能来更好地阐述协程是如何并行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
import random
import time


@asyncio.coroutine
def get_url(url):
wait_time = random.randint(1, 4)
yield from asyncio.sleep(wait_time)
print('URL {} took {}s to get!'.format(url, wait_time))
return url, wait_time


@asyncio.coroutine
def process_as_results_come_in():
before = time.time()
coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
for coroutine in asyncio.as_completed(coroutines):
url, wait_time = yield from coroutine
print('Coroutine for {} is done'.format(url))
after = time.time()
print('total time: {} seconds'.format(after - before))


@asyncio.coroutine
def process_once_everything_ready():
before = time.time()
coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
results = yield from asyncio.gather(*coroutines)
print(results)
after = time.time()
print('total time: {} seconds'.format(after - before))


def main():
loop = asyncio.get_event_loop()
print("First, process results as they come in:")
loop.run_until_complete(process_as_results_come_in())
print("\nNow, process results once they are all ready:")
loop.run_until_complete(process_once_everything_ready())


if __name__ == '__main__':
main()

代码输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ python3 asyncio_test.py
First, process results as they come in:
URL URL3 took 1s to get!
Coroutine for URL3 is done
URL URL2 took 2s to get!
Coroutine for URL2 is done
URL URL1 took 3s to get!
Coroutine for URL1 is done
total time: 3.001127004623413 seconds

Now, process results once they are all ready:
URL URL1 took 1s to get!
URL URL3 took 1s to get!
URL URL2 took 4s to get!
[('URL1', 1), ('URL2', 4), ('URL3', 1)]
total time: 4.004215955734253 seconds

asyncio里面,给了两种方案,都是可以做到协程的并行。可以看到函数执行的总时间几乎等于其中最慢的协程的运行时间。第一种方案中把协程包了起来asyncio.as_completed(coroutines),其实这里直接用coroutines也是可以执行的,不过那样的话协程之间就不是并行的了,而是串行执行了,整个函数的运行时间就是所有协程的时间之和。

  • as_completed的方式:协程一旦运行结束,马上处理结果
  • gather的方式:所有协程完成后统一返回结果并处理
    as_completed的方式灵活性更大,它也可以稍作修改,变成gather的方式(把所有结果保存起来,最后再处理,只不过暂时保存起来的过程交给用户处理了,而gather则是由框架代为处理)。还不知道gather内部的实现方式是否用到了as_completed暂时在这里先留个问题,以后再来回答

tornado

python2.7及以上
tornado的代码就简短很多,直接yield一个coroutine的列表出去就好了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import random
import time
from tornado import gen
from tornado.ioloop import IOLoop


@gen.coroutine
def get_url(url):
wait_time = random.randint(1, 4)
yield gen.sleep(wait_time)
print('URL {} took {}s to get!'.format(url, wait_time))
raise gen.Return((url, wait_time))

@gen.coroutine
def process_once_everything_ready():
before = time.time()
coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
result = yield coroutines
after = time.time()
print(result)
print('total time: {} seconds'.format(after - before))

@gen.coroutine
def process_once_everything_ready_2():
before = time.time()
coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
results = yield gen.Multi(coroutines)
after = time.time()
print(results)
print('total time: {} seconds'.format(after - before))

@gen.coroutine
def process_as_results_come_in():
before = time.time()
coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
wait_iterator = gen.WaitIterator(*coroutines)
while not wait_iterator.done():
url, wait_time = yield wait_iterator.next()
print('Coroutine for {} is done'.format(url))
after = time.time()
print('total time: {} seconds'.format(after - before))

@gen.coroutine
def amazing_test():
before = time.time()
coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
yield gen.sleep(5)
after = time.time()
print('total time: {} seconds'.format(after - before))

@gen.coroutine
def process_together_or_once_depend_on_the_sequence():
before = time.time()
coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
for future in coroutines:
url, wait_time = yield future
print('Coroutine for {} is done'.format(url))
after = time.time()
print('total time: {} seconds'.format(after - before))

if __name__ == '__main__':
print("First, process results as they come in:")
IOLoop.current().run_sync(process_as_results_come_in)
print("\nNow, process results once they are all ready:")
IOLoop.current().run_sync(process_once_everything_ready)
print("\nprocess results once they are all ready 2:")
IOLoop.current().run_sync(process_once_everything_ready_2)
print("\nAmazing test:")
IOLoop.current().run_sync(amazing_test)
print("\nprocess_together_or_once_depend_on_the_sequence:")
IOLoop.current().run_sync(process_together_or_once_depend_on_the_sequence)


输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
$ python3 tornado_test.py
First, process results as they come in:
URL URL2 took 2s to get!
Coroutine for URL2 is done
URL URL1 took 3s to get!
URL URL3 took 3s to get!
Coroutine for URL1 is done
Coroutine for URL3 is done
total time: 3.00437688828 seconds

Now, process results once they are all ready:
URL URL2 took 1s to get!
URL URL3 took 1s to get!
URL URL1 took 4s to get!
[('URL1', 4), ('URL2', 1), ('URL3', 1)]
total time: 4.000649929046631 seconds

process results once they are all ready 2:
URL URL2 took 1s to get!
URL URL3 took 1s to get!
URL URL1 took 4s to get!
[('URL1', 4), ('URL2', 1), ('URL3', 1)]
total time: 4.004122018814087 seconds

Amazing test:
URL URL3 took 1s to get!
URL URL1 took 2s to get!
URL URL2 took 4s to get!
total time: 5.004791975021362 seconds

nprocess_together_or_once_depend_on_the_sequence:
URL URL1 took 1s to get!
URL URL3 took 1s to get!
Coroutine for URL1 is done
URL URL2 took 2s to get!
Coroutine for URL2 is done
Coroutine for URL3 is done
total time: 2.0033011436462402 seconds

tornado的例子,参考了官方文档我给了5个(为啥框架提供这么多?选择太多反而不好,不是吗)。在这里,所有例子总的运行时间都是等于最长的协程的运行时间,这点合理!在这5个例子中,我给了一个 Amazing test ,因为自己乱试,看到这个效果的时候,感觉对tornado的三观有点被毁了,不过后来还是搞懂了,下面我就来解释一下。

tornado的协程中yield出去的对象是什么?起初,我以为就是一个协程的实例(也就是generator呀),跟asyncio一样。通过一个协程函数创建一个实例,如果它不运行,或者不把它yield出去,那它应该就永远不会被运行。在asyncio中是这样的(声明了一个协程,如果不用yield from把它交给调度器,那它就不会运行),但是在tornado中,一旦一个协程的实例被声明了,比如上面代码中的get_url('123'),那么它就已经被注册进事件循环了,该表达式返回的是一个Future,所以你yield出去的就是一个Future对象。正是因为它已经被注册了,所以当你调用gen.sleep(5)的时候,注册的三个事件一一被执行完,只是由于它们返回的Future并没有交给调度器,调度器不知道完成以后回调给哪里,所以执行完了就结束了。如果不调用gen.sleep(5),那么还未等所有事件执行完,主线程就结束了,也不会打印出三个事件的运行结果。可以看出,在tornado中Future是调度器与协程的通信官,它在协程之间被传来传去,而在asyncio中,协程(生成器)自身担当这个角色被传递。代码中的coroutine的变量名其实不准确,应该是future,只是当初试的时候还以为是coroutine,故此命名。

再来看看代码中的其它例子。gen模块提供了 WaitIterator 方法来封装一系列的future,这样就可以做到一旦有请求返回,就立即处理,同时也满足请求之间的并发。

对于process_once_everything_ready,我给了两个用法,其实准确地说,最后剩下的一种方式也是这样的机制。这里我再提一个问题:gen.Multi方法存在的意义是什么呢? 毕竟直接yield一个future的list就可以达到一样的效果(并行+所有请求结果统一处理)。

上面讲了这么多,最后一种方式其实也是可以理解了:声明了协程以后已自动纳入事件循环,与yield与否无关,同时因为list是有序的,for循环按顺序一个一个去yield,所以即使URL3完成得比URL2快,在主协程中future被yield的顺序永远是(1,2,3),结果处理的顺序当然也就是(1,2,3)了。

现在tornado也已经集成了asyncio以及twisted模块,可以利用它们的方式去做并行,这里就不展开了。

总结

  • 在协程框架中的sleep,都不能用原来time模块中的sleep了,不然它会阻塞整个线程,而所有协程都是运行在同一个线程中的。可以看到两个框架都会sleep作了封装gen.sleep()asyncio.sleep(),内部的实现上,它们都是注册了一个定时器在eventloop中,把CPU的控制权交给其它协程。
  • 从协程的实现原理层面去说,也是比较容易理解这种并行方式的。两个框架都是把一个生成器对象的列表yield出去,交给调度器,再由调度器分别执行并注册回调,所以才能够实现并行。