Python多进程开发

Posted on | 2183 words | ~5 mins

用Python有一段时间了,但是延续其他语言开发经验,用线程较多。然而Python自身GIL机制导致计算密集型的运算用多线程反而低效。故专门研究了一下Python多进程的开发,在这里分享一些心得。

由于Python GIL机制的存在,Python在多线程执行时,并无法充分利用多核CPU的优势。甚至多线程执行计算密集型操作时,甚至慢于单线程重复执行操作。所以在日常开发中,如果要并行的操作是计算密集型,应该尽量用多进程取代多线程。尽管Python的多线程和多进程库在API设计层面,设计成了一致方法签名,方便开发者快速将程序在多线程和多进程间切换,但其实两种调用方案间有不少细微的差别,值得我们关注。

多线程和多进程

下面是Python使用多线程的代码

 1import os
 2import threading
 3import time
 4
 5
 6def foo(start, end):
 7    print('Process: {}, Thread: {}'.format(os.getpid(), threading.currentThread().name))
 8
 9
10async_target = threading.Thread(target=foo, args=(0, 3, ))
11async_target.start()
12async_target.join()

代码输出中可以看到两个print命令在同一个进程13077,但不同的线程中完成。

1Process: 13077, Thread: MainThread
2Process: 13077, Thread: Thread-1

将该代码调整为多进程,十分简单:

1import multiprocessing
2async_target = multiprocessing.Process(target=foo, args=(0, 3, ))
3async_target.start()
4async_target.join()

这次,两个print命令发生在不同的进程,分别是13077和13079,但均在各进程的主线程(MainThread)中完成

1Process: 13077, Thread: MainThread
2Process: 13079, Thread: MainThread

线程池

Python的线程和进程都支持Pool。如果你在Python 2.7中尝试如下代码

1from multiprocessing.pool import ThreadPool
2pool = ThreadPool(3)
3pool.map(foo, ((0, 3, ), (1, 4, ), (2, 5, )))

会得到如下错误

1Process: 20254, Thread: MainThread
2Traceback (most recent call last):
3File "parallel.py", line 22, in <module>
4    pool.map(foo, ((0, 3, ), (1, 4, ), (2, 5, )))
5File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
6    return self.map_async(func, iterable, chunksize).get()
7File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
8    raise self._value
9TypeError: foo() takes exactly 2 arguments (1 given)

这是因为(0, 3, )没能自动unpack给foo,所以我们需要加一个workaround函数帮忙unpack一下。

1def unpack_foo(args):
2    foo(*args)
3
4from multiprocessing.pool import ThreadPool
5pool = ThreadPool(3)
6pool.map(unpack_foo, ((0, 3, ), (1, 4, ), (2, 5, )))

输出

1Process: 20843, Thread: Thread-2
2Process: 20843, Thread: Thread-3
3Process: 20843, Thread: Thread-4

不过到了python 3.3,有了starmap方法,我们日子轻松很多

1from multiprocessing.pool import ThreadPool
2pool = ThreadPool(3)
3pool.starmap(foo, ((0, 3, ), (1, 4, ), (2, 5, )))

进程池

回到正题,如果使用进程池,代码只需做很小的改动,将ThreadPool改为multiprocessing.Pool,其他不用修改

1pool = multiprocessing.Pool(3)
2pool.starmap(foo, ((0, 3, ), (1, 4, ), (2, 5, )))

可能你已经注意到了ThreadPool是放在multiprocessing.pool这个包内的。这是因为先有的进程池模式。而ThreadPool只是通过使用假进程multiprocessing.dummy复用进程池逻辑实现的。

计数问题

在多线程或者多进程环境下,如果同样的计数功能(多个线程或者多个进程同时修改一个计数器),代码是否也会很相似呢?先看多线程计数

 1counter = 0
 2
 3def increment():
 4    global counter
 5    counter += 1
 6
 7pool = ThreadPool(3)
 8for i in range(5):
 9    pool.apply_async(increment)
10pool.close()
11pool.join()
12
13print('counter={}'.format(counter))

