python使用全局解释器锁(GIL),他会将进程中的线程序列化,也就是多核cpu实际上并不能达到并行提高速度的目的,而使用多进程则是不受限的,所以实际应用中都是推荐多进程的。使用多进程往往是用来处理CPU密集型(科学计算)的需求,如果是IO密集型(文件读取,爬虫等)则可以使用多线程去处理。

尽管Python完全支持多线程编程, 但是解释器的C语言实现部分在完全并行执行时并不是线程安全的。 实际上,解释器被一个全局解释器锁保护着,它确保任何时候都只有一个Python线程执行。 GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 (比如一个使用了多个线程的计算密集型程序只会在一个单CPU上面运行)。GIL只会影响到那些严重依赖CPU的程序(比如计算型的)。 如果你的程序大部分只会涉及到I/O,比如网络交互,那么使用多线程就很合适, 因为它们大部分时间都在等待。实际上,你完全可以放心的创建几千个Python线程, 现代操作系统运行这么多线程没有任何压力,没啥可担心的。

multiprocessing常用组件及功能

  • 创建管理进程的模块

    • Process:用于创建进程模块
    • Pool:用于创建管理进行池
    • Queue:用于进程通信,资源共享
    • Value,Array:用于进程通信,资源共享
    • Pipe:用于管道通信
    • Manager:用于资源共享
  • 同步子进程模块

    • Condition
    • Event
    • Lock
    • RLock
    • Semaphore

Multiprocessing进程管理模块

Process模块

  Process模块用来创建子进程,是Multiprocessing核心模块,使用方式与Threading类似,可以实现多进程的创建,启动,关闭等操作。
构造方法

1
Process([group [, target [, name [, args [, kwargs]]]]])

  • group: 线程组,目前还没有实现,库引用中提示必须是None;
  • target: 要执行的方法;
  • name: 进程名;
  • args/kwargs: 要传入方法的参数。

实例方法

  • is_alive():进程是否在运行
  • join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
  • start():进程准备就绪,等待CPU调度。
  • run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
  • terminate():不管任务是否完成,立即停止工作进程。

属性

  • authkey
  • daemon:和线程的setDaemon功能一样,将父进程设置为守护进程,当父进程结束时,子进程也结束
  • exitcode:进程在运行时为none,如果为-N,表示信号N结束
  • name:进程的名字
  • pid:进程号

创建多进程的两种方法

  • 使用Process创建子进程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    from multiprocessing import Process
    import os

    def info(title):
    print(title)
    print('module name:', __name__)
    if hasattr(os, 'getppid'): # only available on Unix
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

    def f(name):
    info('function f')
    print('hello', name)

    if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
  • 使用Process类继承创建子进程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    from multiprocessing import Process
    import time
    class MyProcess(Process):
    def __init__(self, arg):
    super(MyProcess, self).__init__()
    #multiprocessing.Process.__init__(self)
    self.arg = arg
    def run(self):
    print('nMask', self.arg)
    time.sleep(1)
    if __name__ == '__main__':
    for i in range(10):
    p = MyProcess(i)
    p.start()
    for i in range(10):
    p.join()

Pool模块

  Multiprocessing.Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果还没满,就会创建一个新的进程来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。在共享资源时只能使用Multiprocessing.Manager,而不能使用Queue或Array。
用途
Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。
构造方法

1
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes:使用的工作进程的数量,如果processes是None,那么使用os.cpu_count()返回的数量
  • initializer:如果initializer是None,那么每一个工作进行在开始的时候会调用initializer(*initargs)
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后一个新的工作进程来代替原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要pool存在工作进程就会一直存活。
  • context:用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

实例方法

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞
  • apply(func[, args[, kwds]])是阻塞的
  • close() 关闭pool,使其不在接受新的任务
  • terminate() 关闭pool,结束工作进程,不在处理未完成的任务
  • join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用

使用方法

  • Pool+map
    1
    2
    3
    4
    5
    6
    7
    8
    9
    rom multiprocessing import Pool
    def test(i):
    print i
    if __name__=="__main__":
    lists=[1,2,3]
    pool=Pool(processes=2) #定义最大的进程数
    pool.map(test,lists) #p必须是一个可迭代变量。
    pool.close()
    pool.join()

initializer的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import multiprocessing
def do_calculation(data):
return data*2
def start_process():
print('Starting',multiprocessing.current_process().name)
if __name__=='__main__':
inputs=list(range(10))
print('Inputs :',inputs)
builtin_output=map(do_calculation,inputs)
print ('Build-In :', builtin_output)
pool_size=multiprocessing.cpu_count()*2
pool=multiprocessing.Pool(processes=pool_size,
initializer=start_process,)
pool_outputs=pool.map(do_calculation,inputs)
pool.close()
pool.join()
print('Pool :',pool_outputs)

异步进程池(非阻塞)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from  multiprocessing import Pool
import time
def f(i):
print(i)
if __name__ == '__main__':
t_start=time.time()
pool = Pool(processes=10)

