MapReduce:超大機群上的簡單數據處理_第1頁
MapReduce:超大機群上的簡單數據處理_第2頁
MapReduce:超大機群上的簡單數據處理_第3頁
MapReduce:超大機群上的簡單數據處理_第4頁
MapReduce:超大機群上的簡單數據處理_第5頁
已閱讀5頁,還剩17頁未讀, 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

1、MapReduce: 超大機群上的簡單數據處理摘要MapReduce是一個編程模型,和處理,產生大數據集的相關實現。用戶指定 一個map函數處理一個key/value對,從而產生中間的key/value對集。然后再指 定一個reduce函數合并所有的具有相同中間 key的中間value。下面將列舉許多 可以用這個模型來表示的現實世界的工作。以這種方式寫的程序能自動的在大規模的普通機器上實現并行化。 這個運行時系統關心這些細節:分割輸入數據,在機群上的調度,機器的錯誤處理,管理機器之間必要的通信。這樣就可以讓那些沒有并行分布式處理系統經驗的程序員利用大量分布式系統的資源。我們的MapReduce

2、 實現運行在規??梢造`活調整的由普通機器組成的機群上,一個典型的MapReduce計算處理幾千臺機器上的以 TB計算的數據。程序 員發現這個系統非常好用 : 已經實現了數以百計的 MapReduce 程序,每天在 Google的機群上都有1000多個MapReduce程序在執行。1. 介紹在過去的 5 年里,作者和 Google 的許多人已經實現了數以百計的為專門目的而寫的計算來處理大量的原始數據, 比如, 爬行的文檔, Web 請求日志, 等等。為了計算各種類型的派生數據, 比如, 倒排索引, Web 文檔的圖結構的各種表示,每個主機上爬行的頁面數量的概要, 每天被請求數量最多的集合, 等等

3、。 很多這樣的計算在概念上很容易理解。 然而, 輸入的數據量很大, 并且只有計算被分布在成百上千的機器上才能在可以接受的時間內完成。怎樣并行計算,分發數據,處理錯誤, 所有這些問題綜合在一起, 使得原本很簡介的計算, 因為要大量的復 雜代碼來處理這些問題,而變得讓人難以處理。作為對這個復雜性的回應, 我們設計一個新的抽象模型, 它讓我們表示我們將要執行的簡單計算,而隱藏并行化,容錯,數據分布,負載均衡的那些雜亂的細節, 在一個庫里。 我們的抽象模型的靈感來自 Lisp 和許多其他函數語言的 map 和 reduce 的原始表示。我們認識到我們的許多計算都包含這樣的操作:在我們輸 入數據的邏輯記

4、錄上應用 map操作,來計算出一個中間key/value對集,在所有 具有相同key的value上應用reduce操作,來適當的合并派生的數據。功能模型 的使用,再結合用戶指定的 map和reduce操作,讓我們可以非常容易的實現大 規模并行化計算,和使用再次執行作為初級機制來實現容錯。這個工作的主要貢獻是通過簡單有力的接口來實現自動的并行化和大規模分布式計算,結合這個接口的實現來在大量普通的 PC 機上實現高性能計算。第二部分描述基本的編程模型, 并且給一些例子。 第三部分描述符合我們的基于集群的計算環境的 MapReduce 的接口的實現。第四部分描述我們覺得編程模型中一些有用的技巧。第五

5、部分對于各種不同的任務,測量我們實現的性能。第六部分探究在Google內部使用MapReduce作為基礎來重寫我們的索引系統產 品。第七部分討論相關的,和未來的工作。2. 編程模型計算利用一個輸入 key/value 對集,來產生一個輸出 key/value 對集。MapReduce庫的用戶用兩個函數表達這個計算:map和reduce用戶自定義的map函數,接受一個輸入對,然后產生一個中間 key/value對 集。MapReduce庫把所有具有相同中間key I的中間value聚合在一起,然后把 它們傳遞給reduce 函數。用戶自定義的reduce函數,接受一個中間key I和相關的一個v

6、alue集。它 合并這些value,形成一個比較小的value集。一般的,每次reduce調用只產生0 或1個輸出value。通過一個迭代器把中間value提供給用戶自定義的reduce函 數。這樣可以使我們根據內存來控制 value 列表的大小。2.1 實例考慮這個問題:計算在一個大的文檔集合中每個詞出現的次數。用戶將寫和下面類似的偽代碼:map(String key, String value):key:文檔的名字/value:文檔的內容for each word w in value:EmitIntermediate(w, 1);reduce(String key, Iterator v

7、alues):/key:一個詞/values:一個計數列表int result=0;for each v in values:result+=ParseInt(v);Emit(AsString(resut);map 函數產生每個詞和這個詞的出現次數(在這個簡單的例子里就是1)。reduce函數把產生的每一個特定的詞的計數加在一起。另外,用戶用輸入輸出文件的名字和可選的調節參數來填充一個mapreduce規范對象。用戶然后調用 MapReduce 函數,并把規范對象傳遞給它。用戶的代碼和MapReduce庫鏈接在一起(用C+實現)。附錄A包含這個實例的全部文本。2.2 類型即使前面的偽代碼寫成了

8、字符串輸入和輸出的 term 格式,但是概念上用戶寫的map和reduce函數有關聯的類型:map(k1, v1) -list(k2 , v2)reduce(k2, list(v2) -list(v2)例如,輸入的 key, value 和輸出的 key, value 的域不同。此外,中間key,value和輸出key, values的域相同。我們的C+實現傳遞字符用來和用戶自定義的函數交互,并把它留給用戶的 代碼,來在字符串和適當的類型間進行轉換。2.3 更多實例這里有一些讓人感興趣的簡單程序,可以容易的用MapReduce計算來表示。分布式的 Grep(UNIX 工具程序, 可做文件內的字

9、符串查找):如果輸入行匹配給定的樣式,map函數就輸出這一行。reduce函數就是把中間數據復制到輸出。計算 URL 訪問頻率 :map 函數處理 web 頁面請求的記錄,輸出 (URL , 1)。reduce函數把相同URL的value都加起來,產生一個(URL,記錄總數)的對。倒轉網絡鏈接圖 :map 函數為每個鏈接輸出(目標,源)對,一個URL 叫做目標,包含這個URL 的頁面叫做源。reduce 函數根據給定的相關目標URLs 連接所有的源 URLs 形成一個列表,產生( 目標,源列表)對。每個主機的術語向量:一個術語向量用一個(詞, 頻率 ) 列表來概述出現在一個文檔或一個文檔集中的

10、最重要的一些詞。 map 函數為每一個輸入文檔產生一個(主機名,術語向量)對(主機名來自文檔的URL)。reduce函數接收給定主機的所有文檔的術語向量。 它把這些術語向量加在一起, 丟棄低頻的術語, 然后產生一個最終的(主機名,術語向量)對。倒排索引 :map 函數分析每個文檔,然后產生一個(詞,文檔號)對的序列。reduce函數接受一個給定詞的所有對,排序相應的文檔IDs,并且產生一個(詞,文檔 ID 列表 )對。 所有的輸出對集形成一個簡單的倒排索引。 它可以簡單的增加 跟蹤詞位置的計算。分布式排序:map函數從每個記錄提取key,并且產生一個(key, record)對。 reduce

11、函數不改變任何的對。這個計算依賴分割工具(在4.1描述)和排序屬性(在 4.2描述 )。3. 實現MapReduce接口可能有許多不同的實現。根據環境進行正確的選擇。例如,一個實現對一個共享內存較小的機器是合適的,另外的適合一個大NUMA 的多處理器的機器,而有的適合一個更大的網絡機器的集合。這部分描述一個在Google廣泛使用的計算環境的實現:用交換機連接的普通 PC機的大機群。我們的環境是:1. Linux 操作系統,雙處理器, 24GB 內存的機器。2. 普通的網絡硬件,每個機器的帶寬或者是百兆或者千兆,但是平均小于全部帶寬的一半。3. 因為一個機群包含成百上千的機器,所有機器會經常出現

12、問題。4. 存儲用直接連到每個機器上的廉價IDE 硬盤。一個從內部文件系統發展起來的分布式文件系統被用來管理存儲在這些磁盤上的數據。 文件系統用復制的 方式在不可靠的硬件上來保證可靠性和有效性。5. 用戶提交工作給調度系統。每個工作包含一個任務集,每個工作被調度者映射到機群中一個可用的機器集上。3.1 執行預覽通過自動分割輸入數據成一個有M 個 split 的集, map 調用被分布到多臺機器上。輸入的 split 能夠在不同的機器上被并行處理。通過用分割函數分割中間key,來形成R個片(例如,hash(key) mod R), reduce調用被分布到多臺機器上。分割數量(R)和分割函數由用

13、戶來指定。圖1顯示了我們實現的 MapReduce操作的全部流程。當用戶的程序調用 MapReduce 的函數的時候,將發生下面的一系列動作(下面的數字和圖 1 中的數字標簽相對應 ):1 .在用戶程序里的MapReduce庫首先分割輸入文件成 M個片,每個片 的大小一般從16到64MB(用戶可以通過可選的參數來控制)。然后在機群中開 始大量的拷貝程序。2 .這些程序拷貝中的一個是 master,其他的都是由master分配任務 的workero有M個map任務和R個reduce任務將被分配。管理者分配一個 map 任務或reduce任務給一個空閑的 worker。3。一個被分配了map 任務

14、的 worker 讀取相關輸入 split 的內容。它從輸入數據中分析出key/value對,然后把key/value對傳遞給用戶自定義的 map函數。 由map函數產生的中間key/value對被緩存在內存中。4。 緩存在內存中的 key/value 對被周期性的寫入到本地磁盤上, 通過分割函數把它們寫入 R 個區域。在本地磁盤上的緩存對的位置被傳送給master, master負責把這些位置傳送給reduce worker。5。當一個reduce worker得到master的位置通知的時候,它使用遠程過程調 用來從map worker的磁盤上讀取緩存的數據。當reduce worker讀

15、取了所有的中 間數據后,它通過排序使具有相同 key 的內容聚合在一起。因為許多不同的 key 映射到相同的 reduce 任務,所以排序是必須的。如果中間數據比內存還大,那 么還需要一個外部排序。6。reduce worker迭代排過序的中間數據,對于遇到的每一個唯一的 中間key,它把key和相關的中間value集傳遞給用戶自定義的reduce函數。reduce 函數的輸出被添加到這個reduce 分割的最終的輸出文件中。7。當所有的map和reduce任務都完成了,管理者喚醒用戶程序。在這個時 候,在用戶程序里的MapReduce調用返回到用戶代碼。在成功完成之后, mapreduce

16、執行的輸出存放在R 個輸出文件中(每一個reduce任務產生一個由用戶指定名字的文件)。一般,用戶不需要合并這 R個輸 出 文 件 成 一 個 文 件 - 他 們 經 常 把 這 些 文 件 當 作 一 個 輸 入 傳 遞 給 其 他 的 MapReduce調用,或者在可以處理多個分割文件的分布式應用中使用他們。3。2master數據結構master保持一些數據結構。它為每一個map和reduced務存儲它們的狀態(空 閑,工作中,完成),和 worker 機器 (非空閑任務的機器)的標識。master就像一個管道,通過它,中間文件區域的位置從 map任務傳遞到reduce 任務。因此,對于每

17、個完成的 map任務,master存儲由map任務產生的R個中 間文件區域的大小和位置。當 map 任務完成的時候,位置和大小的更新信息被 接受。這些信息被逐步增加的傳遞給那些正在工作的reduce任務。3。 3 容錯因為MapReduce庫被設計用來使用成百上千的機器來幫助處理非常大規模 的數據,所以這個庫必須要能很好的處理機器故障。worker 故障master周期性的ping每個worker。如果master在一個確定的時間段內沒有收到 worker 返回的信息,那么它將把這個worker 標記成失效。因為每一個由這個失效的worker完成的map任務被重新設置成它初始的空閑狀態,所以它

18、可以 被安排給其他的workero同樣的,每一個在失敗的 worker上正在運行的map或 reduce任務,也被重新設置成空閑狀態,并且將被重新調度。在一個失敗機器上已經完成的 map 任務將被再次執行,因為它的輸出存儲在它的磁盤上,所以不可訪問。已經完成的 reduce 任務將不會再次執行,因為 它的輸出存儲在全局文件系統中。當一個map任務首先被worker A執行之后,又被B執行了(因為A失效了), 重新執行這個情況被通知給所有執行 reduce任務的worker。任何還沒有從A讀 數據的reduce任務將從worker B讀取數據。MapReduce可以處理大規模 worker失敗的

19、情況。例如,在一個 MapReduce操作期間,在正在運行的機群上進行網絡維護引起80 臺機器在幾分鐘內不可訪問了,MapReduce master只是簡單的再次執行已經被不可訪問的worker完成的工作,繼續執行,最終完成這個 MapReduce操作。master失敗可以很容易的讓管理者周期的寫入上面描述的數據結構的checkpoints。 如果這個master任務失效了,可以從上次最后一個checkpoint開始啟動另一個master 進程。然而,因為只有一個 master,所以它的失敗是比較麻煩的,因此我們現在 的實現是,如果master失敗,就中止MapReduce計算??蛻艨梢詸z查這

20、個狀態, 并且可以根據需要重新執行MapReduce操作。在錯誤面前的處理機制當用戶提供的map和reduce操作對它的輸出值是確定的函數時,我們的分 布式實現產生,和全部程序沒有錯誤的順序執行一樣,相同的輸出。我們依賴對map和reduce任務的輸出進行原子提交來完成這個性質。每個工作中的任務把它的輸出寫到私有臨時文件中。一個reduce 任務產生一個這樣的文件,而一個map任務產生R個這樣的文件(一個reduce任務對應一個文件)。 當一個map任務完成白時候,worker發送一個消息給 master,在這個消息中包 含這R個臨時文件的名字。如果 master從一個已經完成的map任務再次

21、收到一 個完成的消息,它將忽略這個消息。否則,它在master 的數據結構里記錄這R個文件的名字。當一個reduce任務完成的時候,這個reduce worker原子的把臨時文件重命名成最終的輸出文件。如果相同的 reduce 任務在多個機器上執行,多個重命名調用將被執行, 并產生相同的輸出文件。 我們依賴由底層文件系統提供的原子重命名操作來保證,最終的文件系統狀態僅僅包含一個reduce任務產生的數據。我們的map和reduce操作大部分都是確定的,并且我們的處理機制等價于一個順序的執行的這個事實, 使得程序員可以很容易的理解程序的行為。 當 map 或/和reduce操作是不確定的時候,我

22、們提供雖然比較弱但是合理的處理機制。當在一個非確定操作的前面,一個reduce任務R1的輸出等價于一個非確定順序 程序執行產生的輸出。然而,一個不同的reduce任務R2的輸出也許符合一個不 同的非確定順序程序執行產生的輸出??紤]map任務M和reduce任務R1, R2的情況。我們設定e(Ri)為已經提交 的Ri的執行(有且僅有一個這樣的執行)。這個比較弱的語義出現,因為 e(R1)也 許已經讀取了由M的執行產生的輸出,而e(R2)也許已經讀取了由M的不同執 行產生的輸出。3。 4 存儲位置在我們的計算機環境里, 網絡帶寬是一個相當缺乏的資源。 我們利用把輸入數據(由GFS管理)存儲在機器的

23、本地磁盤上來保存網絡帶寬。GFS把每個文件分 成 64MB 的一些塊,然后每個塊的幾個拷貝存儲在不同的機器上(一般是3 個拷貝)。MapReduce的master考慮輸入文件的位置信息,并且努力在一個包含相關 輸入數據的機器上安排一個map 任務。如果這樣做失敗了,它嘗試在那個任務的輸入數據的附近安排一個map 任務 (例如,分配到一個和包含輸入數據塊在一個switch里的worker機器上執行)。當運行巨大的MapReduce操作在一個機群中 的一部分機器上的時候,大部分輸入數據在本地被讀取,從而不消耗網絡帶寬。4。 5 任務粒度象上面描述的那樣,我們細分map 階段成 M 個片,reduc

24、e 階段成R 個片。M 和 R 應當比 worker 機器的數量大許多。 每個 worker 執行許多不同的工作來提高動態負載均衡,也可以加速從一個worker 失效中的恢復,這個機器上的許多已經完成的map任務可以被分配到所有其他的 worker機器上。在我們的實現里,M和R的范圍是有大小限制的,因為master必須做O(M+R)次調度, 并且保存 O(M*R) 個狀態在內存中。(這個因素使用的內存是很少的,在O(M*R)個狀態片里,大約每個 map任務/reduce任務對使用一個字節的數據)。此外,R經常被用戶限制,因為每一個reduce任務最終都是一個獨立的輸出文件。 實際上, 我們傾向

25、于選擇M , 以便每一個單獨的任務大概都是16 到 64MB的輸入數據(以便上面描述的位置優化是最有效的), 我們把 R 設置成我們希望使用的worker機器數量的小倍數。我們經常執行MapReduce計算,在M=200000, R=5000,使用2000臺工作者機器的情況下。5。 6 備用任務一個落后者是延長MapReduce操作時間的原因之一:一個機器花費一個異乎 尋常地的長時間來完成最后的一些 map或reduce任務中的一個。有很多原因可 能產生落后者。 例如, 一個有壞磁盤的機器經常發生可以糾正的錯誤, 這樣就使讀性能從 30MB/s 降低到 3MB/s 。機群調度系統也許已經安排其

26、他的任務在這個機器上,由于計算要使用 CPU,內存,本地磁盤,網絡帶寬的原因,引起它執 行MapReduce代碼很慢。我們最近遇到的一個問題是,一個在機器初始化時的 Bug 引起處理器緩存的失效:在一個被影響的機器上的計算性能有上百倍的影響。我們有一個一般的機制來減輕這個落后者的問題。當一個 MapReduce操作 將要完成的時候,master調度備用進程來執行那些剩下的還在執行的任務。無論是原來的還是備用的執行完成了, 工作都被標記成完成。 我們已經調整了這個機 制, 通常只會占用多幾個百分點的機器資源。 我們發現這可以顯著的減少完成大 規模MapReduce操作的時間。作為一個例子,將要在

27、 5。3描述的排序程序,在 關閉掉備用任務的情況下,要比有備用任務的情況下多花44%的時間。4 技巧盡管簡單的 map 和 reduce 函數的功能對于大多數需求是足夠的了,但是我 們開發了一些有用的擴充。這些將在這個部分描述。4。 1 分割函數MapReduce用戶指定reduce任務和reduce任務需要的輸出文件的數量。在 中間 key 上使用分割函數, 使數據分割后通過這些任務。 一個缺省的分割函數使 用hash方法(例如,hash(key) mod Rb這個導致非常平衡的分割。然后,有的時 候,使用其他的 key 分割函數來分割數據有非常有用的。例如,有時候,輸出的 key 是 UR

28、Ls ,并且我們希望每個主機的所有條目保持在同一個輸出文件中。為了支持像這樣的情況,MapReduce庫的用戶可以提供專門的分割函數。例如,使 用hash(Hostname(urlkey) mod R祚為分割函數,使所有來自同一個主機的 URLs 保存在同一個輸出文件中。4。 2 順序保證我們保證在一個給定的分割里面,中間key/value對以key遞增的順序處理。這個順序保證可以使每個分割產出一個有序的輸出文件, 當輸出文件的格式需要支持有效率的隨機訪問 key 的時候, 或者對輸出數據集再作排序的時候, 就很容4。 3combiner 函數在某些情況下, 允許中間結果key 重復會占據相當

29、的比重, 并且用戶定義的reduce函數滿足結合律和交換律。一個很好的例子就是在 2。 1 部分的詞統計程序。因 為詞頻率傾向于一個zipf 分布 (齊夫分布), 每個 map 任務將產生成百上千個這樣的記錄the, 1。所有的這些計數將通過網絡被傳輸到一個單獨的reduce任務,然后由 reduce 函數加在一起產生一個數字。我們允許用戶指定一個可選的combiner 函數,先在本地進行合并一下,然后再通過網絡發送。在每一個執行map任務的機器上combiner函數被執行。一般的,相同的代 碼被用在combiner和reduce函數。在combiner和reduce函數之間唯一的區別是 Ma

30、pReduce庫怎樣控制函數的輸出。reduce函數的輸出被保存最終輸出文件里。 combiner函數的輸出被寫到中間文件里,然后被發送給 reduce任務。部分使用combiner可以顯著的提高一些MapReduce操作的速度。附錄A包 含一個使用 combiner 函數的例子。4。 4 輸入輸出類型MapReduce庫支持以幾種不同的格式讀取輸入數據。例如,文本模式輸入把 每一行看作是一個key/value對。key是文件的偏移量,value是那一行的內容。 其他普通的支持格式以 key 的順序存儲key/value 對序列。每一個輸入類型的實現知道怎樣把輸入分割成對每個單獨的 map 任

31、務來說是有意義的(例如,文本模式的范圍分割確保僅僅在每行的邊界進行范圍分割) 。雖然許多用戶僅僅使用很少的預定意輸入類型的一個,但是用戶可以通過提供一個簡單的reader接口來支 持一個新的輸入類型。一個reader不必要從文件里讀數據。例如,我們可以很容易的定義它從數據 庫里讀記錄,或從內存中的數據結構讀取。4。 5 副作用有的時候,MapReduce的用戶發現在map操作或/和reduce操作時產生輔助 文件作為一個附加的輸出是很方便的。 我們依靠應用程序寫來使這個副作用成為 原子的。一般的,應用程序寫一個臨時文件,然后一旦這個文件全部產生完,就 自動的被重命名。對于單個任務產生的多個輸出

32、文件來說, 我們沒有提供其上的兩階段提交的 原子操作支持。 因此, 一個產生需要交叉文件連接的多個輸出文件的任務, 應該 使確定性的任務。不過這個限制在實際的工作中并不是一個問題。4。 6 跳過錯誤記錄有的時候因為用戶的代碼里有 bug,導致在某一個記錄上 map或reduce函 數突然crash掉。這樣的bug使得MapReduce操作不能完成。雖然一般是修復這 個bug,但是有時候這是不現實的;也許這個bug是在源代碼不可得到的第三方庫 里。 有的時候也可以忽略一些記錄, 例如, 當在一個大的數據集上進行統計分析。我們提供一個可選的執行模式,在這個模式下,MapReduce庫檢測那些記錄引

33、起 的crash,然后跳過那些記錄,來繼續執行程序。每個 worker 程序安裝一個信號處理器來獲取內存段異常和總線錯誤。在調用一個用戶自定義的map或reduce操作之前,MapReduce庫把記錄的序列號存 儲在一個全局變量里。 如果用戶代碼產生一個信號, 那個信號處理器就會發送一個包含序號的last gasp”UDP包給MapReduce的masters當master不止一次看到 同一個記錄的時候,它就會指出,當相關的map或reduce任務再次執行的時候, 這個記錄應當被跳過。4。 7 本地執行調試在 map 或 reduce 函數中問題是很困難的,因為實際的計算發生在一個分布式的系統

34、中,經常是有一個master 動態的分配工作給幾千臺機器。為了簡化調試和測試,我們開發了一個可替換的實現,這個實現在本地執行所有的MapReduce操作。用戶可以控制執行,這樣計算可以限制到特定的 map任務上。用戶以一個標志調用他們的程序, 然后可以容易的使用他們認為好用的任何調試和測試工具(例如,gdb)。4。 8 狀態信息master運行一個HTTP服務器,并且可以輸出一組狀況頁來供人們使用。狀態頁顯示計算進度,象多少個任務已經完成,多少個還在運行,輸入的字節數,中間數據字節數,輸出字節數,處理百分比,等等。這個頁也包含到標準錯誤的鏈接, 和由每個任務產生的標準輸出的鏈接。 用戶可以根據

35、這些數據預測計算需要花費的時間, 和是否需要更多的資源。 當計算比預期的要慢很多的時候, 這些 頁面也可以被用來判斷是不是這樣。此外, 最上面的狀態頁顯示已經有多少個工作者失敗了, 和當它們失敗的時候,那個map和reduce任務正在運行。當試圖診斷在用戶代碼里的bug時,這個信息也是有用的。6。 9 計數器MapReduce庫提供一個計數器工具,來計算各種事件的發生次數。例如,用 戶代碼想要計算所有處理的詞的個數,或者被索引的德文文檔的數量。為了使用這個工具,用戶代碼創建一個命名的計數器對象,然后在map 或/和 reduce 函數里適當的增加計數器。例如:Counter * upperca

36、se;uppercase=GetCounter(uppercase);map(String name, String contents):for each word w in contents:if(IsCapitalized(w):uppercase-Increment();EmitIntermediate(w, 1);來自不同worker機器上的計數器值被周期性的傳送給 master ping回應里) master把來自成功的 map和reduce任務的計數器值加起來,在 MapReduce操作 完成的時候,把它返回給用戶代碼。當前計數器的值也被顯示在master 狀態頁里, 以便人們可以

37、查看實際的計算進度。 當計算計數器值的時候消除重復執行的影響,避免數據的累加。(在備用任務的使用,和由于出錯的重新執行,可以產生重復執行)有些計數器值被MapReduce庫自動的維護,比如,被處理的輸入 key/value 對的數量,和被產生的輸出 key/value 對的數量。用戶發現計數器工具對于檢查MapReduce操作的完整性很有用。例如,在一些MapReduce操作中,用戶代碼也許想要確保輸出對的數量完全等于輸入對 的數量, 或者處理過的德文文檔的數量是在全部被處理的文檔數量中屬于合理的范圍。5 性能在本節,我們用在一個大型集群上運行的兩個計算來衡量MapReduce 的性能。一個計

38、算用來在一個大概1TB 的數據中查找特定的匹配串。另一個計算排序大概 1TB 的數據。這兩個程序代表了 MapReduce 的用戶實現的真實的程序的一個大子集。一類是, 把數據從一種表示轉化到另一種表示。 另一類是, 從一個大的數據集中提 取少量的關心的數據。5。 1 機群配置所有的程序在包含大概1800 臺機器的機群上執行。機器的配置是:2 個 2G的 Intel Xeon 超線程處理器, 4GB 內存, 兩個 160GB IDE 磁盤, 一個千兆網卡。這些機器部署在一個由兩層的, 樹形交換網絡中, 在根節點上大概有100 到 2000G的帶寬。所有這些機器都有相同的部署(對等部署) ,因此

39、任意兩點之間的來回時間小于 1 毫秒。在 4GB 的內存里,大概有1-1。 5GB 被用來運行在機群中其他的任務。這個程序是在周末的下午開始執行的,這個時候 CPU,磁盤,網絡基本上是空閑 的。5。 2Grep這個Grep程序掃描大概10A10個,每個100字節的記錄,查找比較少的 3 字符的查找串(這個查找串出現在92337個記錄中)。 輸入數據被分割成大概64MB的片 (M=15000) ,全部的輸出存放在一個文件中 (R=1)。圖 2 顯示計算過程隨時間變化的情況。 Y 軸表示輸入數據被掃描的速度。 隨 著更多的機群被分配給這個 MapReduce計算,速度在逐步的提高,當有1764個

40、worker的時候這個速度達到最高的30GB/s。當map任務完成的時候,速度開始 下降, 在計算開始后 80 秒, 輸入的速度降到0。 這個計算持續的時間大概是150秒。 這包括了前面大概一分鐘的啟動時間。 啟動時間用來把程序傳播到所有的機 器上,等待GFS打開1000個輸入文件,得到必要的位置優化信息。5。 3 排序這個sort程序排序10A10個記錄,每個記錄100個字節(大概1TB的數據)。 這個程序是模仿TeraSort 的。這個排序程序只包含不到 50 行的用戶代碼。其中有3 行 map 函數用來從文 本行提取10字節的排序key,并且產生一個由這個key和原始文本行組成的中 問k

41、ey/value對。我們使用一個內置的Identity函數作為reduce操作。這個函數 直接把中間key/value對作為輸出的key/value對。最終的排序輸出寫到一個2路 復制的GFS文件中(也就是,程序的輸出會寫2TB的數據)。象以前一樣,輸入數據被分割成 64MB的片(M=15000)。我們把排序后的輸 出寫到4000個文件中(R=4000)。分區函數使用key的原始字節來把數據分區到 R 個小片中。我們以這個基準的分割函數,知道 key 的分布情況。在一般的排序程序中,我們會增加一個預處理的 MapReduce操作,這個操作用于采樣 key的情況,并 且用這個采樣的 key 的分

42、布情況來計算對最終排序處理的分割點。圖3(a)顯示這個排序程序的正常執行情況。左上圖顯示輸入數據的讀取速度。 這個速度最高到達13GB/S,并且在不到200秒所有map任務完成之后迅速滑落 到0o注意到這個輸入速度小于 Grep。這是因為這個排序 map任務花費大概一 半的時間和帶寬,來把中間數據寫到本地硬盤中。而Grep 相關的中間數據可以忽略不計。左中圖顯示數據通過網絡從 map任務傳輸給reduce任務的速度。當第一個 map任務完成后,這個排序過程就開始了。圖示上的第一個高峰是啟動了第一批 大概1700個reduce任務(整個MapReduce任務被分配到1700臺機器上,每個機 器一

43、次只執行一個reduce任務)。大概開始計算后的300秒,第一批reduce任務 中的一些完成了,我們開始執行剩下的 reduce 任務。全部的排序過程持續了大 概 600 秒的時間。左下圖顯示排序后的數據被 reduce 任務寫入最終文件的速度。因為機器忙于排序中間數據, 所以在第一個排序階段的結束和寫階段的開始有一個延遲。 寫 的速度大概是2-4GB/S。大概開始計算后的850秒寫過程結束。包括前面的啟動 過程,全部的計算任務持續的891秒。這個和TeraSort benchmark最高紀錄1057 秒差不多。需要注意的事情是:因此位置優化的原因,很多數據都是從本地磁盤讀取的而沒有通過我們

44、有限帶寬的網絡,所以輸入速度比排序速度和輸出速度都要快。排序速度比輸出速度快的原因是輸出階段寫兩個排序后數據的拷貝(我們寫兩個副本的原因是為了可靠性和可用性) 。我們寫兩份的原因是因為底層文件系統的可靠性和可用性的要求。如果底層文件系統用類似容錯編碼(erasure coding珀方式,而不采用復制寫的方式,在寫盤階段可以降低網絡帶寬的要求。5。 4 備用任務的影響在圖3(b)中,顯示我們不用備用任務的排序程序的執行情況。 除了它有一個 很長的幾乎沒有寫動作發生的尾巴外,執行流程和圖 3(a)相似。在960秒后,只 有5個reduce任務沒有完成。然而,就是這最后幾個落后者知道 300秒后才完

45、 成。全部的計算任務執行了 1283 秒,多花了44%的時間。7。 5 機器失效在圖3(c)中,顯示我們有意的在排序程序計算過程中停止1746臺worker中的 200 臺機器上的程序的情況。 底層機群調度者在這些機器上馬上重新開始新的worker 程序 (因為僅僅程序被停止,而機器仍然在正常運行)。因為已經完成的 map 工作丟失了 (由于相關的 map worker 被殺掉了) ,需要重新再作,所以worker死掉會導致一個負數的輸入速率。相關 map任務的重新 執行很快就重新執行了。 整個計算過程在933 秒內完成, 包括了前邊的啟動時間(只比正常執行時間多了5%的時間)。6 經驗我們在

46、2003年的2月寫了 MapReduce庫的第一個版本,并且在2003年的8 月做了顯著的增強, 包括位置優化, worker 機器間任務執行的動態負載均衡, 等 等。從那個時候起,我們驚奇的發現MapReduce 函數庫廣泛用于我們日常處理的問題。它現在在Google 內部各個領域內廣泛應用,包括:大規模機器學習問題Google News和Froogle產品的機器問題。提取數據產生一個流行查詢的報告(例如,Google Zeitgeist)。為新的試驗和產品提取網頁的屬性(例如,從一個web 頁的大集合中提取位置信息 用在位置查詢 ) 。大規模的圖計算。圖4顯示了我們主要的源代碼管理系統中,

47、隨著時間推移,MapReduce程序的顯著增加, 從 2003 年早先時候的 0 個增長到 2004 年 9 月份的差不多 900 個不同的程序。MapReduce之所以這樣的成功,是因為他能夠在不到半小時時間內寫出一個簡單的能夠應用于上千臺機器的大規模并發程序, 并且極大的提高了開發和原形設計的周期效率。并且,他可以讓一個完全沒有分布式和/ 或并行系統經驗的程序員,能夠很容易的利用大量的資源。在每一個任務結束的時候,MapReduce 函數庫記錄使用的計算資源的統計信息。在圖 1 里,我們列出了 2004 年 8 月份在 Google 運行的一些 MapReduce 的 工作的統計信息。6

48、。 1 大規模索引到目前為止, 最成功的 MapReduce 的應用就是重寫了 Google web 搜索服務所使用到的 index 系統。索引系統處理爬蟲系統抓回來的超大量的文檔集,這些文檔集保存在GFS 文件里。這些文檔的原始內容的大小,超過了20TB 。索引程序是通過一系列的,大概5 到 10 次 MapReduce 操作來建立索引。通過利用MapReduce睛換掉上一個版本的特別設計白分布處理的索引程序版本)有這樣一些好處 :索引的代碼簡單,量少,容易理解,因為容錯,分布式,并行處理都隱藏在MapReduce庫中了。例如,當使用 MapReduce函數庫的時候,計算的代碼行數從原來的3

49、800行C+代碼一下減少到大概700行代碼。MapReduce的函數庫的性能已經非常好,所以我們可以把概念上不相關的計算步驟分開處理, 而不是混在一起以期減少在數據上的處理。 這使得改變索引過程很容易。例如,我們對老索引系統的一個小更改可能要好幾個月的時間,但是在新系統內,只需要花幾天時間就可以了。索引系統的操作更容易了,這是因為機器的失效,速度慢的機器,以及網絡失效都已經由 MapReduce 自己解決了,而不需要操作人員的交互。另外,我們可以簡單的通過對索引系統增加機器的方式提高處理性能。7 相關工作很多系統都提供了嚴格的設計模式, 并且通過對編程的嚴格限制來實現自動的并行計算。例如,一個

50、結合函數可以通過N 個元素的數組的前綴在N 個處理器上使用并行前綴計算在log N的時間內計算完。MapReduce是基于我們的大型現實計算的經驗, 對這些模型的一個簡化和精煉。 并且, 我們還提供了基于上千臺處理器的容錯實現。 而大部分并發處理系統都只在小規模的尺度上實現, 并且 機器的容錯還是程序員來控制的。Bulk Synchronous Programming 以及一些 MPI primitives 提供了更高級別的抽象,可以更容易寫出并行處理的程序。這些系統和 MapReduce 系統的不同之處在,MapReduce利用嚴格的編程模式自動實現用戶程序的并發處理,并且提供了透明的容錯處

51、理。我們本地的優化策略是受active disks等技術的啟發,在active disks中,計 算任務是盡量推送到靠近本地磁盤的處理單元上, 這樣就減少了通過I/O 子系統或網絡的數據量。 我們在少量磁盤直接連接到普通處理機運行, 來代替直接連接 到磁盤控制器的處理機上,但是一般的步驟是相似的。我們的備用任務的機制和在 Charlotte 系統上的積極調度機制相似。這個簡單的積極調度的一個缺陷是, 如果一個任務引起了一個重復性的失敗, 那個整個計算將無法完成。 我們通過在故障情況下跳過故障記錄的機制, 在某種程度上解決了這個問題。MapReduce 實現依賴一個內置的機群管理系統來在一個大規

52、模共享機器組上分布和運行用戶任務。 雖然這個不是本論文的重點, 但是集群管理系統在理念 上和 Condor 等其他系統是一樣的。在 MapReduce 庫中的排序工具在操作上和 NOW-Sort 相似。源機器(mapworker)分割將要被排序的數據,然后把它發送到R個reduce worker中的一個上。 每個reduce worker來本地排序它的數據(如果可能,就在內存中)。當然,NOW-Sort 沒有用戶自定義的map 和 reduce 函數,使得我們的庫可以廣泛的應用。River 提供一個編程模型,在這個模型下,處理進程可以靠在分布式的隊列上發送數據進行彼此通訊。和MapReduce

53、一樣,River系統嘗試提供對不同應用有近似平均的性能,即使在不對等的硬件環境下或者在系統顛簸的情況下也能提供近似平均的性。River 是通過精心調度硬盤和網絡的通訊,來平衡任務的完成時間。 MapReduce 不和它不同。利用嚴格編程模型, MapReduce 構架來把問題分割成大量的任務。這些任務被自動的在可用的 worker 上調度,以便速度快的worker 可以處理更多的任務。 這個嚴格編程模型也讓我們可以在工作快要結束的時候安排冗余的執行,來在非一致處理的情況減少完成時間(比如,在有慢機或者阻塞的 worker 的時候 )BAD-FS 是一個很 MapReduce 完全不同的編程模型

54、,它的目標是在一個廣闊的網絡上執行工作。然而,它們有兩個基本原理是相同的。 (1)這兩個系統使用冗余的執行來從由失效引起的數據丟失中恢復。(2)這兩個系統使用本地化調度策略,來減少通過擁擠的網絡連接發送的數據數量。TACC 是一個被設計用來簡化高有效性網絡服務結構的系統。 和 MapReduce 一樣,它通過再次執行來實現容錯。8 結束語MapReduce編程模型已經在Google成功的用在不同的目的。我們把這個成 功歸于以下幾個原因 :第一,這個模型使用簡單,甚至對沒有并行和分布式經驗 的程序員也是如此,因為它隱藏了并行化,容錯,位置優化和負載均衡的細節。第二,大量不同的問題可以用 MapReduce計算來表達。例如,MapReduce被用 來,為Google的產品web搜索服務,排序,數據挖掘,機器學習,和其他許多 系統, 產生數據。 第三, 我們已經在一個好幾千臺計算機的大型集群上開發實現 了這個MapReduce。 這個實現使得對于這些機器資源的利用非常簡單, 因此也適用于解決Google遇到的其他很多需要大量計算的問題。從這個工作中我們也學習到了一些東西。 首先, 嚴格的編程模型使得并行化和分布式計算簡單, 并且也易于構造這

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論