Python multiprocessing模块精讲
multiprocessing 模块无须安装,从 Python 2.6 开始系统便自带该模块了。该模块的接口函数和 threading 类似,但是它启动的是进程而不是线程。
使用该模块时需要先将其引入 multiprocessing,方法如下:
import multiprocessing
该模块包含很多类,如Lock,其和多线程中的锁类似,本节对这些类都会有所涉及。
创建进程最简单的方法是创建一个 multiprocessing.Process 的实例,在创建该实例时需要提供入口函数。
下面就是一个最简单的创建进程的例子:
import time, os import multiprocessing # 引入multiprocessing模块 # 子进程要执行的代码,可以看作是进程的入口函数 def process_entry(): print(u"子进程在运行") print(u'子进程的ID = %d' % os.getpid()) if __name__=='__main__': print(u'父进程的ID = %d' % os.getpid()) p = multiprocessing.Process(target=process_entry) time.sleep(1) print(u'启动子进程') p.start() # 此时才真正开始运行子进程 time.sleep(2) print(u'父进程结束')
运行结果如下:
$ python multiprocessingDemo1.py # 运行脚本 父进程的ID = 63998 启动子进程 子进程在运行 子进程的ID = 63999 父进程结束
当然创建进程时也可以带上参数。上例子中没有带任何参数,下面的例子中带了一个参数。
import time, os import multiprocessing # 引入multiprocessing模块 # 子进程要执行的代码,入口函数带有一个参数arg1 def process_entry(arg1): # 子进程的入口函数 print(u"子进程(%d)在运行" % arg1) print(u'子进程(%d)的ID = %d' % (arg1, os.getpid())) if __name__=='__main__': print(u'父进程的ID = %d' % os.getpid()) # 指定了参数为1 p1 = multiprocessing.Process(target=process_entry, args=(1,)) # 指定了参数为2 p2 = multiprocessing.Process(target=process_entry, args=(2,)) time.sleep(1) print(u'启动子进程') # 此时子进程才开始执行 p1.start() p2.start() time.sleep(2) print(u'父进程结束')
运行结果如下:
$ python multiprocessingDemo2.py 父进程的ID = 64055 启动子进程 子进程(1)在运行 子进程(1)的ID = 64056 子进程(2)在运行 子进程(2)的ID = 64057 父进程结束
另外一种创建进程的方法是从 Process 派生一个自己的类,我们只需要定义该类的 run() 函数即可。启动进程时就会运行该 run() 函数。
import time, os import multiprocessing # 用户自建的进程类 class NewProcess(multiprocessing.Process): def __init__(self, arg): # 初始化函数 super(NewProcess, self).__init__() self.arg = arg def run(self): # 入口函数 print(u"子进程(%d)在运行" % self.arg) print(u'子进程(%d)的ID = %d' % (self.arg, os.getpid())) if __name__=='__main__': print(u'父进程的ID = %d' % os.getpid()) p1 = NewProcess(1) p2 = NewProcess(2) time.sleep(1) print(u'启动子进程') p1.start() # 启动子进程 p2.start() time.sleep(2) # 休眠2秒钟 print(u'父进程结束')
运行结果如下:
$ python3 multiprocessingDemo5.py 父进程的ID = 62560 启动子进程 子进程(1)在运行 子进程(1)的ID = 62561 子进程(2)在运行 子进程(2)的ID = 62562 父进程结束
在创建完进程后,可以对其进行很多操作,如启动、退出、查看其运行状态等。本节将介绍与进程相关的一些属性。
这个和操作系统返回的 PID 是一样的。可以通过下面的例子来演示其用法:
import time, os import multiprocessing def child_process_entry(): pid = os.getpid() ppid = os.getppid() print(u"子进程: PID = %d, PPID = %d" % (pid, ppid)) while True: time.sleep(10) main_pid = os.getpid() child_process = multiprocessing.Process(target=child_process_entry) child_process.start() print(u"主进程: PID=%d" % main_pid) print(u"主进程: 子进程的PID=%d" % child_process.pid) child_process.join()
如果是在Linux或者macOS系统上运行,其可能的输出如下:
$ python3 demo2.py # 启动脚本 主进程: PID=52983 # 在主进程中得到自己的PID 主进程: 子进程的PID=52984 # 在主进程中得到子进程的PID # 在子进程中得到子进程的PID和主进程的PID 子进程: PID = 52984, PPID = 52983
我们可以在 shell 中看到这两个进程,如下:
$ ps ax | grep python # 查看所有的Python进程 52983 s001 S+ 0:00.05 python3 demo2.py # 主进程,第一列是PID 52984 s001 S+ 0:00.00 python3 demo2.py # 子进程 53585 s003 S+ 0:00.00 grep python # 这是我们的查询进程,不用管
在这个例子中,使用 os.getpid() 得到当前进程的 PID,使用 os.getppid() 得到当前进程的父进程的 PID。
如果是在 Windows 系统下运行,需要加上下面运行代码:
multiprocessing.freeze_support()
完整代码如下:
import time, os import multiprocessing def child_process_entry(): # 子进程入口 pid = os.getpid() ppid = os.getppid() print(u"子进程: PID = %d, PPID = %d" % (pid, ppid)) while True: time.sleep(10) if __name__ == '__main__': # 主程序 # windows下必须要有的 multiprocessing.freeze_support() # 新加的代码 main_pid = os.getpid() child_process = multiprocessing.Process(target=child_process_entry) child_process.start() print(u"主进程: PID=%d" % main_pid) print(u"主进程: 子进程的PID=%d" % child_process.pid) child_process.join()
运行结果如下:
> python demo2.py 主进程: PID=1008 主进程: 子进程的PID=4432 子进程: PID = 4432, PPID = 1008
此时在 Windows 任务管理器中可以看到两个 Python 进程,如图 1 所示。
图 1 进程ID
可以看到代码返回的 PID 和操作系统中的 PID 是一致的。
另外,属性 ident 也是表示进程 ID 的,它们其实是同一个对象。下面的代码使用 id() 检查并确认它们为同一个对象。
import time, os import multiprocessing def process_entry(arg1): # 子进程要执行的代码 return arg1 * 2 if __name__=='__main__': p1 = multiprocessing.Process(target=process_entry, args=(1,), daemon=True) p1.start() if id(p1.pid) == id(p1.ident): print(u"pid和ident是同一个对象") time.sleep(1)
运行结果如下:
$ python multiprocessing_pid_ident.py pid和ident是同一个对象
Daemon 进程在父进程退出时自动退出,而且不能再创建新的进程。该属性默认值是 False,表示普通的进程。
我们可以在创建进程时通过参数 daemon=True 来创建一个 Daemon 进程。下面先来看看非 Daemon 进程在父进程退出后状态。在下面的例子中,父进程在启动子进程 10 秒钟后就退出了,但是子进程需要运行 15 秒钟才退出。
import time, os import multiprocessing # 子进程要执行的代码 def process_entry(arg1): print(u"子进程(%d)在运行" % arg1) print(u'子进程(%d)的ID = %d' % (arg1, os.getpid())) round = 5 while round > 0: print(u"子进程在运行中") time.sleep(3) round = round - 1 print(u"子进程退出") if __name__=='__main__': print(u'父进程的ID = %d' % os.getpid()) p1 = multiprocessing.Process(target=process_entry, args=(1,),daemon=False) time.sleep(1) print(u'启动子进程') p1.start() time.sleep(10) print(u'父进程结束')
运行结果如下:
$ python multiprocessingDemo3.py 父进程的ID = 65687 启动子进程 子进程(1)在运行 子进程(1)的ID = 65688 子进程在运行中 子进程在运行中 子进程在运行中 子进程在运行中 父进程结束 子进程在运行
可以看到在父进程退出后,子进程继续执行。
如果在创建时指定子进程是 Daemon 进程,那么在父进程执行完毕后子进程会被强制退出。修改上例中的第 15 行代码,原来是:
p1 = multiprocessing.Process(target=process_entry, args=(1,), daemon=False)
现在修改为:
p1 = multiprocessing.Process(target=process_entry, args=(1,), daemon=True)
运行修改后的代码,可以看到下面的输出:
$ python multiprocessingDemo4.py 父进程的ID = 9734 启动子进程 子进程(1)在运行 子进程(1)的ID = 9735 子进程在运行中 子进程在运行中 子进程在运行中 子进程在运行中 父进程结束
可以看到在父进程退出后子进程自动被强制退出。
我们也可以在调用进程的 start() 之前设置该属性,方法如下:
进程对象. Daemon = True
就是进程函数返回的值。下面的代码演示了如何得到进程的返回码。
import time, os, sys import multiprocessing def process_entry(arg1): # 子进程要执行的代码 sys.exit(arg1*2) if __name__=='__main__': p1 = multiprocessing.Process(target=process_entry, args=(1,), daemon=True) p1.start() time.sleep(1) print(u"返回值是%d" % p1.exitcode)
运行结果如下:
$ python multiprocessing_exit_code.py 返回值是2
除了可以获得进程的属性,进程实例对象还提供了一些接口函数,通过这些接口函数可以对进程进行操作,如启动进程、判断进程是否仍然在运行、得到进程退出码、等待进程退出、强制要求进程退出等。
就像前面演示的那样,只有在调用该函数之后,进程才真正运行起来,进程对象也才有了进程的 ID。在调用该函数之前,进程对象的 pid 属性为 None。下面的代码演示这个情况:
import time, os import multiprocessing # 子进程要执行的代码 def process_entry(arg1): print(u"子进程在运行") return arg1 * 2 if __name__=='__main__': p1 = multiprocessing.Process(target=process_entry, args=(1,), daemon=True) if p1.pid is None: print(u"1)在调用start()之前,子进程的ID=None") else: print(u"1)在调用start()之前,子进程的ID=%d" % p1.pid) p1.start() time.sleep(1) print(u"2)在调用start()之后,子进程的ID=%d" % p1.pid) time.sleep(1)
运行结果如下:
$ python multiprocessing_noIDbeforeStart.py 1)在调用start()之前,子进程的ID=None 子进程在运行 2)在调用start()之后,子进程的ID=12430
而且对于某一个进程对象,只能调用一次该函数,如果多次调用,除第一次之外,其他的都会抛出异常。例如下面的代码:
import time, os import multiprocessing # 子进程要执行的代码 def process_entry(arg1): print(u"子进程在运行") return arg1 * 2 if __name__=='__main__': p1 = multiprocessing.Process(target=process_entry, args=(1,), daemon=True) p1.start() time.sleep(1) print(u'再次启动该进程对象,导致错误') p1.start() # 再次启动,抛出异常
运行结果如下:
$ python multiprocessing_restart.py 子进程在运行 再次启动该进程对象,导致错误 Traceback (most recent call last): File “multiprocessing_restart.py”, line 14, in <module> p1.start() File “/anaconda3/lib/python3.7/multiprocessing/process.py”, line 106, in start assert self._popen is None, ‘cannot start a process twice’ AssertionError: cannot start a process twice
在调用 start() 之前,该函数返回 False;在该进程退出后,该函数返回 False;在其他时候,该函数返回 True。
下面的例子演示了该函数在不同时刻返回的值。
import time, os import multiprocessing def process_entry(arg1): # 子进程要执行的代码 max_round = 2 while max_round > 0: time.sleep(3) max_round = max_round - 1 print(u'子进程结束') if __name__=='__main__': p1 = multiprocessing.Process(target=process_entry, args=(1,), daemon=True) print(u"1)子进程是否存活: %s" % p1.is_alive()) p1.start() print(u"2)子进程是否存活: %s" % p1.is_alive()) max_query = 10 while max_query > 0: time.sleep(1) print(u"3)子进程是否存活: %s" % p1.is_alive()) max_query = max_query - 1 print(u'父进程结束')
运行结果如下:
$ python multiprocessing_isalive1.py 1)子进程是否存活: False 2)子进程是否存活: True 3)子进程是否存活: True 3)子进程是否存活: True 3)子进程是否存活: True 3)子进程是否存活: True 3)子进程是否存活: True 子进程结束 3)子进程是否存活: False 3)子进程是否存活: False 3)子进程是否存活: False 3)子进程是否存活: False 3)子进程是否存活: False 父进程结束
该函数在前面的例子中使用过。如果没有指定超时时间则一直等待,直到指定进程退出为止。
import time, os import multiprocessing def process_entry(arg1): # 子进程要执行的代码 print(u"子进程(%d)在运行" % arg1) print(u'子进程(%d)的ID = %d' % (arg1, os.getpid())) max_round = 3 while max_round > 0: print(u"%d) 子进程在运行中" % max_round) time.sleep(3) max_round = max_round - 1 if __name__=='__main__': print(u'父进程的ID = %d' % os.getpid()) p1 = multiprocessing.Process(target=process_entry, args=(1,), daemon=True) time.sleep(1) print(u'启动子进程') p1.start() p1.join() # 这个函数没有返回值 print(u'父进程结束')
运行结果如下:
$ python multiprocessing_joinDemo1.py 父进程的ID = 9940 启动子进程 子进程(1)在运行 子进程(1)的ID = 9941 3) 子进程在运行中 2) 子进程在运行中 1) 子进程在运行中 父进程结束
如果指定了超时时间,则可能在指定进程退出之前该函数就返回了。这时不能使用返回值来判断是因为进程退出了还是超时了,而需要使用 is_alive() 来判断是否是因为超时而返回的。下面的代码演示了这种用法。
import time, os import multiprocessing def process_entry(arg1): # 子进程要执行的代码 print(u"子进程(%d)在运行" % arg1) print(u'子进程(%d)的ID = %d' % (arg1, os.getpid())) max_round = 3 while max_round > 0: print(u"%d) 子进程在运行中" % max_round) time.sleep(3) max_round = max_round - 1 print(u'子进程结束') if __name__=='__main__': print(u'父进程的ID = %d' % os.getpid()) p1 = multiprocessing.Process(target=process_entry, args=(1,), daemon=True) time.sleep(1) print(u'启动子进程') p1.start() while p1.is_alive(): p1.join(1) # 仅仅等待1秒钟 print(u'父进程结束')
运行结果如下:
$ python multiprocessing_joinDemo2.py 父进程的ID = 9985 启动子进程 子进程(1)在运行 子进程(1)的ID = 9986 3) 子进程在运行中 2) 子进程在运行中 1) 子进程在运行中 子进程结束 父进程结束
该函数给指定的进程发送 SIGKILL 信号。如果是 Windows 系统,那么就是调用了 TerminateProcess() 接口函数。
另外一个强制退出的接口函数是 terminate(),其内部实现和接口函数 kill() 会有所不同,但我们可以将它们当作同一个接口函数来使用。下面的代码演示了该接口函数的用法。
import time, os, sys import multiprocessing def process_entry(arg1): # 子进程要执行的代码 while True: # 子进程不会自主,是一个死循环 time.sleep(1) if __name__=='__main__': p1 = multiprocessing.Process(target=process_entry, args=(1,), daemon=True) p1.start() if p1.is_alive(): print(u"子进程在运行中") else: print(u"子进程没有在运行中") print(u"杀死子进程") p1.kill() time.sleep(2) if p1.is_alive(): print(u"子进程在运行中") else: print(u"子进程没有在运行中")
运行该程序,输出如下:
$ python multiprocessing_kill.py 子进程在运行中 杀死子进程 子进程没有在运行中
需要注意的是,并不是所有的子进程都可以被杀死掉的,也不是立即就可以被杀死掉的。就像杀死 Windows 下的某些进程,由于这些进程忽略对外部消息的响应,导致它们很难被杀死。我们也不认为从外部杀死一个进程是一个好的想法。相对来说,让进程自主优雅地退出是一个更好的设计。
如果频繁去创建一个进程,然后销毁它,会导致性能下降。对于这种情况推荐的做法是事先创建一个进程池,在有任务达到时从进程池中取出一个进程来执行相关任务,在任务完成后便归还回去。这样做可以复用部分已有进程资源,达到提升效率的作用。
在 mulitprocessing 模块中有一个 Pool 类可以帮助我们完成该任务。下面是一个使用进程池的例子。
import multiprocessing # 引入multiprocessing模块 import time # 引入time模块 def child_process_entry(): # 定义进程入口函数 print(u"子进程在运行") time.sleep(10) print(u"子进程结束") pool_obj = multiprocessing.Pool(processes = 5) # 建立5个元素池子 for i in range(10): # 添加10个进程 pool_obj.apply_async(child_process_entry) pool_obj.close() # 停止添加进程了 pool_obj.join() # 等待进程都结束
运行该脚本后,输出如下:
$ python3 process_pool1.py # 运行脚本 子进程在运行 # 同时启动5个进程来完成任务 子进程在运行 子进程在运行 子进程在运行 子进程在运行 子进程结束 # 有进程结束工作了 子进程结束 子进程结束 子进程结束 子进程结束 子进程在运行 # 启动新的任务 子进程在运行 子进程在运行 子进程在运行 子进程在运行 子进程结束 子进程结束 子进程结束 子进程结束 子进程结束
可以发现,其一次启动 5 个进程,并在 5 个完成后才启动另外 5 个。也就是说其最多同时运行 5 个进程,并且仅当有进程结束后才将新的进程投入运行。
启动进程时可以指定参数 args,args 是一个元组,其各个元素对应到入口函数的各个参数。下面将前面的例子稍作修改,传入一个参数,用来标识该进程。具体代码如下:
import multiprocessing # 引入multiprocessing模块 import time, random # 引入time,random模块 def child_process_entry(): # 定义进程入口函数 print(u"子进程在运行") time.sleep(random.randint(1, 100)/10.0) # 休息随时间 print(u"子进程结束") pool_obj = multiprocessing.Pool(processes = 5) # 建立5个元素池子 for i in range(10): # 添加10个进程 pool_obj.apply_async(child_process_entry) pool_obj.close() # 停止添加进程了 pool_obj.join() # 等待进程都结束
运行该脚本,输出如下:
$ python3 process_pool1.py # 运行脚本 子进程在运行 # 同时启动5个进程来完成任务 子进程在运行 子进程在运行 子进程在运行 子进程在运行 子进程结束 # 有进程结束工作了 子进程在运行 # 投入一个新的进程 子进程结束 # 有进程结束工作了 子进程在运行 # 投入一个新的进程 子进程结束 # 有进程结束工作了 子进程在运行 # 投入一个新的进程 子进程结束 # 有进程结束工作了 子进程在运行 # 投入一个新的进程 子进程结束 # 有进程结束工作了 子进程在运行 # 投入一个新的进程 子进程结束 子进程结束 子进程结束 子进程结束 子进程结束
由于是多进程同时运行,所以一般子进程和主进程是同时运行的,也就是说它们是异步的,这也是 apply_async 中 async 的来源。
如果要是同步执行,那么主进程会等待子进程结束,所以子进程只能是一个一个地运行。这种同步运行的方式比较少见,因为这没有利用到并发操作的特性。但这里还是要演示一下同步运行的情况,将上面的代码稍作修改,将第9行从
pool_obj.apply_async(child_process_entry, args=(i, ))
修改为
pool_obj.apply(child_process_entry, args=(i, ))
其他部分不做任何修改,运行后的输出如下:
$ python3 process_pool3.py 子进程0在运行 # 第一个进程开始执行 子进程0结束 # 第一个进程执行完毕 子进程1在运行 子进程1结束 子进程2在运行 子进程2结束 子进程3在运行 子进程3结束 子进程4在运行 子进程4结束 子进程5在运行 子进程5结束 子进程6在运行 子进程6结束 子进程7在运行 子进程7结束 子进程8在运行 子进程8结束 子进程9在运行 子进程9结束
可以看到子进程是依次执行的,前一个执行完毕之后才执行下一个。
在前面章节我们也用到了进程之间的一些简单信息交换,如查询进程的运行状态、等待进程退出、得到进程的退出码等。但是在一些复杂的应用场景,则希望进程之间有更多的信息交换。下面介绍 multiprocessing 模块提供的进程之间通信的方式,包括管道、队列和锁。
管道有两头,一般一头给进程 A,一头给进程 B。如果进程 A 对管道进行写入操作,那么进程B就可以通过读操作看到写入的数据。而且管道是双向的,可以进程 A 写进程 B 读,也可以进程 B 写进程 A 读。
下面的例子演示了管道的用法。
import multiprocessing # 引入multiprocessing模块 def process_A(pipe): # 进程A的入口函数 print(u"进程A发送数据hello到B") # 发送数据hello到进程B pipe.send('hello') print(u"进程A等待进程B的输入") data = pipe.recv() # 结束进程B的数据 print(u'进程A收到B的数据%s' % data) print(u'进程A结束') def process_B(pipe): # 进程B的入口函数 print(u'进程B等待进程A的数据') data = pipe.recv() # 接收数据 print(u'进程B收到B的数据%s' % data) print(u'进程B发送hi到进程A') pipe.send('hi') # 发送数据 print(u'进程B结束') pipe = multiprocessing.Pipe() # 建立管道 # 创建进程A p1 = multiprocessing.Process(target=process_A, args=(pipe[0],)) # 创建进程B p2 = multiprocessing.Process(target=process_B, args=(pipe[1],)) p1.start() # 启动进程 p2.start() p1.join() # 等待进程结束 p2.join()
运行后的输出如下:
$ python3 pipe1.py 进程A发送数据hello到B 进程A等待进程B的输入 进程B等待进程A的数据 进程B收到B的数据hello 进程B发送hi到进程A 进程B结束 进程A收到B的数据hi 进程A结束
对于队列有两个操作,一个是写入数据,一个是读出数据。最简单的使用方式是让一个进程往队列中写入数据,让另外一个进程从队列中读出数据。
import multiprocessing import time def process_read(queue): # 读队列子进程 print(u"queue读出进程开始运行") data = queue.get() while(data != "quit"): # 一直读,直到读出了quit print("queue读出进程读出数据%s" % data) data = queue.get() print(u'queue读出进程退出') def process_write(queue): # 写队列子进程 print(u"queue写入进程开始运行") data = ['good', 'morning', 'everyone'] # 写入的数据 for w in data: print("queue写入进程写入数据%s" % w) data = queue.put(w) time.sleep(1) data = queue.put('quit') # 发送quit,让读子进程退出 print(u'queue写入进程退出') queue_obj = multiprocessing.Queue(3) # 创建两个子进程,一个读队列,一个写队列 p1 = multiprocessing.Process(target=process_read, args=(queue_obj,)) # Pass the other end of the pipe to process 2 p2 = multiprocessing.Process(target=process_write, args=(queue_obj,)) p1.start() # 启动进程 p2.start() p1.join() # 等待子进程退出 p2.join() queue_obj.close() # 销毁队列
运行后的输出如下:
$ python3 queue1.py queue读出进程开始运行 # 两个子进程启动了起来 queue写入进程开始运行 queue写入进程写入数据good # 写入good queue读出进程读出数据good # 读出good queue写入进程写入数据morning queue读出进程读出数据morning queue写入进程写入数据everyone # 写入everyone queue读出进程读出数据everyone # 读出everyone queue写入进程退出 # 子进程退出 queue读出进程退出
该对象主要提供了两个接口函数,一个是得到锁 acquire(),另外一个是释放锁 release()。下面的例子演示了 3 个进程,它们都往同一个文件写入数据。由于其每轮操作需要执行 3 次数据写入,而且要求在这 3 次操作之间其他的进程不能往文件写入数据。
可以使用锁来实现该功能,代码如下:
import multiprocessing import time def process_entry (lock, fd, id): # 进程入口函数 for x in range(30): # 30轮输出,每轮进行3次写操作 lock.acquire() # 得到锁 line = "%d: line 1, round: %d\n" % (id, x) time.sleep(0.1) fd.write(line) # 第1次写 fd.flush() line = "%d: line 2, round: %d\n" % (id, x) fd.write(line) # 第2次写 fd.flush() time.sleep(0.1) line = "%d: line 3, round: %d\n" % (id, x) fd.write(line) # 第3次写 fd.flush() lock.release() # 释放锁 time.sleep(0.1) if __name__ == "__main__": file_name = "shared_input2.txt" fd = open(file_name, "a+") lock = multiprocessing.Lock() # 创建锁 p1 = multiprocessing.Process(target=process_entry, args=(lock, fd, 1)) p2 = multiprocessing.Process(target=process_entry, args=(lock, fd, 2)) p3 = multiprocessing.Process(target=process_entry, args=(lock, fd, 3)) p1.start() # 启动进程 p2.start() p3.start() p1.join() # 等待子进程结束 p2.join() p3.join() fd.close() # 关闭文件 print(u"开始检查结果") # 开始检查结果 fd2 = open(file_name, "r") lines = fd2.readlines() fd2.close() line_num = len(lines) # 显示行数 print(u"总的行数: %d" % line_num) # 要求每连续的3行是同一个进程打印出来的 for x in range(int(line_num/3)): # 第1行和第2行不是同一个进程打印出来的 if lines[x*3][:2] != lines[x*3+1][:2]: print("line %d: Error" % x*3+1) # 发现错误,退出 sys.exit(1) # 第1行和第3行不是同一个进程打印出来的 if lines[x*3][:2] != lines[x*3+2][:2]: print("line %d: Error" % x*3+2) # 发现错误,退出 sys.exit(1) print(u"成功通过检查") # 所有行的检查通过
运行后的输出如下:
$ python lock2.py 开始检查结果 总的行数: 270 成功通过检查
也可以使用 with lock 语句,这样就不用再显式调用 acquire() 和 release() 了,只需将 acquire() 和 release() 之间的代码放到 with lock 块中即可。
下面的代码实现和前面一样的功能,但是使用 with lock 语句代替了 acquire() 和 release()。代码如下:
import multiprocessing import time def process_entry(lock, fd, id): for x in range(30): # 30轮操作,每轮3次写操作 with lock: # 每次写入3行,不能被打断 line = "%d: line 1, round: %d\n" % (id, x) time.sleep(0.1) fd.write(line) fd.flush() line = "%d: line 2, round: %d\n" % (id, x) fd.write(line) fd.flush() time.sleep(0.1) line = "%d: line 3, round: %d\n" % (id, x) fd.write(line) fd.flush() time.sleep(0.1) if __name__ == "__main__": file_name = "shared_input.txt" # 打开输出文件 fd = open(file_name, "a+") lock = multiprocessing.Lock() # 创建锁 p1 = multiprocessing.Process(target=process_entry, args=(lock, fd, 1)) p2 = multiprocessing.Process(target=process_entry, args=(lock, fd, 2)) p3 = multiprocessing.Process(target=process_entry, args=(lock, fd, 3)) p1.start() # 启动3个子进程 p2.start() p3.start() p1.join() # 等待子进程结束 p2.join() p3.join() fd.close() # 关闭文件 print(u"开始检查结果") # 检查结果 fd2 = open(file_name, "r") lines = fd2.readlines() fd2.close() line_num = len(lines) print(u"总的行数: %d" % line_num) for x in range(int(line_num/3)): # 如果第1行和第2行不是同一个进程打印 if lines[x*3][:2] != lines[x*3+1][:2]: print("line %d: Error" % x*3+1) # 发现错误,退出 sys.exit(1) # 如果第1行和第3行不是同一个进程打印 if lines[x*3][:2] != lines[x*3+2][:2]: print("line %d: Error" % x*3+2) # 发现错误,退出 sys.exit(1) print(u"成功通过检查") # 所有行检查通过
运行后的输出如下:
$ python lock1.py 开始检查结果 总的行数: 270 成功通过检查
本文链接:http://so.lmcjl.com/news/696/