for i in range(100):
'''
for循环执行步骤
1.循环遍历,将100个子进程添加到进程池(相对父进程会阻塞)
2.每次执行10个子进程,等一个子进程执行完后,立马启动新的子进程(相对父进程不阻塞)
apply_async为异步进程池写法
异步指的是启动子进程的过程,与父进程本身的执行(print)是异步的,而for循环中往进程池添加子进程的过程,与父进程本身的执行却是同步的
'''
pool.apply_async(func=f, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
print('end')
pool.close()
pool.join()
t_end=time.time()
t=t_end-t_start
print('the program time is :%s' %t)

注意:调用join之前,先调用close或者terminate方法,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束。

同步进程池(阻塞)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from  multiprocessing import Pool
import time
def f(i):
print(i)
if __name__ == '__main__':
t_start=time.time()
pool = Pool(processes=10)

for i in range(100):
'''
for循环执行步骤
1.遍历100个可迭代对象,往进程池放一个子进程
2.执行这个子进程,等子进程执行完毕,再往进程池放一个子进程,再执行(同时只执行一个子进程)
for循环执行完毕,再执行print函数
'''
pool.apply_async(f, args=(i,)) # 维持执行的进程总数为10,当一个进程执行完后启动一个新进程
print('end')
pool.close()
pool.join()
t_end=time.time()
t=t_end-t_start
print('the program time is :%s' %t)

进程通信

Queue模块

  Queue模块用来控制进程安全,与线程中的Queue用法一样。Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。原因很简单, 多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import multiprocessing as mp
# 定义一个被多线程调用的函数,q像一个队列,用来保存每次函数运行的结果
def job(q):
res=0
for i in range(1000):
res+=i+i**2+i**3
q.put(res)

if __name__=='__main__':
# 定义一个多线程队列
q=mp.Queue()
'''
定义两个线程函数,用来处理同一个任务,args的参数只要一个值的时候,参数后面需要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错
'''
p1=mp.Process(target=job,args=(q,))
p2=mp.Process(target=job,args=(q,))
# 分别启动 连接两个线程
p1.start()
p2.start()
p1.join()
p2.join()
# 上面是分两批处理的 所以这里分两批输出 将结果分别保存
res1=q.get()
res2=q.get()
print(res1,res2)

Queues是线程进程安全的

Pipe模块

1
2
3
4
5
6
7
8
9
10
from multiprocessing import Process,Pipe
def f(conn):
conn.send([42,None,'hello'])
conn.close()
if __name__=='__main__':
parent_conn,child_conn=Pipe()
p=Process(target=f,args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()

第二个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process, Pipe

def talk(pipe):
pipe.send(dict(name='Bob', spam=42)) # 传输一个字典
reply = pipe.recv() # 接收传输的数据
print('talker got:', reply)

if __name__ == '__main__':
(parentEnd, childEnd) = Pipe() # 创建两个 Pipe() 实例,也可以改成 conf1, conf2
child = Process(target=talk, args=(childEnd,)) # 创建一个 Process 进程,名称为 child
child.start() # 启动进程
print('parent got:', parentEnd.recv()) # parentEnd 是一个 Pip() 管道,可以接收 child Process 进程传输的数据
parentEnd.send({x * 2 for x in 'spam'}) # parentEnd 是一个 Pip() 管道,可以使用 send 方法来传输数据
child.join() # 传输的数据被 talk 函数内的 pip 管道接收,并赋值给 reply
print('parent exit')

Manager模块

  Manager模块常与Pool模块一起使用,作用是共享资源。

Multiprocessing同步进程模块

Lock模块

  作用:当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
  具体场景:所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。使用Lock同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。

1
2
3
4
5
6
7
8
9
from multiprocessing import Process,Lock
def l(lock,num):
lock.acquire()
print("Hello Num:%s" % num)
lock.release()
if __name__=="__main__":
lock=Lock()
for num in range(10):
Process(target=l,args=(lock,num)).start()

Semaphore模块

  作用:用来控制对共享资源的访问数量,例如池的最大连接数。

Event模块

  作用:用来实现进程间同步通信。

问题

  1. 启动多进程的代码一定要放在 if name==”main“: 后面吗?
    windows系统下,想要启动一个子进程,必须加上if name==”main“:,linux则不需要。
  2. 父进程中的全局变量能被子进程共享吗?
    不行,因为每个进程享有独立的内存数据,如果想要共享资源,可以使用Manage类,或者Queue等模块。
  3. 子进程能结束其他子进程或父进程吗?如果能,怎么通过子进程去结束所有进程?
    此需求可以稍作修改:所有的子进程都是为了完成一件事情,而当某个子进程完成该事情后,父进程就该结束所有子进程,请问该怎么做?此时结束所有子进程的操作可以交给父进程去做,因为子进程想要结束另外的子进程比较难实现。
    那么问题就又变成了父进程什么时候该结束所有进程?其中一个思路是获取每个子进程的返回值,一旦有返回True(结束的标记),则立马结束所有进程;另外一种思路是使用共享资源,父进程可以一直去判断这个公共资源,一旦子进程将它改变,则结束所有子进程。(推荐使用前者,因为多进程中不推荐使用资源共享)
  4. 子进程中还能再创建子进程吗?
    可以,子进程可以再创建进程,线程中也可以创建进程。

参考:
http://python.jobbole.com/87760/
http://python.jobbole.com/87645/