




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
第18章并行計算:線程、進程和協程本章要點:并行處理概述
基于線程的并發處理
基于進程的并行計算
基于線程池/進程池的并發/并行任務
基于asyncio的異步IO編程資源下載提示2課件等資源:掃描封底的“課件下載”二維碼,在公眾號“書圈”中下載。素材(源碼):掃描本書目錄上方的二維碼下載。講解視頻:掃描封底刮刮卡中的二維碼,再掃描書中相應章節中(位于每章最前)的二維碼,作為開源的補充閱讀和學習資源。
案例研究:掃描封底刮刮卡中的二維碼,再掃描書中相應章節中(位于每章最后)的二維碼,可以在線學習。每章練習題:掃描封底刮刮卡中的二維碼,再掃描每章習題部分的二維碼,下載本章練習題電子版。
題庫平臺:教師登錄網站(),聯系客服開通教師權限并行處理概述(1)進程是操作系統中正在執行的不同應用程序的一個實例線程是進程中的一個實體,是被操作系統獨立調度和分派處理器時間的基本單位線程的優缺點并發處理,因而特別適合需要同時執行多個操作的場合解決用戶響應性能和多任務的問題引入了資源共享和同步等問題并行處理概述(2)協程(Coroutine)又稱微線程、纖程,協程不是進程或線程,其執行過程更類似于函數調用Python的asyncio模塊實現的異步IO編程框架中,協程是對使用async關鍵字定義的異步函數的調用一個進程包含多個線程,同樣,一個程序可以包含多個協程。多個線程相對獨立,線程有自己的上下文,切換受系統控制;同樣,多個協程也相對獨立,協程也有自己的上下文,但是其切換由程序自己控制。協程適合于異步IO編程的場合,能有效提高IO的吞吐效率。Python語言與并行處理相關模塊Python標準庫中包括下列與并行處理相關的模塊。_thread和_dummy_thread模塊:底層低級線程API。threading模塊:線程及其同步處理。multiprocessing模塊:多進程處理和進程池。concurrent.futures模塊:啟動并行任務。queue模塊:線程安全隊列,用于線程或進程間信息傳遞。asyncio模塊:異步IO、事件循環、協程和任務處理。基于線程的并發處理threading模塊概述Python標準庫模塊threading提供了與線程相關的操作:創建線程、啟動線程、線程同步通過創建threading.Thread對象實例,可以創建線程;調用Thread對象的start()方法,可啟動線程。也可以創建Thread的派生類,重寫run方法,然后創建其對象實例來創建線程。通過線程對象的daemon屬性,可設置線程為用戶線程或daemon線程當多個線程調用單個對象的屬性和方法時,一個線程可能會中斷另一個線程正在執行的任務,使該對象處于一種無效狀態,因此必須針對這些調用進行同步處理Python語言提供了多種線程同步處理解決方案:Lock/RLock對象、Condition對象、Semaphore對象、Event對象、Barrier對象使用Thread對象創建線程通過創建Thread的對象可以創建線程:Thread(target=None,name=None,args=(),kwargs={})#構造函數通過調用Thread對象的start方法可以啟動線程。Thread對象的常用方法如下。t.start():啟動線程。t.is_alive():判斷線程是否活動。:屬性:線程名。對應于老版本的方法getname()和setname()。t.id:返回線程標識符。threading模塊包含以下若干實用函數。threading.get_ident():返回當前線程的標識符。threading.current_thread():返回當前線程。threading.active_count():返回活動的線程數目。threading.enumerate():返回活動線程的列表。【例18.1】直接使用Thread對象創建和啟動新線程importthreading,time,randomdeftimer(interval):foriinrange(3):time.sleep(random.choice(range(interval)))#隨機睡眠interval秒thread_id=threading.get_ident()#獲取當前線程標識符print('Thread:{0}Time:{1}'.format(thread_id,time.ctime()))if__name__=='__main__':t1=threading.Thread(target=timer,args=(5,))#創建線程t2=threading.Thread(target=timer,args=(5,))#創建線程t1.start();t2.start()#啟動線程自定義派生于Thread的對象通過聲明Thread的派生類,并重寫對象的run方法,然后創建其對象實例,可創建線程。通過對象的start方法,可啟動線程,并自動執行對象的run方法【例18.2】通過聲明Thread派生類,以創建和啟動新線程(td_MyThread.py)importthreading,time,randomclassMyThread(threading.Thread):#繼承threading.Threaddef__init__(self,interval):#構造函數threading.Thread.__init__(self)#調用父類構造函數erval=interval#對象屬性defrun(self):#定義run方法foriinrange(5):time.sleep(random.choice(range(erval)))#隨機睡眠interval秒thread_id=threading.get_ident()#獲取當前線程標識符print('Thread:{0}Time:{1}\n'.format(thread_id,time.ctime()))if__name__=='__main__':t1=MyThread(5)#創建對象t2=MyThread(5)#創建對象t1.start();t2.start()#啟動線程線程加入join()所謂線程加入(t.join()),即讓包含代碼的線程(tc,即當前線程)“加入”到另外一個線程(t)的尾部。在線程(t)執行完畢之前,線程(tc)不能執行【例18.3】線程join示例(td_join.py)importthreading,time,randomclassMyThread(threading.Thread):#繼承threading.Threaddef__init__(self):#構造函數threading.Thread.__init__(self)#調用父類構造函數defrun(self):#定義run方法foriinrange(5):time.sleep(1)#睡眠1秒t=threading.current_thread()#獲取當前線程print('{0}at{1}\n'.format(,time.ctime()))#打印線程名、當前時間print('線程t1結束')deftest():t1=MyThread()#創建線程對象='t1'#設置線程名稱t1.start()#啟動線程print('主線程開始等待線程(t1)2s');t1.join(2)print('主線程等待線程(t1)2s結束')print('主線程開始等待線程結束');t1.join()print('主線程結束')if__name__=='__main__':test()用戶線程和daemon線程線程可以分為用戶線程和daemon線程。用戶線程(非daemon線程)是通常意義的線程,應用程序運行即為主線程,在主線程中可以創建和啟動新線程,默認為用戶線程。只有當所有的非daemon的用戶線程(包括主線程)結束后,應用程序終止daemon線程,又稱守護線程,其優先級是最低的,一般為其它的線程提供服務。通常,daemon線程體是一個無限循環。如果所有的非daemon線程都結束了,則daemon線程自動就會終止importthreading,timeclassMyThread(threading.Thread):#繼承threading.Threaddef__init__(self,interval):#構造函數threading.Thread.__init__(self)#調用父類構造函數erval=interval#對象屬性defrun(self):#定義run方法t=threading.current_thread()#獲取當前線程print('線程'++'開始')time.sleep(erval)#延遲erval秒print('線程'++'結束')classMyThreadDaemon(threading.Thread):#繼承threading.Threaddef__init__(self,interval):#構造函數threading.Thread.__init__(self)#調用父類構造函數erval=interval#對象屬性defrun(self):#定義run方法t=threading.current_thread()#獲取當前線程print('線程'++'開始')whileTrue:time.sleep(erval)#延遲erval秒print('daemon線程'++'正在運行')print('線程'++'結束')deftest():print('主線程開始')t1=MyThread(5)#創建線程對象t2=MyThreadDaemon(1)#創建線程對象='t1';='t2'#設置線程名稱t2.daemon=True#設置為daemont1.start()#啟動線程t2.start()print('主線程結束')if__name__=='__main__':test()【例18.4】用戶線程和Daemon線程示例Timer線程【例18.5】Timer線程示例(td_timer.py)importthreadingdeff():print('HelloTimer!')#創建定時器,1秒后運行globaltimertimer=threading.Timer(1,f)timer.start()timer=threading.Timer(1,f)#創建定時器,1秒后運行timer.start()#啟動定時器使用Python標準庫threading中的Timer線程(Thread的子類),可以很方便實現定時器功能Timer對象包含的主要方法如下:(1)Timer(interval,function,args=None,kwargs=None):構造函數。在指定時間interval后執行函數(2)start():啟動線程,即啟動計時器(3)cancel():取消計時器線程同步(1)基于原語鎖(Lock/RLock對象)的簡單同步【例18.6】使用lock語句同步代碼塊示例(lock.py)。創建工作線程,模擬銀行現金帳戶取款。多個線程同時執行取款操作時,如果不使用同步處理,會造成賬戶余額混亂;嘗試使用同步鎖對象Lock,以保證多個線程同時執行取款操作時,銀行現金帳戶取款的有效和一致【例18.6】使用lock語句同步代碼塊importthreading,time,randomclassAccount(threading.Thread):#繼承threading.Threadlock=threading.Lock()#創建鎖def__init__(self,amount):#構造函數threading.Thread.__init__(self)#調用父類構造函數Account.amount=amount#賬戶金額defrun(self):#定義run方法self.withdraw()#取款defwithdraw(self):Account.lock.acquire()#獲取鎖。注釋不使用同步處理t=threading.current_thread()a=random.choice(range(50,101))ifAccount.amount<a:print('{0}交易失敗。取款前余額:{1},取款額:{2}'.format(,Account.amount,a))Account.lock.release()return0#拒絕交易time.sleep(random.choice(range(5)))#隨機睡眠[0-5)秒prev=Account.amountAccount.amount-=a#取款print('{0}取款前余額:{1},取款額:{2},取款后額:{3}'.format(,prev,a,Account.amount))Account.lock.release()#釋放鎖。注釋不使用同步處理deftest():foriinrange(5):#創建5個線程對象并啟動Account(200).start()if__name__=='__main__':test()線程同步(2)基于條件變量(Condition對象)的同步和通信【例18.7】線程間通信示例(producer_consumer.py)。生產者/消費者模型,使用線程間通信,生產者生產一件、消費者消費一件,二者保持同步。未使用線程同步(把最后1行代碼改為test2()),則結果無法預料【例18.7】線程間通信示例(producer_consumer.py)(1)importthreading,time,randomclassContainer1():#基于同步和通信def__init__(self):#構造函數self.contents=0#容器內容self.available=False#容器內容self.cv=threading.Condition()#條件變量defput(self,value):#生產函數withself.cv:#使用條件變量同步ifself.available:#如果已經生產,則等待self.cv.wait()#等待self.contents=value#生產,設置內容t=threading.current_thread()print('{0}生產{1}'.format(,self.contents))self.available=True#設置容器狀態:已生產self.cv.notify()#通知等待的消費者defget(self):#消費函數withself.cv:#使用條件變量同步ifnotself.available:#如果已經生產,則等待self.cv.wait()#等待t=threading.current_thread()【例18.7】線程間通信示例(producer_consumer.py)(2)print('{0}消費{1}'.format(,self.contents))self.available=False#設置容器狀態:未生產self.cv.notify()#通知等待的生產者classContainer2():#無同步和通信def__init__(self):#構造函數self.contents=0#容器內容self.available=False#容器內容defput(self,value):#生產函數ifself.available:#如果已經生產passelse:self.contents=value#生產,設置內容t=threading.current_thread()print('{0}生產{1}'.format(,self.contents))self.available=True#設置容器狀態:已生產defget(self):#消費函數ifnotself.available:#如果已經生產,則等待passelse:【例18.7】線程間通信示例(producer_consumer.py)(3)else:self.contents=value#生產,設置內容t=threading.current_thread()print('{0}生產{1}'.format(,self.contents))self.available=True#設置容器狀態:已生產defget(self):#消費函數ifnotself.available:#如果已經生產,則等待passelse:t=threading.current_thread()print('{0}消費{1}'.format(,self.contents))self.available=False#設置容器狀態:未生產classProducer(threading.Thread):#生產者類def__init__(self,container):#構造函數threading.Thread.__init__(self)#調用父類構造函數self.container=container#容器defrun(self):#定義run方法foriinrange(1,6):time.sleep(random.choice(range(5)))#隨機睡眠[0-5)秒self.container.put(i)#生產【例18.7】線程間通信示例(producer_consumer.py)(4)classConsumer(threading.Thread):#消費者類def__init__(self,container):#構造函數threading.Thread.__init__(self)#調用父類構造函數self.container=container#容器defrun(self):#定義run方法foriinrange(1,6):time.sleep(random.choice(range(5)))#隨機睡眠[0-5)秒self.container.get()#消費deftest1():print('基本同步和通信的生產者消費者模型:')container=Container1()#創建容器Producer(container).start()#創建消費者線程并啟動Consumer(container).start()#創建消費者線程并啟動deftest2():print('無同步和通信的生產者消費者模型:')container=Container2()#創建容器Producer(container).start()#創建消費者線程并啟動Consumer(container).start()#創建消費者線程并啟動if__name__=='__main__':test1()基于queue模塊中隊列的同步使用Python標準模塊queue提供了適用于多線程編程的先進先出的數據結構(即隊列),用來在生產者和消費者線程之間的信息傳遞。使用queue模塊中的線程安全的隊列,可以快捷實現生產者和消費者模型Queue模塊中包含三種線程安全的隊列:Queue、LifoQueue和PriorityQueue。以Queue為例,其主要方法包括:(1)Queue(maxsize=0):構造函數,構造指定大小的隊列。默認不限定大小(2)put(item,block=True,timeout=None):向隊列中添加一個項。默認阻塞,即隊列滿的時候,程序阻塞等待(3)get(block=True,timeout=None):從隊列中拿出一個項。默認阻塞,即隊列為空的時候,程序阻塞等待【例18.8】基于queue.Queue的生產者和消費者模型importtimeimportqueueimportthreadingq=queue.Queue(10)#創建一個大小為10的隊列defproductor(i):whileTrue:time.sleep(1)#休眠1秒鐘,即每秒鐘做一個包子q.put("廚師{}做的包子!".format(i))#如果隊列滿,則等待defconsumer(j):whileTrue:print("顧客{}吃了一個{}".format(j,q.get()))#如果隊列空,則等待time.sleep(1)#休眠1秒鐘,即每秒鐘吃一個包子foriinrange(3):#3個廚師不停做包子,t=threading.Thread(target=productor,args=(i,))t.start()forkinrange(10):#10個顧客等待吃包子v=threading.Thread(target=consumer,args=(k,))v.start()基于Event的同步和通信threading.Event是線程之間的通信機制之一:Event對象管理一個標志(flag),默認為FalseEvent相當于紅綠燈信號,可用于主線程控制其他線程的執行。當flag為False時,其他的線程調用e.wait()阻塞等待這個信號;當設置flag為True時,等待的線程解除阻塞繼續執行Event對象主要包括下列方法:(1)wait([timeout]):阻塞等待,直到Event對象的flag為True或超時(2)set():將flag設置為True(3)clear():將flag設置為False(4)isSet():判斷flag是否為True【例18.9】基于Event的線程通信importthreadingimportrandomdeff(i,e):e.wait()#檢測Event的標志,如果是False則阻塞print("線程{}的隨機結果為{}".format(i,random.randrange(1,100)))if__name__=='__main__':event=threading.Event()#創建事件對象,默認標志為Falseforiinrange(3):#創建3個線程并運行,默認阻塞等待Eventt=threading.Thread(target=f,args=(i,event))t.start()ready=input('請輸入1開始繼續執行阻塞的線程:')ifready=="1":event.set()#設置Event的flag為True基于進程的并行計算multiprocessing模塊概述Python標準庫模塊multiprocessing提供了與進程相關的操作:創建進程、啟動進程、進程同步等模塊multiprocessing還提供進程池和線程池創建和使用進程multiprocessing模塊包含以下若干實用函數。cpu_count():可用的CPU核數量。current_process():返回當前進程。active_children():活動的子進程。log_to_stderr():函數可設置輸出日志信息到標準錯誤輸出(默認為控制臺)【例18.10】使用Process對象創建和啟動新進程importtime,randomimportmultiprocessingasmpdeftimer(interval):foriinrange(3):time.sleep(random.choice(range(interval)))#隨機睡眠interval秒pid=mp.current_process().pid#獲取當前進程IDprint('Process:{0}Time:{1}'.format(pid,time.ctime()))if__name__=='__main__':p1=mp.Process(target=timer,args=(5,))#創建進程p2=mp.Process(target=timer,args=(5,))#創建進程p1.start();p2.start()#啟動線程p1.join();p2.join()進程的數據共享模塊multiprocessing為進程間通信提供了兩種方法:Queue和Pipe模塊multiprocessing中的Queue類似于queue.Queue(參見18.2.9),為進程間通信提供了一個線程和進程安全的隊列模塊multiprocessing中的Pipe()返回一個管道(包括兩個連接對象),兩個進程可以分別連接到不同的端的連接對象,然后通過其send()方法發送數據或者通過recv()方法接收數據【例18.11】基于進程和模塊multiprocessing中的Queue隊列的生產者和消費者模型(mp_queue.py)importtimeimportmultiprocessingasmpdefproductor(i,q):whileTrue:time.sleep(1)#休眠1秒鐘,即每秒鐘做一個包子q.put("廚師{}做的包子!".format(i))#如果隊列滿,則等待defconsumer(j,q):whileTrue:print("顧客{}吃了一個{}".format(j,q.get()))#如果隊列空,則等待time.sleep(1)#休眠1秒鐘,即每秒鐘吃一個包子if__name__=='__main__':q=mp.Queue(10)#創建一個大小為10的隊列foriinrange(3):#3個廚師不停做包子,p=mp.Process(target=productor,args=(i,q))p.start()forkinrange(10):#10個顧客等待吃包子p=mp.Process(target=consumer,args=(k,q))p.start()【例18.12】基于模塊multiprocessing中的Pipe的進程間通信importmultiprocessingasmpimporttime,random,itertoolsdefconsumer(conn):#從管道讀取數據whileTrue:try:item=conn.recv()time.sleep(random.randrange(2))#隨機休眠,代表處理過程print("consume:{}".format(item))exceptEOFError:breakdefproducer(conn):#生產項目并將其發送到連接的管道上foriinitertools.count(1):#從1開始無限循環time.sleep(random.randrange(2))#隨機休眠,代表處理過程conn.send(i)print("produce:{}".format(i))if__name__=="__main__":#創建管道,返回兩個連接對象的元組conn_out,conn_in=mp.Pipe()#創建并啟動生產者進程,傳入參數管道一端的連接對象p_producer=mp.Process(target=producer,args=(conn_out,))p_producer.start()#創建并啟動消費者進程,傳入參數管道另一端的連接對象p_consumer=mp.Process(target=consumer,args=(conn_in,))p_consumer.start()#加入進程,等待完成p_producer.join();p_consumer.join()進程池(Pool)使用Python的標準庫模塊multiprocessing中的Pool類可創建進程池。其大致步驟如下:(1)使用構造函數Pool(processes,initializer,initargs)創建一個進程池對象。這三個均為可選參數。其中processes為進程池的進程數量,默認為CPU的核數量;initializer和initargs為啟動任務進程時執行的初始化函數及其參數(2)調用進程池對象的方法執行任務,返回結果收集為一個列表。包括apply_async、apply、map_async、map等。其中apply_async和map_async是異步非阻塞模式,即啟動進程函數之后會繼續執行后續的代碼不用等待進程函數返回。進程池的map()方法與內置的map()函數一樣,把函數應用于可迭代對象的每一個元素(3)等待任務進程完成。調用join()方法加入進程池,等待其完成。也可以調用close()關閉進程池,不再加入新的任務(注:Pool對象支持with上下文操作,自動調用close()方法);或者調用terminate()直接終止進程池【例18.13】進程池的使用(mp_pool.py)frommultiprocessingimportPool,TimeoutErrorimporttimeimportosdeff(x):returnx*x#返回x的平方if__name__=='__main__':#創建四個進程的進程池,并調用其對象方法并行執行各任務withPool(processes=4)aspool:#使用進程池對象map函數,并行計算并返回結果res1=pool.map(f,range(10))print("pool.map的結果:{}".format(res1))#使用進程池對象的apply_async函數,異步執行一次任務res2=pool.apply_async(f,(20,))#異步求解f(20),僅使用一個進程print(res2.get(timeout=1))#輸出結果:400res3=pool.apply_async(os.getpid,())#異步執行os.getpid(),僅使用一個進程print(res3.get(timeout=1))#輸出執行任務的進程的PIDres4=pool.apply_async(time.sleep,(10,))#異步睡眠10秒鐘try:print(res4.get(timeout=1))#嘗試獲得結果,等待超時為1秒鐘exceptTimeoutError:print("結果超時!")#使用列表解析式,可能使用多個進程res5=[pool.apply_async(os.getpid,())foriinrange(5)]print([res.get(timeout=1)forresinres5])print("在With語句中,進程池可用")print("在With語句之外,進程池自動關閉,不再可用")基于線程池/進程池的并發/并行任務模塊concurrent.futures概述標準庫提供了concurrent.futures模塊實現了對threading和multiprocessing的進一步抽象,提供了編寫線程池(進程池)的支持。包concurrent意指并發,而futures意指將在未來完成的操作concurrent.futures模塊包含兩個主要類和實用函數:(1)Executor:表示任務執行器。它是抽象類,可以使用其子類ThreadPoolExecutor(線程池任務執行器)或者ProcessPoolExecutor(進程池任務執行器)創建任務執行器對象(2)Future:表示將執行的任務ThreadPoolExecutor(線程池任務執行器)是Executor的派生類,用于使用一個線程池異步執行任務【例18.14】使用ThreadPoolExecutor并發爬取網頁(future_tpe_get_pages.py)importconcurrent.futuresascfimporttime,urllib.requestdefload_page(url):withurllib.request.urlopen(url,timeout=60)asconn:return('{}主頁大小:{}字節'.format(url,len(conn.read())))if__name__=='__main__':URLS=['','/','/']#傳統串行方法start_time=time.time()forurlinURLS:print(load_page(url))end_time=time.time()print("串行處理消耗時間:{}".format(end_time-start_time))#使用ThreadPoolExecutor并發處理start_time=time.time()executor=cf.ThreadPoolExecutor()wait_for=[executor.submit(load_page,url)forurlinURLS]forfincf.as_completed(wait_for):#迭代完成的任務,輸出其結果print(f.result())end_time=time.time()print("并發處理消耗時間:{}".format(end_time-start_time))使用ProcessPoolExecutor并發執行任務ProcessPoolExecutor(進程池任務執行器)是Executor的派生類,用于使用一個線程池異步執行任務。ProcessPoolExecutor基于multiprocessing模塊,因而避免了CPython的GIL限制,從而適用于計算密集的任務【例18.15】使用ProcessPoolExecutor求解最大公約數(future_ppe_gcd.py)…tobecontinuedimporttimeimportconcurrent.futuresascfdefgcd(pair):#求最大公約數a,b=pairlow=min(a,b)foriinrange(low,0,-1):ifa%i==0andb%i==0:returniif__name__=='__main__':#測試數據TEST_DATA=[(11880774,83664910),(13961044,17644234),(10112000,13380625)]#傳統串行方法start_time=time.time()res1=list(map(gcd,TEST_DATA))end_time=time.time()print("串行處理結果:{},消耗時間:{}".format(res1,end_time-start_time))#使用ProcessPoolExecutor并行處理start_time=time.time()pool=cf.ProcessPoolExecutor(max_workers=4)res2=list(pool.map(gcd,TEST_DATA))end_time=time.time()print("并行處理結果:{},消耗時間:{}".format(res2,end_time-start_time))【例18.15】使用ProcessPoolExecutor求解最大公約數(future_ppe_gcd.py)基于asyncio的異步IO編程異步IO(AsynchronousIO)是指程序發起一個IO操作(阻塞等待)后,不用等IO操作結束,可以繼續其它操作;做其他事情,當IO操作結束時,會得到通知,然后繼續執行。異步IO編程是實現并發的一種方式,適用于IO密集型任務Python標準庫模塊asyncio提供了一個異步編程框架,主要包括下列部分:(1)事件循環(eventloop)(2)協程(coroutine)(3)任務(Task)和Future(將執行的任務)創建協程(coroutine)對象通過async關鍵字定義一個異步函數,調用異步函數返回一個協程(coroutine)對象。協程也是一種對象,協程不能直接運行,需要把協程加入到事件循環中,由后者在適當的時候調用協程使用asyncio.get_event_loop()方法可以創建一個事件循環對象,然后使用其run_until_complete()方法將協程注冊到事件循環在異步函數中,可以使用await關鍵字,針對耗時的操作(例如網絡請求、文件讀取等IO操作)進行掛起【例18.16】創建協程(coroutine)對象示例importasyncio,timeasyncdefdo_some_work(n):#使用async關鍵字定義異步函數print('等待:{}秒'.format(n))awaitasyncio.sleep(n)#休眠一段時間return'{}秒后返回結束運行'.format(n)start_time=time.time()#開始時間coro=do_some_work(2)loop=asyncio.get_event_loop()loop.run_until_complete(coro)print('運行時間:',time.time()-start_time)創建任務(Task)對象任務(Task)對象用于封裝協程對象,保存了協程運行后的狀態,用于未來獲取協程的結果可以使用asyncio.ensure_future(coroutine)創建一個任務對象,也可以使用事件循環對象的create_task(coroutine)方法創建任務使用run_until_complete()方法將任務注冊到事件循環。同時注冊多個任務的列表可以使用run_until_complete(asyncio.wait(tasks)),注冊多個任務可以使用run_until_complete(asyncio.gather(*tasks))【例18.17】創建任務對象示例importasyncio,timeasyncdefdo_some_work(i,n):#使用async關鍵字定義異步函數print('任務{}等待:{}秒'.format(i,n))awaitasyncio.sleep(n)#休眠一段時間return'任務{}在{}秒后返回結束運行'.format(i,n)start_time=time.time()#開始時間tasks=[asyncio.ensure_future(do_some_work(1,2)),asyncio.ensure_future(do_some_work(2,1)),asyncio.ensure_future(do_some_work(3,3))]loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))fortaskintasks:print('任務執行結果:',task.result())print('運行時間:',time.time()-start_time)應用舉例使用Pool并行計算查找素數比較常規的串行處理(結果寫入prime1.txt)和基于進程池Pool的并行處理(結果寫入prime2.txt)的時間消耗【例18.18】并行查找小于n的所有素數(1)importmathimporttimeimportmultiprocessingdefisprime(n):"""判斷n是否為素數,如果是,返回n,否則返回0"""ifn<2:return0ifn==2:returnnk=int(math.ceil(math.sqrt(n)))i=2whilei<=k:ifn%i==0:return0i+=1returnnif__name__=="__main__":#測試數據test_data=range(10**6)#串行處理測試start_time=time.time()#結束時間withopen("prime1.txt","w")asoutf:fornumintest_data:r=isprime(num)ifr>0:outf.writelines("{}\n".format(num))end_time=time.time()print("串行處理消耗時間:{}".format(end_time-start_time))#并行處理測試start_time=time.time()#開始時間pool=multiprocessing.Pool(4)resultList=pool.map(isprime,test_data)pool.close()pool.join()withopen("prime2.txt","w")asoutf:forrinresultList:ifr>0:outf.writelines("{}\n".format(r))end_time=time.time()#結束時間print("并行處理消耗時間:{}".format(end_time-start_time))【例18.18】并行查找小于n的所有素數(2)使用ProcessPoolExecutor并行判斷素數比較常規的串行處理和基于ProcessPoolExecutor的并行處理的時間消耗【例18.19】使用ProcessPoolExecutor并行判斷素數(future_ppe_prime.py)(1)【例18.19】使用ProcessPoolExecutor并行判斷素數(future_ppe_prime.py)(2)importconcurrent.futuresascfimportmath,timedefis_prime(n):ifn<2:returnFalseifn==2:returnTrueifn%2==0:returnFalsesqrt_n=int(math.floor(math.sqrt(n)))foriinrange(3,sqrt_n+1,2):ifn%i==0:returnFalsereturnTrueif__name__=="__main__":#測試數據test_data=[112272535095293,112272535095293,115280095190773,1099726899285419]#串行處理測試start_time=time.time()#結束時間fornumintest_data:print('{}是素數否:{}'.format(num,is_prime(num)))end_time=time.time()print("串行處理消耗時間:{}".format(end_time-start_time))#并行處理測試start_time=time.time()#開始時間withcf.ProcessPoolExecutor()asexecutor:primes=executor.map(is_prime,test_data)fornumber,primeinzip(test_data,primes):print('{}是素數否:{}'.format(number,prime))end_time=time.time()print("并行處理消耗時間:{}".format(end_time-start_time))【例18.20】使用ThreadPoolExecutor批量下載網頁內容(future_tpe_download.py)(1)importconcurrent.futuresimporturllib.requestimporttimedefload_url(url,ti
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 公司提成策劃方案(3篇)
- 推門聽課活動方案(3篇)
- 醫院食堂人群管理制度
- 室內小房改造方案(3篇)
- 停水設備檢修方案(3篇)
- 醫院設備故障管理制度
- 建安企業倉儲管理制度
- 關于餐廳衛生管理制度
- 物業地面改造方案(3篇)
- 危險崗位應急管理制度
- 安全生產事故案例分析
- 2025中煤電力有限公司總部及所屬企業招聘筆試參考題庫附帶答案詳解
- 廣西壯族自治區2025屆高三下學期一模英語試題(解析版)
- 育兒嫂簽合同協議
- 5G電力虛擬專網網絡安全白皮書2025
- 書法中考試題及答案
- 《學前兒童社會教育活動指導》形考測試題+答案
- 充電樁基本知識課件
- 電解鋁廠項目施工組織設計
- 中職電子類面試題及答案
- 作風建設學習教育讀書班交流發言提綱
評論
0/150
提交評論