用Python有一段时间了,但是延续其他语言开发经验,用线程较多。然而Python自身GIL机制导致计算密集型的运算用多线程反而低效。故专门研究了一下Python多进程的开发,在这里分享一些心得。
由于Python GIL机制的存在,Python在多线程执行时,并无法充分利用多核CPU的优势。甚至多线程执行计算密集型操作时,甚至慢于单线程重复执行操作。所以在日常开发中,如果要并行的操作是计算密集型,应该尽量用多进程取代多线程。尽管Python的多线程和多进程库在API设计层面,设计成了一致方法签名,方便开发者快速将程序在多线程和多进程间切换,但其实两种调用方案间有不少细微的差别,值得我们关注。
多线程和多进程
下面是Python使用多线程的代码
1import os
2import threading
3import time
4
5
6def foo(start, end):
7 print('Process: {}, Thread: {}'.format(os.getpid(), threading.currentThread().name))
8
9
10async_target = threading.Thread(target=foo, args=(0, 3, ))
11async_target.start()
12async_target.join()
代码输出中可以看到两个print命令在同一个进程13077,但不同的线程中完成。
1Process: 13077, Thread: MainThread
2Process: 13077, Thread: Thread-1
将该代码调整为多进程,十分简单:
1import multiprocessing
2async_target = multiprocessing.Process(target=foo, args=(0, 3, ))
3async_target.start()
4async_target.join()
这次,两个print命令发生在不同的进程,分别是13077和13079,但均在各进程的主线程(MainThread)中完成
1Process: 13077, Thread: MainThread
2Process: 13079, Thread: MainThread
线程池
Python的线程和进程都支持Pool。如果你在Python 2.7中尝试如下代码
1from multiprocessing.pool import ThreadPool
2pool = ThreadPool(3)
3pool.map(foo, ((0, 3, ), (1, 4, ), (2, 5, )))
会得到如下错误
1Process: 20254, Thread: MainThread
2Traceback (most recent call last):
3File "parallel.py", line 22, in <module>
4 pool.map(foo, ((0, 3, ), (1, 4, ), (2, 5, )))
5File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
6 return self.map_async(func, iterable, chunksize).get()
7File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
8 raise self._value
9TypeError: foo() takes exactly 2 arguments (1 given)
这是因为(0, 3, )没能自动unpack给foo,所以我们需要加一个workaround函数帮忙unpack一下。
1def unpack_foo(args):
2 foo(*args)
3
4from multiprocessing.pool import ThreadPool
5pool = ThreadPool(3)
6pool.map(unpack_foo, ((0, 3, ), (1, 4, ), (2, 5, )))
输出
1Process: 20843, Thread: Thread-2
2Process: 20843, Thread: Thread-3
3Process: 20843, Thread: Thread-4
不过到了python 3.3,有了starmap方法,我们日子轻松很多
1from multiprocessing.pool import ThreadPool
2pool = ThreadPool(3)
3pool.starmap(foo, ((0, 3, ), (1, 4, ), (2, 5, )))
进程池
回到正题,如果使用进程池,代码只需做很小的改动,将ThreadPool改为multiprocessing.Pool,其他不用修改
1pool = multiprocessing.Pool(3)
2pool.starmap(foo, ((0, 3, ), (1, 4, ), (2, 5, )))
可能你已经注意到了ThreadPool是放在multiprocessing.pool这个包内的。这是因为先有的进程池模式。而ThreadPool只是通过使用假进程multiprocessing.dummy复用进程池逻辑实现的。
计数问题
在多线程或者多进程环境下,如果同样的计数功能(多个线程或者多个进程同时修改一个计数器),代码是否也会很相似呢?先看多线程计数
1counter = 0
2
3def increment():
4 global counter
5 counter += 1
6
7pool = ThreadPool(3)
8for i in range(5):
9 pool.apply_async(increment)
10pool.close()
11pool.join()
12
13print('counter={}'.format(counter))
程序运行会输出counter=5
。请注意,我们使用了异步apply_async
方法,从而实现并发执行5次increment
方法。因此我们需要使用pool.join()
方法防止子线程完成工作前,主线程就结束。然而只有在线程池已经关闭(即不再有新的线程进入池子)时,才能执行join。故在join之前,需要执行pool.close()
如果像之前一样,我们简单替换ThreadPool
为multiprocessing.Pool
,会发现执行后counter=0
。因为counter并不能跨进程共享,所以每个进程都有自己的counter,并且自增1。而主进程没有执行这个操作,故counter依旧是0。跨进程完成计数需要依赖进程间共享数据,如:Value,Array,Manager等。
1shared_counter = None
2
3
4def init(args):
5 ''' store the counter for later use '''
6 global shared_counter
7 shared_counter = args
8
9
10def increment():
11 global shared_counter
12 with shared_counter.get_lock():
13 shared_counter.value += 1
14
15
16init_shared_counter = multiprocessing.Value('i', 0)
17
18pool = multiprocessing.Pool(3, initializer=init, initargs=(init_shared_counter, ))
19for i in range(5):
20 pool.apply_async(increment)
21pool.close()
22pool.join()
23
24print('counter={}'.format(init_shared_counter.value))
多进程版本和多线程版本差距较大,几个要关注的点:
- 首先要创建一个
multiprocessing.Value
作为计数器。构造函数有两个变量,第一个i表明这是一个signed int类型,第二个0表示该变量的初始值。 - 创建
multiprocessing.Pool
时,需要设置一个初始化函数init
,并将计数器作为参数传入。该函数会在每个进程初始化时被调用,即init_shared_counter
会传入每一个进程中,并赋值给进程内的全局变量shared_counter
- 在
increment
函数中可以自增进程内的全局变量shared_counter
,该操作会影响所有计数器init_shared_counter
,故需要加锁。
执行以上代码就会得到counter=5
的结果。
关于回调
线程和进程结束时,可以调用启动时设置的回调函数。
1def running():
2 print('Running Process: {}, Thread: {}'.format(os.getpid(), threading.currentThread().name))
3
4
5def completed(res):
6 print('Completed Process: {}, Thread: {}'.format(os.getpid(), threading.currentThread().name))
7
8
9print('Main Process: {}, Thread: {}'.format(os.getpid(), threading.currentThread().name))
10pool = ThreadPool(3)
11for i in range(5):
12 pool.apply_async(running, callback=completed)
13pool.close()
14pool.join()
输出
1Main Process: 24321, Thread: MainThread
2Running Process: 24028, Thread: Thread-24
3Running Process: 24028, Thread: Thread-24
4Running Process: 24028, Thread: Thread-24
5Running Process: 24028, Thread: Thread-23
6Running Process: 24028, Thread: Thread-25
7Completed Process: 24028, Thread: Thread-28
8Completed Process: 24028, Thread: Thread-28
9Completed Process: 24028, Thread: Thread-28
10Completed Process: 24028, Thread: Thread-28
11Completed Process: 24028, Thread: Thread-28
所有的完成回调都发生在同一个进程的,同一个线程中,但不是主线程。所以请确保回调执行可以迅速完成,否则会造成回调堆积,回调线程出现阻塞。
再看一下多进程的回调结果。
1Main Process: 24826, Thread: MainThread
2Running Process: 24865, Thread: MainThread
3Running Process: 24866, Thread: MainThread
4Running Process: 24867, Thread: MainThread
5Running Process: 24865, Thread: MainThread
6Running Process: 24867, Thread: MainThread
7Completed Process: 24826, Thread: Thread-31
8Completed Process: 24826, Thread: Thread-31
9Completed Process: 24826, Thread: Thread-31
10Completed Process: 24826, Thread: Thread-31
11Completed Process: 24826, Thread: Thread-31
所有的回调都发生在父进程,同一个线程中,但也不是主线程。
其它
如果使用apply_async
放入很多线程或者进程等待Pool有资源处理,有可能出现OOM情况。所以当预期会有超大量等待执行的任务时,建议不要调用async,而是使用apply或者map,执行固定数量的任务;该彼此任务结束后,再循环执行下一个批次。