序. multiprocessing
1. Process
创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),其中 target 表示调用对象,args 表示调用对象的位置参数元组,kwargs 表示调用对象的关键字参数字典,name 为进程的别名,group 实际上不使用(应始终为 None)。
import multiprocessing
import time


def worker(interval):
    """Print the current time five times, sleeping `interval` seconds after each print.

    Args:
        interval: Number of seconds passed to ``time.sleep`` between prints.
    """
    for _ in range(5):
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)


if __name__ == "__main__":
    # Run worker(3) in a child process; the parent carries on immediately
    # and reports the child's pid, name and liveness.
    child = multiprocessing.Process(target=worker, args=(3,))
    child.start()
    print("p.pid:", child.pid)
    print("p.name:", child.name)
    print("p.is_alive:", child.is_alive())
1 2 3 4 5 6 7 8 | p.pid: 8736 p.name: Process -1 p.is_alive: True The time is Tue Apr 21 20: 55: 12 2015 The time is Tue Apr 21 20: 55: 15 2015 The time is Tue Apr 21 20: 55: 18 2015 The time is Tue Apr 21 20: 55: 21 2015 The time is Tue Apr 21 20: 55: 24 2015 |
import multiprocessing
import time


def worker_1(interval):
    """Announce itself, sleep `interval` seconds, then announce completion."""
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")


def worker_2(interval):
    """Announce itself, sleep `interval` seconds, then announce completion."""
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")


def worker_3(interval):
    """Announce itself, sleep `interval` seconds, then announce completion."""
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")


if __name__ == "__main__":
    # One process per worker function, each with its own sleep interval.
    jobs = [
        multiprocessing.Process(target=worker_1, args=(2,)),
        multiprocessing.Process(target=worker_2, args=(3,)),
        multiprocessing.Process(target=worker_3, args=(4,)),
    ]
    for job in jobs:
        job.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    # active_children() lists the child processes that are still alive.
    for child in multiprocessing.active_children():
        print("child p.name:" + child.name + "\tp.id" + str(child.pid))
    print("END!!!!!!!!!!!!!!!!!")
1 2 3 4 5 6 7 8 9 10 11 | The number of CPU is: 4 child p.name:Process -3 p.id 7992 child p.name:Process -2 p.id 4204 child p.name:Process -1 p.id 6380 END!!!!!!!!!!!!!!!!! worker_ 1 worker_ 3 worker_ 2 end worker_ 1 end worker_ 2 end worker_ 3 |
import multiprocessing
import time


class ClockProcess(multiprocessing.Process):
    """Process subclass that prints the current time five times.

    Attributes are set in ``__init__``; the work happens in ``run``, which
    ``start()`` invokes in the child process.
    """

    def __init__(self, interval):
        """Store the pause (in seconds) used between consecutive prints."""
        super(ClockProcess, self).__init__()
        self.interval = interval

    def run(self):
        """Print the time five times, sleeping ``self.interval`` after each."""
        for _ in range(5):
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)


if __name__ == '__main__':
    clock = ClockProcess(3)
    clock.start()
1 2 3 4 5 | the time is Tue Apr 21 20: 31: 30 2015 the time is Tue Apr 21 20: 31: 33 2015 the time is Tue Apr 21 20: 31: 36 2015 the time is Tue Apr 21 20: 31: 39 2015 the time is Tue Apr 21 20: 31: 42 2015 |
#1.4-1 不加daemon属性
import multiprocessing
import time


def worker(interval):
    """Print a start timestamp, sleep `interval` seconds, print an end timestamp."""
    started = time.ctime()
    print("work start:{0}".format(started))
    time.sleep(interval)
    finished = time.ctime()
    print("work end:{0}".format(finished))


def main():
    """Start a regular (non-daemon) child and print "end!" without waiting for it."""
    child = multiprocessing.Process(target=worker, args=(3,))
    child.start()
    print("end!")


if __name__ == "__main__":
    main()
1 2 3 | end! work start:Tue Apr 21 21: 29: 10 2015 work end:Tue Apr 21 21: 29: 13 2015 |
#1.4-2 加上daemon属性
import multiprocessing
import time