程序运行会输出counter=5。请注意,我们使用了异步apply_async方法,从而实现并发执行5次increment方法。因此我们需要使用pool.join()方法防止子线程完成工作前,主线程就结束。然而只有在线程池已经关闭(即不再有新的线程进入池子)时,才能执行join。故在join之前,需要执行pool.close()

如果像之前一样,我们简单替换ThreadPoolmultiprocessing.Pool,会发现执行后counter=0。因为counter并不能跨进程共享,所以每个进程都有自己的counter,并且自增1。而主进程没有执行这个操作,故counter依旧是0。跨进程完成计数需要依赖进程间共享数据,如:Value,Array,Manager等。

 1shared_counter = None
 2
 3
 4def init(args):
 5    ''' store the counter for later use '''
 6    global shared_counter
 7    shared_counter = args
 8
 9
10def increment():
11    global shared_counter
12    with shared_counter.get_lock():
13        shared_counter.value += 1
14
15
16init_shared_counter = multiprocessing.Value('i', 0)
17
18pool = multiprocessing.Pool(3, initializer=init, initargs=(init_shared_counter, ))
19for i in range(5):
20    pool.apply_async(increment)
21pool.close()    
22pool.join()
23
24print('counter={}'.format(init_shared_counter.value))

多进程版本和多线程版本差距较大,几个要关注的点:

  1. 首先要创建一个multiprocessing.Value作为计数器。构造函数有两个变量,第一个i表明这是一个signed int类型,第二个0表示该变量的初始值。
  2. 创建multiprocessing.Pool时,需要设置一个初始化函数init,并将计数器作为参数传入。该函数会在每个进程初始化时被调用,即init_shared_counter会传入每一个进程中,并赋值给进程内的全局变量shared_counter
  3. increment函数中可以自增进程内的全局变量shared_counter,该操作会影响所有计数器 init_shared_counter,故需要加锁。

执行以上代码就会得到counter=5的结果。

关于回调

线程和进程结束时,可以调用启动时设置的回调函数。

 1def running():
 2    print('Running Process: {}, Thread: {}'.format(os.getpid(), threading.currentThread().name))
 3
 4
 5def completed(res):
 6    print('Completed Process: {}, Thread: {}'.format(os.getpid(), threading.currentThread().name))
 7
 8
 9print('Main Process: {}, Thread: {}'.format(os.getpid(), threading.currentThread().name))
10pool = ThreadPool(3)
11for i in range(5):
12    pool.apply_async(running, callback=completed)
13pool.close()
14pool.join()

输出

 1Main Process: 24321, Thread: MainThread
 2Running Process: 24028, Thread: Thread-24
 3Running Process: 24028, Thread: Thread-24
 4Running Process: 24028, Thread: Thread-24
 5Running Process: 24028, Thread: Thread-23
 6Running Process: 24028, Thread: Thread-25
 7Completed Process: 24028, Thread: Thread-28
 8Completed Process: 24028, Thread: Thread-28
 9Completed Process: 24028, Thread: Thread-28
10Completed Process: 24028, Thread: Thread-28
11Completed Process: 24028, Thread: Thread-28

所有的完成回调都发生在同一个进程的,同一个线程中,但不是主线程。所以请确保回调执行可以迅速完成,否则会造成回调堆积,回调线程出现阻塞。

再看一下多进程的回调结果。

 1Main Process: 24826, Thread: MainThread
 2Running Process: 24865, Thread: MainThread
 3Running Process: 24866, Thread: MainThread
 4Running Process: 24867, Thread: MainThread
 5Running Process: 24865, Thread: MainThread
 6Running Process: 24867, Thread: MainThread
 7Completed Process: 24826, Thread: Thread-31
 8Completed Process: 24826, Thread: Thread-31
 9Completed Process: 24826, Thread: Thread-31
10Completed Process: 24826, Thread: Thread-31
11Completed Process: 24826, Thread: Thread-31

所有的回调都发生在父进程,同一个线程中,但也不是主线程。

其它

如果使用apply_async放入很多线程或者进程等待Pool有资源处理,有可能出现OOM情况。所以当预期会有超大量等待执行的任务时,建议不要调用async,而是使用apply或者map,执行固定数量的任务;该彼此任务结束后,再循环执行下一个批次。