def worker(interval):
    """Print a start timestamp, sleep `interval` seconds, print an end timestamp."""
    begin_stamp = time.ctime()
    print("work start:{0}".format(begin_stamp))
    time.sleep(interval)
    end_stamp = time.ctime()
    print("work end:{0}".format(end_stamp))


def main():
    """Start a daemon child and print "end!" without waiting for it.

    The daemon flag must be set before ``start()``. When the parent exits it
    attempts to terminate its daemonic children.
    """
    child = multiprocessing.Process(target=worker, args=(3,))
    child.daemon = True
    child.start()
    print("end!")


if __name__ == "__main__":
    main()
1 | end! |
#1.4-3 设置daemon执行完结束的方法
import multiprocessing
import time


def worker(interval):
    """Print a start timestamp, sleep `interval` seconds, print an end timestamp."""
    t_start = time.ctime()
    print("work start:{0}".format(t_start))
    time.sleep(interval)
    t_end = time.ctime()
    print("work end:{0}".format(t_end))


def main():
    """Start a daemon child, wait for it with join(), then print "end!"."""
    child = multiprocessing.Process(target=worker, args=(3,))
    child.daemon = True
    child.start()
    # join() blocks until the child has finished, so the daemon gets to
    # complete its work before the parent prints and exits.
    child.join()
    print("end!")


if __name__ == "__main__":
    main()
1 2 3 | work start:Tue Apr 21 22: 16: 32 2015 work end:Tue Apr 21 22: 16: 35 2015 end! |
2. Lock
import multiprocessing
import sys


def worker_with(lock, f):
    """Append nine marker lines to file `f` while holding `lock` via ``with``.

    Args:
        lock: Lock object supporting the context-manager protocol.
        f: Path of the file to append to.
    """
    with lock:
        # The file is a context manager too, so it is closed even if a
        # write fails part-way through.
        with open(f, 'a+') as fs:
            for _ in range(9):
                fs.write("Lockd acquired via with\n")


def worker_no_with(lock, f):
    """Append nine marker lines to file `f`, acquiring/releasing `lock` explicitly.

    Args:
        lock: Lock object providing ``acquire()`` and ``release()``.
        f: Path of the file to append to.
    """
    lock.acquire()
    try:
        with open(f, 'a+') as fs:
            for _ in range(9):
                fs.write("Lock acquired directly\n")
    finally:
        # Always hand the lock back, even when the write raised.
        lock.release()


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target=worker_with, args=(lock, f))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | Lockd acquired via with Lockd acquired via with Lockd acquired via with Lockd acquired via with Lockd acquired via with Lockd acquired via with Lockd acquired via with Lockd acquired via with Lockd acquired via with Lock acquired directly Lock acquired directly Lock acquired directly Lock acquired directly Lock acquired directly Lock acquired directly Lock acquired directly Lock acquired directly Lock acquired directly |
3. Semaphore
import multiprocessing
import time


def worker(s, i):
    """Hold one slot of semaphore `s` for `i` seconds, logging acquire and release.

    Args:
        s: Semaphore limiting how many workers run this section at once.
        i: Seconds to sleep while holding the slot.
    """
    # ``with`` releases the slot even if the body raises; a bare
    # acquire()/release() pair would lose the slot permanently on error.
    with s:
        print(multiprocessing.current_process().name + "acquire")
        time.sleep(i)
        print(multiprocessing.current_process().name + "release\n")


if __name__ == "__main__":
    # At most two workers may hold the semaphore at the same time.
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i * 2))
        p.start()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | Process -1 acquire Process -1 release Process -2 acquire Process -3 acquire Process -2 release Process -5 acquire Process -3 release Process -4 acquire Process -5 release Process -4 release |
4. Event
import multiprocessing
import time


def wait_for_event(e):
    """Block until event `e` is set, then print its state."""
    print("wait_for_event: starting")
    e.wait()
    state = e.is_set()
    print("wairt_for_event: e.is_set()->" + str(state))


def wait_for_event_timeout(e, t):
    """Wait at most `t` seconds for event `e`, then print its state."""
    print("wait_for_event_timeout:starting")
    e.wait(t)
    state = e.is_set()
    print("wait_for_event_timeout:e.is_set->" + str(state))


def main():
    """Start one blocking and one timed waiter, then set the event after 3 seconds."""
    event = multiprocessing.Event()
    blocking_waiter = multiprocessing.Process(
        name="block", target=wait_for_event, args=(event,)
    )
    timed_waiter = multiprocessing.Process(
        name="non-block", target=wait_for_event_timeout, args=(event, 2)
    )
    blocking_waiter.start()
    timed_waiter.start()

    # The timed waiter (2 s) gives up before the event is set at 3 s.
    time.sleep(3)
    event.set()
    print("main: event is set")


if __name__ == "__main__":
    main()
1 2 3 4 5 | wait_for_event: starting wait_for_event_timeout:starting wait_for_event_timeout:e.is_set->False main: event is set wairt_for_event: e.is_set()->True |
5. Queue
import multiprocessing
import queue  # multiprocessing.Queue signals full/empty with queue.Full / queue.Empty


def writer_proc(q):
    """Put the value 1 on queue `q` without blocking.

    A full queue is ignored (best-effort write); any other error propagates
    instead of being silently swallowed.
    """
    try:
        q.put(1, block=False)
    except queue.Full:
        pass


def reader_proc(q):
    """Print one item from queue `q` without blocking.

    An empty queue is ignored; any other error propagates.
    NOTE: because the read is non-blocking, nothing is printed if this runs
    before the writer's item has become available.
    """
    try:
        print(q.get(block=False))
    except queue.Empty:
        pass


if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))
    writer.start()

    reader = multiprocessing.Process(target=reader_proc, args=(q,))
    reader.start()

    reader.join()
    writer.join()
1 | 1 |
6. Pipe
import multiprocessing
import time


def proc1(pipe):
    """Send the integers 0..9999 over `pipe`, one per second, repeating forever.

    Args:
        pipe: Connection end used for sending.
    """
    while True:
        # range() replaces the Python 2-only xrange(), which raises
        # NameError on Python 3.
        for i in range(10000):
            print("send: %s" % (i))
            pipe.send(i)
            time.sleep(1)


def proc2(pipe):
    """Receive from `pipe` forever, printing each value once per second."""
    while True:
        print("proc2 rev:", pipe.recv())
        time.sleep(1)


def proc3(pipe):
    """Receive from `pipe` forever, printing each value once per second."""
    while True:
        print("PROC3 rev:", pipe.recv())
        time.sleep(1)


if __name__ == "__main__":
    # Pipe() returns a pair of connection objects, one per end.
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    # Optional second receiver on the same end, left disabled:
    # p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    # p3.start()

    p1.join()
    p2.join()
    # p3.join()
7. Pool
# coding: utf-8
import multiprocessing
import time


def func(msg, delay=3):
    """Print `msg`, sleep `delay` seconds, then print "end".

    Args:
        msg: Text printed after the "msg:" label.
        delay: Seconds to sleep between the two prints (default 3).
    """
    print("msg:", msg)
    time.sleep(delay)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        # apply_async() returns immediately. The pool runs at most
        # `processes` tasks at once; a queued task starts as soon as a
        # worker becomes free.
        pool.apply_async(func, (msg,))

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    # close() must be called before join(), otherwise join() raises an error.
    # After close() no new tasks are accepted; join() waits for all worker
    # processes to finish.
    pool.join()
    print("Sub-process(es) done.")
1 2 3 4 5 6 7 8 9 10 | mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0 msg: hello 1 msg: hello 2 end msg: hello 3 end end end Sub-process(es) done. |
- apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
- close() 关闭pool,使其不再接受新的任务。
- terminate() 结束工作进程,不再处理未完成的任务。
- join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
执行说明:创建一个进程池pool,并设定进程的数量为3,for循环会相继产生四个对象[0, 1, 2, 3],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为apply_async为非阻塞,主函数会继续执行自己的代码,不等待子进程的执行,所以运行完for循环后直接输出“Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”(在上面的示例输出中,这一行与子进程的“msg: hello 0”交错在一起,显示为“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0”),主程序在pool.join()处等待各个进程的结束。
# coding: utf-8
import multiprocessing
import time


def func(msg, delay=3):
    """Print `msg`, sleep `delay` seconds, then print "end".

    Args:
        msg: Text printed after the "msg:" label.
        delay: Seconds to sleep between the two prints (default 3).
    """
    print("msg:", msg)
    time.sleep(delay)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        # apply() blocks until the task has finished, so the four tasks
        # run one after another even though the pool has three workers.
        pool.apply(func, (msg,))

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    # close() must be called before join(), otherwise join() raises an error.
    # After close() no new tasks are accepted; join() waits for all worker
    # processes to finish.
    pool.join()
    print("Sub-process(es) done.")
1 2 3 4 5 6 7 8 9 10 | msg: hello 0 end msg: hello 1 end msg: hello 2 end msg: hello 3 end Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ Sub-process(es) done. |
import multiprocessing
import time


def func(msg, delay=3):
    """Print `msg`, sleep `delay` seconds, print "end", and return a result string.

    Args:
        msg: Text printed after the "msg:" label.
        delay: Seconds to sleep between the two prints (default 3).

    Returns:
        The string "done" concatenated with `msg`.
    """
    print("msg:", msg)
    time.sleep(delay)
    print("end")
    return "done" + msg


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    # Keep the AsyncResult handles so the return values can be read later.
    result = [
        pool.apply_async(func, ("hello %d" % (i),))
        for i in range(3)
    ]
    pool.close()
    pool.join()
    for res in result:
        # get() returns the value func() returned in the worker process.
        print(":::", res.get())
    print("Sub-process(es) done.")
1 2 3 4 5 6 7 8 9 10 | msg: hello 0 msg: hello 1 msg: hello 2 end end end ::: donehello 0 ::: donehello 1 ::: donehello 2 Sub-process(es) done. |
# coding: utf-8
import multiprocessing
import os
import random
import time


def _nap_and_report(banner, max_seconds, summary):
    """Shared task body: announce the pid, sleep a random while, report elapsed time.

    Args:
        banner: %-format string receiving the current process ID.
        max_seconds: Upper bound multiplied by random.random() to get the sleep time.
        summary: %-format string receiving the elapsed seconds.
    """
    print(banner % (os.getpid()))  # os.getpid() returns the current process ID
    start = time.time()
    # random.random() returns a float in [0, 1)
    time.sleep(random.random() * max_seconds)
    end = time.time()
    print(summary % (end - start))


def Lee():
    """Demo task that sleeps for up to 10 seconds."""
    _nap_and_report("\nRun task Lee-%s", 10, 'Task Lee, runs %0.2f seconds.')


def Marlon():
    """Demo task that sleeps for up to 40 seconds."""
    _nap_and_report("\nRun task Marlon-%s", 40, 'Task Marlon runs %0.2f seconds.')


def Allen():
    """Demo task that sleeps for up to 30 seconds."""
    _nap_and_report("\nRun task Allen-%s", 30, 'Task Allen runs %0.2f seconds.')


def Frank():
    """Demo task that sleeps for up to 20 seconds."""
    _nap_and_report("\nRun task Frank-%s", 20, 'Task Frank runs %0.2f seconds.')


if __name__ == '__main__':
    print("parent process %s" % (os.getpid()))

    pool = multiprocessing.Pool(4)
    for task in (Lee, Marlon, Allen, Frank):
        # Submit each task without blocking; the pool's workers run them.
        pool.apply_async(task)

    print('Waiting for all subprocesses done...')
    pool.close()
    # close() must be called before join(), otherwise join() raises an error.
    # After close() no new tasks are accepted; join() waits for all worker
    # processes to finish.
    pool.join()
    print('All subprocesses done.')
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | parent process 7704 Waiting for all subprocesses done... Run task Lee -6948 Run task Marlon -2896 Run task Allen -7304 Run task Frank -3052 Task Lee, runs 1.59 seconds. Task Marlon runs 8.48 seconds. Task Frank runs 15.68 seconds. Task Allen runs 18.08 seconds. All subprocesses done. |