分布式計算、云計算與大數據 第2版 課件 第9章 大數據技術與編程_第1頁
分布式計算、云計算與大數據 第2版 課件 第9章 大數據技術與編程_第2頁
分布式計算、云計算與大數據 第2版 課件 第9章 大數據技術與編程_第3頁
分布式計算、云計算與大數據 第2版 課件 第9章 大數據技術與編程_第4頁
分布式計算、云計算與大數據 第2版 課件 第9章 大數據技術與編程_第5頁
已閱讀5頁,還剩94頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

第9章大數據技術與編程提綱5.1 大數據背景與概述5.2 大數據處理關鍵技術5.3 大數據計算模式9.4基于Hadoop的大數據編程實踐9.5基于Spark的大數據編程實踐大數據產生的背景

1980年,未來學家阿爾文·托夫勒在《第三次浪潮》中推崇大數據,但當時數據領域尚處于初期。首個數據中心和關系數據庫便出現在這個時代。隨著物聯網的興起,越來越多設備連接互聯網,收集客戶使用模式和產品性能數據,而機器學習也推動了數據增長。然而,盡管已經存在一段時間,大數據的潛力仍未完全釋放。今天,云計算提供了彈性和可擴展性,使開發人員能夠輕松測試數據子集,從而引發了全球技術變革,大數據技術得以嶄露頭角。

2005年左右,人們開始意識到在線服務如Facebook和YouTube產生了大量數據。同時,Hadoop和NoSQL等工具的出現降低了數據存儲成本,推動了大數據的發展。大數據量在接下來幾年急速增長,現在全球“用戶”--不僅有人類,還有機器不斷產生數據。大數據的定義

數據量大!

數據類型復雜!大數據的“5V”特征

容量(Volume)

速率(Velocity)

多樣性(Variety)

真實性(Veracity)

價值(Value)大數據的5V特征大數據的5V特征

容量(Volume)這是指大規模的數據量,并且數據量呈持續增長趨勢。通常,它們的規模超過10T,但隨著技術進步,定義中的數據大小也可能會改變。大規模的數據對象構成的集合,即稱為“數據集”。這些數據集由不同類型的數據組成,具有不同的特征,包括維度、稀疏性和分辨率。數據集可以分為記錄數據集(存儲在數據庫中的記錄集合)、基于圖形的數據集(包含數據對象之間的聯系,并用圖形表示)和有序數據集(包括時間和空間信息,用于存儲時間序列和空間數據等)。大數據的5V特征

速率(Velocity)即數據生成、流動速率快。數據流動速率指指對數據采集、存儲以及分析具有價值信息的速度。因此也意味著數據的采集和分析等過程必須迅速及時。

真實性(Veracity)指數據的質量和保真性。大數據環境下的數據最好具有較高的信噪比。信噪比與數據源和數據類型無關。大數據的5V特征

多樣性(Variety)指是大數據包括多種不同格式和不同類型的數據。數據來源包括人與系統交互時與機器自動生成,來源的多樣性導致數據類型的多樣性。根據數據是否具有一定的模式、結構和關系,數據可分為三種基本類型:結構化數據、非結構化數據、半結構化數據。結構化數據:指遵循一個標準的模式和結構,以二維表格的形式存儲在關系型數據庫里的行數據。非結構化數據:指不遵循統一的數據結構或模型的數據(如文本、圖像、視頻、音頻等),不方便用二維邏輯表來表現。半結構化數據:指有一定的結構性,但本質上不具有關系性,介于完全結構化數據和完全非結構化數據之間的數據。大數據的5V特征

價值(Value)指大數據的低價值密度。隨著數據量的增長,數據中有意義的信息卻沒有成相應比例增長。而價值同時與數據的真實性和數據處理時間相關,如圖所示。大數據發展趨勢大數據發展趨勢大數據發展趨勢提綱9.1 大數據背景與概述9.2 大數據處理關鍵技術9.3 大數據計算模式9.4基于Hadoop的大數據編程實踐9.5基于Spark的大數據編程實踐大數據處理關鍵技術大數據采集大數據預處理大數據存儲及管理大數據分析及挖掘大數據展現及應用大數據采集

數據采集(DAQ),又稱數據獲取,是指從傳感器和其它待測設備等模擬和數字被測單元中自動采集信息的過程。根據數據量大小、數據復雜度以及采用數據庫不同,數據采集分“傳統的數據采集”和“大數據的數據采集”兩類(區別如下)。采集方法

大數據技術在數據采集方面采用了以下方法:系統日志采集方法許多互聯網企業都使用分布式架構的工具,如Hadoop的Chukwa、Apache的Flume、Cloudera的Flume、Facebook的Scribe等,用于采集系統日志,能夠滿足大量日志數據每秒數百MB的采集和傳輸需求。網絡數據采集方法網絡數據采集是通過爬蟲或網站API等手段從網站上提取數據,將非結構化數據存儲為本地結構化文件,支持采集圖片、音頻、視頻等文件或附件采集。同時,網絡流量采集可以用帶寬管理技術如DPI或DFI進行處理。其它數據采集方法對于企業生產經營數據或學科研究數據等保密性要求較高的數據,可以通過與企業或研究機構合作,使用特定系統接口等相關方式采集數據。采集平臺

以下是幾款應用廣泛的大數據采集平臺,供參考。大數據采集平臺簡介1ApacheFlumeFlume是Apache的開源數據采集系統,具備高可靠性、可擴展性和易管理性,支持客戶端擴展。它基于JRuby構建,需要Java運行環境。2FluentdFluentd是一個開源的數據收集框架,使用C/Ruby編寫,采用JSON格式統一日志數據。它支持多種數據源和輸出格式,具有高可靠性和良好的擴展性,由TreasureData,Inc提供支持和維護。3LogstashLogstash是ELK數據棧中的L,是一個開源數據采集系統,使用JRuby開發,運行時依賴JVM。4SplunkForwarderSplunk是分布式機器數據平臺,包括三個角色:SearchHead負責搜索和信息抽取,Indexer負責存儲和索引,Forwarder負責數據的采集、清洗和發送給Indexer。大數據預處理

缺失數據處理缺失數據是常見情況,當收集數據時,有些樣本數據可能因各種原因缺失。如何有效處理這些缺失數據以用于算法訓練是需要解決的關鍵問題。缺失數據處理方法介紹1刪除法簡單刪除因特殊異常原因導致的數據缺失,只需刪除極少數的樣本數據,且未來模型應用中缺失維度的情況很少發生。2填充法通常使用默認值或均值等填充缺失維度信息的方法很常見,因其易操作和易解釋等優勢。但有時填入相同的數值可能會降低該維度的區分度。3映射到高維空間完美的保留的缺失值這個信息,不會對原始信息加入人為的先驗知識,帶來的問題就是數據維度的增加,算法的計算量也隨之變大。大數據預處理

數據數值化在收集到的各維度信息中,有些是字符串,如性別和學歷水平等。這些信息無法直接用于算法計算,通常需要將它們轉換成數值形式以便后續算法計算。數據數值化方法介紹1離散編碼對于可窮舉的字符串通常根據出現的頻率進行編碼即可,例如男出現100次,女出現80次,將男編碼為0,女編碼為1。2語義編碼對于無法通過窮舉法完全表示的信息,如文本分類中的自然語言信息,通常采用詞嵌入(wordembedding)方法,其中基于Google的word2vec方法是一個較好的選擇。在同一語料庫訓練下,這些詞嵌入可以攜帶一些語義信息。大數據預處理

大數據存儲及管理

背景存儲規模大種類和來源多樣化,存儲管理復雜對數據的種類和水平要求高大數據存儲及管理

有效存儲和管理大數據的三種方式:不斷加密企業對各種數據的安全至關重要,通常視為私有和受控的。然而,黑客攻擊頻繁發生,網絡攻擊報道屢見不鮮,這讓很多公司感到擔憂,尤其是行業領袖常常成為攻擊目標。為了保護資產,加密技術是一種有效對抗網絡威脅的方法,它將數據轉化為代碼并使用加密信息,只有收件人能夠解碼。通過加密來保護數據傳輸,提高數字傳輸的準確性和安全性。倉庫存儲大數據似乎難以管理,就像一個無盡的數據漩渦。將數據集中到一個公司位置似乎明智,類似于一個倉庫,但一些報告提出反對意見,認為即使在最大的存儲中心,也無法應對大數據指數級增長。備份服務當然,大數據管理和存儲正迅速擺脫物理機器,進入數字領域。隨著技術不斷進步,大數據增長迅猛,已經到達無法完全容納在所有機器和倉庫中的程度。大數據存儲及管理

數據存儲管理這塊分為兩個部分,一部分是底層的文件系統,還有一部分就是之上的數據庫或數據倉庫。常用工具如下:存儲管理工具介紹1文件系統大數據文件系統其實是大數據平臺架構最為基礎的組件,其他的組件或多或少都會依賴這個基礎組件,目前應用最為廣泛的大數據存儲文件系統非Hadoop的HDFS莫屬,除此之外,還有發展勢頭不錯的Ceph。2數據庫或數據倉庫針對大數據的數據庫大部分是NOSQL數據庫,這里順便澄清一下,NOSQL的真正意義是“notonlysql”,并非NOSQL是RMDB的對立面。常用的數據庫或數據倉庫有HBase、MongoDB、Cassandra、Neo4j等。大數據分析及挖掘概述數據分析是對大量數據進行詳細研究、提取有用信息和形成結論的過程,支持質量管理體系,幫助做出適當決策。它結合了數學和計算機科學,早在20世紀初就確立了數學基礎,但直到計算機出現才得以廣泛應用。數據挖掘是跨學科的計算機科學分支,利用人工智能、機器學習、統計學和數據庫等方法,在大型數據集中發現模式的計算過程。其總體目標是從數據集中提取信息并轉化為可理解的結構,以便進一步應用。數據挖掘包括數據預處理、模型構建、興趣度度量、復雜性考慮、可視化等步驟,屬于機器學習的一部分,也是數據庫知識發現(KDD)的一部分。區別:數據挖掘是通過人工智能、機器學習、統計學和數據庫方法在大型數據集中發現知識規則的計算過程。數據分析包括檢查、清理、轉換和建模等過程,是人的智能活動的結果,旨在發現有用信息、提出建設性結論并輔助決策。在實際應用中,兩者應該互相結合,根據具體業務需求選擇適當的思路和算法,最終綜合考慮效果和資源匹配等因素來確定最佳解決方案。大數據分析及挖掘常用方法常用大數據分析挖掘方法簡介1神經網絡方法神經網絡因其魯棒性、自適應性、并行處理、分布存儲和高容錯性等特點,成為解決數據挖掘問題的理想選擇,近年來備受關注。2遺傳算法遺傳算法是一種仿生全局優化方法,基于自然選擇和遺傳機理,具有隱含并行性和易于與其他模型結合的特性,因此在數據挖掘中廣泛應用。3決策樹方法決策樹是一種常用于預測模型的算法,通過有目的地分類數據來提取有價值的信息。它簡單易懂,處理大規模數據速度快。4粗集方法

粗集理論是處理不確定知識的數學工具。它具有幾個優點:不需額外信息、簡化信息表達、操作簡單。粗集通常用于處理類似二維關系表的數據。5覆蓋正例排斥反例方法利用覆蓋所有正例、排斥所有反例的思想來找規則。首先選擇一個正例種子,逐個與反例比較,與字段取值相符則保留,否則舍去。如此循環所有正例種子,得到正例的規則(選擇子的合取式)。大數據分析及挖掘常用工具常用大數據分析挖掘工具簡介1HadoopHadoop是一個分布式處理大數據的軟件框架,以可靠、高效、可伸縮為特點。它可靠,因為它考慮了計算和存儲的故障,保持多個數據副本以應對失敗。它高效,通過并行處理提高速度。它可伸縮,適用于PB級數據。Hadoop基于社區服務器,成本較低,對所有人都開放。2SparkSpark是Apache基金會的開源項目,由加州大學伯克利分校實驗室開發,是一種重要的分布式計算系統。與Hadoop不同,Spark使用內存存儲數據,因此速度可達Hadoop的100倍以上。但由于內存數據會丟失,不適用于長期保存的數據處理。Spark已經將大部分數據挖掘算法從單機遷移到分布式,提供了方便的數據分析可視化界面。3StormStorm是Twitter推廣的分布式計算系統,由BackType團隊開發,是Apache基金會的項目。它在Hadoop基礎上提供了實時計算能力,可實時處理大數據流。與Hadoop和Spark不同,Storm不收集或存儲數據,而是直接通過網絡實時接收、處理和傳輸數據及結果。大數據展現及應用應用商業智能政府決策公共服務大數據重點應用三大領域大數據展現及應用大數據檢索大數據檢索是大數據展現及應用中的重要一環,因為數據集很大很復雜,所以它們需要特別涉及的硬件和軟件工具。大數據檢索工具簡介1ApacheDrillDrill是一個低延遲的分布式數據查詢引擎,支持大規模數據(包括結構化、半結構化和嵌套數據)。它使用ANSISQL語法,可以連接本地文件、HDFS、HBase、MongoDB等后端存儲,同時支持多種數據格式,如Parquet、JSON、CSV等。2PrestoFaceBook在2013年11月開源了Presto,一個專注于高速、實時數據分析的分布式SQL查詢引擎。它支持標準的ANSISQL,包括復雜查詢、聚合、連接和窗口函數。Presto設計了一個簡單的數據存儲抽象層,使得可以在不同的數據存儲系統上(包括HBase、HDFS、Scribe等)使用SQL進行查詢。3ApacheKylin這是一個開源的分布式分析引擎,提供Hadoop/Spark之上的SQL查詢接口及多維分析(OLAP)能力以支持超大規模數據,最初由eBayInc.開發并貢獻至開源社區。它能在亞秒內查詢巨大的Hive表。大數據展現及應用大數據可視化大數據可視化是根據數據的特性,如時間和空間信息等,采用圖表、圖形和地圖等可視化方式,將數據直觀呈現,幫助人們理解數據并發現其中的規律和信息。Jupyter:大數據可視化的一站式商店GoogleChart:Google支持的免費而強大的整合功能D3.js:以任何你需要的方式直觀的顯示大數據大數據展現及應用大數據應用我國大數據市場產值圖(單位:億元)醫療大數據生物大數據金融大數據零售大數據電商大數據農牧大數據交通大數據政府調控和財政支出大數據展現及應用大數據安全大數據安全體系個人隱私保護數據安全大數據平臺安全提綱9.1 大數據背景與概述9.2 大數據處理關鍵技術9.3 大數據計算模式9.4基于Hadoop的大數據編程實踐9.5基于Spark的大數據編程實踐MapReduce介紹MapReduce的運行模型MapReduce是一種面向大數據的批處理計算模式,由Google提出,適用于處理大規模數據集。它借用了函數式編程理念中的"Map"和"Reduce"概念。在該模型中,用戶需要定義一個"Map"函數,將一組鍵值對映射為新的鍵值對,同時指定一個并發的"Reduce"函數,用于歸納所有映射結果中共享相同鍵的數據。MapReduce實現原理MapReduce執行流程MapReduce實現原理1、用戶程序中的MapReduce庫首先將輸入文件分割成M個片段,每個片段通常大小在16到64MB之間(可由用戶通過可選參數控制),然后在集群中啟動大規模的數據拷貝操作。MapReduce實現原理2、這些程序拷貝中的一個是master,其他的都是由master分配任務的worker。有M個map任務和R個reduce任務將被分配。master分配一個map任務或reduce任務給一個空閑的worker。MapReduce實現原理3、一個被分配了map任務的worker讀取相關輸入split的內容。它從輸入數據中分析出key/value對,然后把key/value對傳遞給用戶自定義的map函數。由map函數產生的中間key/value對被緩存在內存中。MapReduce實現原理4、緩存在內存中的key/value對被周期性的寫入到本地磁盤上,通過分割函數把它們寫入R個區域。在本地磁盤上的緩存對的位置被傳送給master,master負責把這些位置傳送給reduceworker。MapReduce實現原理5、reduceworker收到master位置通知后,通過遠程過程調用從mapworker的磁盤讀取緩存數據。讀取所有中間數據后,它會對具有相同key的數據進行排序和聚合,因為多個不同的key可能映射到同一個reduce任務,所以排序是必要的。如果中間數據超過內存大小,將需要進行外部排序。MapReduce實現原理6、reduceworker迭代排過序的中間數據,對于遇到的每一個唯一的中間key,它把key和相關的中間value集傳遞給用戶自定義的reduce函數。reduce函數的輸出被添加到這個reduce分割的最終的輸出文件中。MapReduce優勢劣勢1、移動計算而不是移動數據,避免了額外的網絡負載;2、任務相互獨立,易于處理局部故障,單個節點故障只需重啟節點任務。防止故障擴散到整個集群,允許處理同步中的錯誤。備份任務可提升拖延任務的執行速度;3、MapReduce模型是可線性擴展的;4、MapReduce模型結構簡單,用戶只需要至少編寫Map和Reduce函數;5、相對于其他分布式模型,MapReduce的一大特點是其平坦的集群擴展代價曲線。在大規模集群時,MapReduce表現非常好。1、MapReduce模型缺乏一個中心用于同步各個任務;2、由于MapReduce模型是沒有索引結構,用MapReduce模型來實現常見的數據庫連接操作非常麻煩且效率低下;3、MapReduce集群管理比較麻煩,在集群中調試、部署以及日志收集工作都很困難;4、單個Master節點有單點故障的可能性且可能會限制集群的擴展性;5、當中間結果必須給保留的時候,作業的管理并不簡單;6、對于集群的參數配置的最優解并非顯然,許多參數需要有豐富的應用經驗才能確定。Spark介紹Spark由加州大學伯克利分校AMP實驗室開發,可用來構建大型的、低延遲的數據分析應用程序,是一種面向大數據處理的分布式內存計算模式或框架。Spark生態環境Spark總體架構Spark總體架構DriverProgram:運行main函數并且新建SparkContext的程序;SparkContext:Spark程序的入口,負責調度各個運算資源,協調各個WorkerNode上的Executor;Application:基于Spark的用戶程序,包含了driver程序和集群上的executor;ClusterManager:集群的資源管理器(例如:Standalone,Mesos,Yarn);Spark總體架構Spark總體架構WorkerNode:集群中任何可以運行應用代碼的節點;Executor:是在一個WorkerNode上為某應用啟動的一個進程,該進程負責運行任務,并且負責將數據存在內存或者磁盤上。每個應用都有各自獨立的Executors;Task:被送到某個Executor上的工作單元Spark總體架構Spark運行狀態Q:一個用戶程序是如何從提交到最終到集群上執行的呢?Spark彈性分布數據集RDDRDD(ResilientDistributedDataset),作為Spark的核心數據結構,是一個基于分區的、只讀的數據記錄集抽象。它是邏輯集中的實體,但在集群中的多臺機器上進行了分區。通過對多臺機器上不同RDD聯合分區的控制,可以減少機器之間數據混合(DataShuffling)。RDD可緩存在RAM中,提供更快的數據訪問。目前只支持整個RDD級別的緩存,當集群內存不足時可以根據LRU算法替換RDD。Spark彈性分布數據集RDDRDD提供抽象數據架構,隱藏底層分布性,應用邏輯通過Transformation和Action來表達一系列轉換處理,前者在RDD之間指定處理的相互依賴關系有向無環圖DAG,后者指定輸出的形式。調度程序通過拓撲排序確定DAG執行順序,追蹤最源頭的節點或者代表緩存RDD的節點。用戶通過Transformation和Action操作控制RDD轉換和輸出。Transformation延遲執行,根據Action生成各代RDD,最終生成輸出。用戶通過選擇Transformation的類型并定義Transformation中的函數來控制RDD之間的轉換關系。當用戶調用不同類型的Action操作來把任務以自己需要的形式輸出。Transformation在定義時并沒有立刻被執行,而是等到第一個Action操作到來時,在根據Transformation生成各代RDD.最后由RDD生成最后的輸出。SparkRDD依賴的類型在RDD依賴關系有向無環圖中,RDD之間的關系由Transformation來確定,根據Transformation的類型,生成的依賴關系有兩種形式:寬依賴與窄依賴。窄依賴和寬依賴SparkRDD依賴的類型–窄依賴窄依賴是指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應于一個子RDD的分區或多個父RDD的分區對應于一個子RDD的分區,也就是說一個父RDD的一個分區不可能對應一個子RDD的多個分區。窄依賴的RDD可以通過相同的鍵進行聯合分區,整個操作都可以在一臺機器上進行,不會造成網絡之間的數據混合。窄依賴SparkRDD依賴的類型–寬依賴寬依賴是指子RDD的分區依賴于父RDD的多個分區或所有分區,也就是說存在一個父RDD的一個分區對應一個子RDD的多個分區。寬依賴的RDD就會涉及到數據混合。調度程序會檢查依賴性的類型,將窄依賴的RDD劃到一組處理當中,即stage。寬依賴在一個執行中會跨越連續的stage,同時需要顯式指定多個子RDD的分區。寬依賴SparkSpark的任務生成模式Spark任務生成模式RDD分區RDD之間關系1、用戶提交的計算任務是由RDD構成的DAG;2、若RDD轉換是寬依賴,這個寬依賴轉換就會將這個DAG分為了不同階段的stage,不同的stage不可以進行并行計算;3、運行時,Spark會把任務集合提交給任務調度器處理;4、RDD之間是窄依賴的,都歸到一個stage里。Spark分析:Spark迭代性能遠超Hadoop的原因是什么?Spark與Hadoop迭代過程比較流式計算流式數據流式大數據是隨著時間而無限增加的數據序列,簡稱為流數據。流式大數據數據量大時效性高數據源不單一有序處理流式大數據特征流式計算流式計算系統流式計算是對流式數據進行實時分析計算的一種技術。它能很好地滿足流數據處理的實時性和可靠性的要求。目前,具體代表性的大數據流式計算系統架構主要有兩類,一類是有中心的主從式架構,一類是去中心化的對等式架構。主從式架構對等式架構金融銀行業互聯網物聯網應用場景流式計算典型流式計算系統–SparkStreamingSparkStreaming處理流程SparkStreaming是在Spark基礎上擴展的實時計算框架,能夠實現高吞吐量的、容錯處理的流式數據處理。其中,SparkStreaming中將流數據分為許多微批數據的引擎為SparkCore,它將流數據分為許多段微小的數據,再將這些數據轉換成RDD,利用Spark系統的SparkEngine對RDD進行Transformation處理,將結果保存在內存中。SparkRDD容錯性微批數據SparkEngine實時性流式計算典型流式計算系統–Storm系統Strom系統是由Twitter支持開發的一個分布式、實時的高容錯開源流式計算系統,側重于低延遲。與微批處理不同,Storm系統直接采用原生數據處理,成本較大。Storm系統拓撲Storm系統計算的作業邏輯單元是一個叫作Thrift的拓撲結構,由以下組件構成:Spout組件拓撲的起始單元,從外部讀取原生數據流;Bolt組件拓撲的處理單元,對接收來的Tuples元組進行過濾、聚合、連接等處理,以流形式輸出。流式計算典型流式計算系統–Storm系統Storm系統架構主從式架構設計由一個主節點nimbus、多個從節點supervisor和Zookeeper集群組成主節點和從節點由Zookeeper進行協調流式計算典型流式計算系統–Storm系統Storm系統數據交互將原生數據流處理成拓撲,提交給主節點Nimbus主節點nimbus從zookeeper集群中獲得心跳信息,根據系統情況分配資源和任務給從節點Supervisor執行從節點監聽到任務后啟動或關閉Worker進程執行任務;Worker執行任務,把相關信息發送給Zookeeper集群存儲。優勢:不足:單數據流處理,延時極低單數據流丟失難以維護,不適合邏輯復雜、容錯性要求高的工作流式計算典型流式計算系統–S4系統S4系統(SimpleScalableStreamingSystem)是雅虎用Java語言開發的通用、分布式、低延時、可擴展、可拔插的大數據流式計算系統,它采用的也是原生流數據處理。S4系統任務拓撲S4系統的基本計算單元:函數表示PE的功能與配置;事件類型表示PE接收的事件類型;主鍵鍵值定義每個PE只處理事件類型、主鍵、鍵值都匹配的事件。不匹配則創建新的處理單元。(K,A)流式計算典型流式計算系統–S4系統S4系統架構客戶端驅動TCP/IP協議棧對等式架構用戶服務請求流式計算典型流式計算系統–Kafka系統Kafka系統架構Kafka系統是由Linkedin支持開發的分布式、高吞吐量、開源的發布訂閱消息系統,能夠有效處理活躍的流式數據,側重于系統吞吐量。消息發布者Producer緩存代理Broker訂閱者Consumermessage狀態管理、負載均衡流式計算典型流式計算系統–Kafka系統根據消息源的類型將其分為不同的主題topic,每個topic包含一個或多個partition消息發布者按照指定的partition方法,給每個消息綁定一個鍵值,保證將消息推送到相應的topic的partition中,每個partition代表一個有序的消息隊列緩存代理將消息持久化到磁盤,設置消息的保留時間,系統僅存儲未讀消息。訂閱者訂閱了某一個主題topic,則從緩存代理中拉取該主題的所有具有相同鍵值的消息。Kafka系統消息處理流程優勢:不足:可擴展性低延時性可快速處理大量流數據,適合吞吐量高的工作負載僅支持部分容錯代理緩存沒有副本節點流式計算典型流式計算系統--總結性能指標SparkStreamingStorm系統S4系統Kafka系統系統架構主從式架構主從式架構對等式架構主從式架構開發語言JavaClojure,JavaJavaScala數據傳輸方式拉取拉取推送推送拉取容錯機制作業級容錯作業級容錯部分容錯部分容錯負載均衡支持不支持不支持部分支持資源利用率高高低低狀態持久化支持不支持支持不支持編程模型純編程純編程編程+XML純編程提綱9.1 大數據背景與概述9.2 大數據處理關鍵技術9.3 大數據計算模式9.4基于Hadoop的大數據編程實踐9.5基于Spark的大數據編程實踐Hadoop環境的搭建單機偽分布環境搭建環境要求:Linux操作系統Centos7發行版,Java環境(1.8版本的JDK)。第一步:下載Hadoop壓縮包并解壓到任意目錄,由于權限問題建議解壓到當前用戶的主目錄(home)。(下載地址:/apache/hadoop/common/hadoop-2.10.0/hadoop-2.10.0.tar.gz)。Hadoop環境的搭建單機偽分布環境搭建環境要求:Linux操作系統Centos7發行版,Java環境(1.8版本的JDK)。第二步:修改Hadoop的配置文件:etc/hadoop/hadoop-env.sh、etc/hadoop/hdfs-site.xml、etc/hadoop/core-site.xml。(如果只是部署HDFS環境只需要修改這三個文件,如需配置MapReduce環境請參考相關文檔)#conf/core-site.xml修改如下:<configuration><property><name></name><value>HDFS://localhost:9000</value></property></configuration>#conf/HDFS-site.xml修改如下(這里只設置了副本數為1):<configuration><property><name>dfs.replication</name><value>1</value></property></configuration>#etc/hadoop/Hadoop-env.sh中修改了JAVA_HOME的值exportJAVA_HOME=/home/Hadoop/jdkHadoop環境的搭建單機偽分布環境搭建環境要求:Linux操作系統Centos7發行版,Java環境(1.8版本的JDK)。第三步:配置ssh自動免密碼登錄。1、運行ssh-keygen命令并一路回車使用默認設置,產生一對ssh密鑰。2、執行ssh-copy-id-i~/.ssh/id_rsa.publocalhost把剛剛產生的公鑰加入到當前主機的信任密鑰中,這樣當前使用的用戶就可以使用ssh無密碼登錄到當前主機。第四步:第一次啟動HDFS集群時需要格式化HDFS,在master主機上執行hadoop

namenode-format進行格式化。如果格式化成功后,則在Hadoop所在的目錄執行sbin/start-dfs.sh開啟HDFS服務。查看HFDS是否正確運行可以執行jps命令進行查詢。Hadoop環境的搭建多節點全分布搭建本實例中,每個節點都需要使用固定IP并保持相同的Hadoop配置文件,每個節點Hadoop和jdk所在路徑都相同、存在相同的用戶且配置好免密碼登錄。第一步:與單機偽分布模式相同,下載Hadoop的二進制包,并解壓備用。1、etc/hadoop/hadoop-env.sh中修改變了JAVA_HOME的值為JDK所在路徑。2、etc/hadoop/core-site.xml修改如下:第二步:修改Hadoop配置文件,與偽分布有些許不同。Hadoop環境的搭建多節點全分布搭建3、etc/hadoop/hdfs-site.xml修改如下:(其中.dir

和dfs.data.dir可以任意指定,注意權限問題)第二步:修改Hadoop配置文件,與偽分布有些許不同。Hadoop環境的搭建多節點全分布搭建4、先執行mvmapred-site.xml.templatemapred-site.xml,然后修改mapred-site.xml如下第二步:修改Hadoop配置文件,與偽分布有些許不同。Hadoop環境的搭建多節點全分布搭建第二步:修改Hadoop配置文件,與偽分布有些許不同。5、etc/hadoop/yarn-site.xml修改如下:6、etc/hadoop/masters(需要自己新建文件)里添加secondarynamenode

主機名,比如任意slave的主機名。7、etc/hadoop/slaves里面添加各個slave的主機名,每行一個主機名。Hadoop環境的搭建多節點全分布搭建第三步:配置hosts文件或做好dns解析。以修改hosts文件為例子,在/etc/hosts里面添加所有主機的IP以及主機名。每個節點都使用相同的hosts文件。第四步:配置ssh自動登錄,確保master主機能夠使用當前用戶免密碼登錄到各個slave主機上。在master上執行ssh-keygen命令,并使用以下命令將master的公鑰添加到全部節點的信任列表上。ssh-copy-id-i~/.ssh/id_rsa.pubmasterssh-copy-id-i~/.ssh/id_rsa.pubslave1ssh-copy-id-i~/.ssh/id_rsa.pubslave2Hadoop環境的搭建多節點全分布搭建第五步:第一次啟動HDFS集群時需要格式化HDFS,在master主機上執行hadoopnamenode-format,這一操作和偽分布相同。啟動HDFS集群,在master主機上的Hadoop所在目錄運行sbin/start-dfs.sh啟動dfs。運行sbin/start-yarn.sh啟動yarn。運行jps可檢查各個節點是否順利啟動,具體顯示如下所示。基于MAPREDUCE程序實例(HDFS)本例基于IntelliJIDEA2019.1.3x64和Hadoop2.10.0組成的環境。在idea中新建maven工程,命名為bigdata。基于MAPREDUCE程序實例(HDFS)本例基于IntelliJIDEA2019.1.3x64和Hadoop2.10.0組成的環境。2.添加maven依賴,添加如下dependency<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>2.10.0</version></dependency><dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>2.10.0</version></dependency><dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-hdfs</artifactId>

<version>2.10.0</version></dependency><dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-mapreduce-client-core</artifactId>

<version>2.10.0</version></dependency><dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-mapreduce-client-jobclient</artifactId>

<version>2.10.0</version></dependency><dependency>

<groupId>log4j</groupId>

<artifactId>log4j</artifactId>

<version>1.2.17</version></dependency>續基于MAPREDUCE程序實例(HDFS)本例基于IntelliJIDEA2019.1.3x64和Hadoop2.10.0組成的環境。3.編寫wordcount程序在/src/main/java下新建Mapreduce包,包內新建WordCount類,在WordCount.java下編寫源代碼。4.將maven工程打包在右側maven工具欄中選擇Lifecycle/package,點擊Runmavenbuild:打包完成后,在項目的target文件夾中找到打包好的bigdata-1.0-SNAPSHOT.jar,將其重命名為WordCount.jar基于MAPREDUCE程序實例(HDFS)基于新API的WordCount分析基于MAPREDUCE程序實例(HDFS)基于新API的WordCount分析1.源代碼程序public

class

WordCount{

public

static

class

TokenizerMapper

extends

Mapper<Object,

Text,

Text,

IntWritable>{

private

final

static

IntWritable

one

=

new

IntWritable(1);

private

Text

word

=

new

Text();

public

void

map(Object

key,

Text

value,

Context

context)

throws

IOException,

InterruptedException{

StringTokenizer

itr

=

new

StringTokenizer(value.toString());

while(itr.hasMoreTokens()){

this.word.set(itr.nextToken());

context.write(this.word,one);

}

}

}

public

static

class

IntSumReducer

extends

Reducer<Text,IntWritable,Text,IntWritable>{

private

IntWritable

result

=

new

IntWritable();

public

void

reduce(Text

key,

Iterable<IntWritable>values,Context

context)

throws

IOException,

InterruptedException{

int

sum

=

0;

for(Iterator

i

=

values.iterator();i.hasNext();sum+=

val.get()){

val=(IntWritable)i.next();

}

this.result.set(sum);

context.write(key,

this.result);

}

}基于MAPREDUCE程序實例(HDFS)基于新API的WordCount分析1.源代碼程序

public

static

void

main(String[]args)throws

IOException,

ClassNotFoundException,

InterruptedException{

Configuration

conf

=

new

Configuration();

String[]otherArgs

=

new

GenericOptionsParser(conf,args).getRemainingArgs();

if(otherArgs.length

!=

2){

System.err.println("Usage:wordcount<in><out>");

System.exit(2);

}

Job

job

=

Job.getInstance(conf,

"wordcount");

job.setJarByClass(WordCount.class);

job.setMapperClass(WordCount.TokenizerMapper.class);

job.setCombinerClass(WordCount.IntSumReducer.class);

job.setReducerClass(WordCount.IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job,

new

Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job,

new

Path(otherArgs[1]));

System.exit(job.waitForCompletion(true)?

0

:

1);

}

}基于MAPREDUCE程序實例(HDFS)基于新API的WordCount分析1.Map過程public

static

class

TokenizerMapperextends

Mapper<Object,

Text,

Text,

IntWritable>{

private

final

static

IntWritable

one

=

new

IntWritable(1);

private

Text

word

=

new

Text();

public

void

map(Object

key,

Text

value,

Context

context)

throws

IOException,

InterruptedException{

StringTokenizer

itr

=

new

StringTokenizer(value.toString());

while(itr.hasMoreTokens()){

this.word.set(itr.nextToken());

context.write(this.word,one);

}

}}基于MAPREDUCE程序實例(HDFS)基于新API的WordCount分析2.Reduce過程public

static

class

IntSumReducerextends

Reducer<Text,IntWritable,Text,IntWritable>{

private

IntWritable

result

=

new

IntWritable();

public

void

reduce(Text

key,

Iterable<IntWritable>values,Context

context)

throws

IOException,

InterruptedException{

int

sum

=

0;

for(Iterator

i

=

values.iterator();i.hasNext();sum+=

val.get()){

val=(IntWritable)i.next();

}

this.result.set(sum);

context.write(key,

this.result);

}}基于MAPREDUCE程序實例(HDFS)基于新API的WordCount分析3.執行MapReduce任務public

static

void

main(String[]args)throwsException{

Configuration

conf

=

new

Configuration();

String[]otherArgs

=

new

GenericOptionsParser(conf,args).getRemainingArgs();

if(otherArgs.length

!=

2){

System.err.println("Usage:wordcount<in><out>");

System.exit(2);

}

Job

job

=

new

Job(conf,

"wordcount");

job.setJarByClass(WordCount.class);

job.setMapperClass(WordCount.TokenizerMapper.class);

job.setCombinerClass(WordCount.IntSumReducer.class);

job.setReducerClass(WordCount.IntSumReduce.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job,

new

Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job,

new

Path(otherArgs[1]));

System.exit(job.waitForCompletion(true)?

0

:

1);}基于MAPREDUCE程序實例(HBase)1.添加Maven依賴新增dependency依賴如下:<dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-shaded-client</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-common</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-client</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-mapreduce</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-server</artifactId>

<version>2.2.4</version></dependency>

<dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-endpoint</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-metrics-api</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-thrift</artifactId>

<version>2.2.4</version></dependency><dependency>

<groupId>org.apache.hbase</groupId>

<artifactId>hbase-rest</artifactId>

<version>2.2.4</version></dependency>注意:HBase的lib目錄下的Hadoop-core文件版本需要與Hadoop的版本對應,不然會出現無法連接的情況。基于MAPREDUCE程序實例(HBase)1.基于Hbase的WordCount實例程序1public

static

class

IntSumReducer

extendsTableReducer

<Text,IntWritable,ImmutableBytesWritable>{

private

IntWritable

result

=

new

IntWritable();

public

void

reduce(Text

key,

Iterable<IntWritable>values,

Context

context)throws

IOException,

InterruptedException{

int

sum

=

0;

for(IntWritable

val

:values){

sum+=

val.get();

}

result.set(sum);

Put

put

=

new

Put(key.getBytes());//put實例化,每一個詞存一行

//列族為content,列修飾符為count,列值為數目

put.addColumn(Bytes.toBytes("content"),Bytes.toBytes("count"),

Bytes.toBytes(String.valueOf(sum)));

context.write(new

ImmutableBytesWritable(key.getBytes()),put);

}}基于MAPREDUCE程序實例(HBase)1.基于Hbase的WordCount實例程序1public

static

void

main(String[]args)throwsException{

TableName

tablename

=

TableName.valueOf("wordcount");

//實例化Configuration,注意不能用newHBaseConfiguration()了。

Configuration

conf

=

HBaseConfiguration.create();

Connection

conn

=

ConnectionFactory.createConnection(conf);

Admin

admin

=

conn.getAdmin();

if(admin.tableExists(tablename)){

System.out.println("tableexists!recreating...");

admin.disableTable(tablename);

admin.deleteTable(tablename);

}

TableDescriptorBuilder

tdb

=

TableDescriptorBuilder.newBuilder(tablename);

HTableDescriptor

htd

=

new

HTableDescriptor(tablename);

HColumnDescriptor

hcd

=

new

HColumnDescriptor("content");

tdb.addFamily(hcd);

//創建列族

admin.createTable(tdb.build());//創建表

String[]otherArgs

=

new

GenericOptionsParser(conf,args).getRemainingArgs();

if(otherArgs.length

!=

1){

System.err.println("Usage:wordcount<in><out>"+otherArgs.length);

System.exit(2);

}基于MAPREDUCE程序實例(HBase)1.基于Hbase的WordCount實例程序1

Job

job

=

Job.getInstance(conf,

"wordcount");

job.setJarByClass(WordCountHBase.class);

job.setMapperClass(TokenizerMapper.class);

//job.setCombinerClass(IntSumReducer.class);

FileInputFormat.addInputPath(job,

new

Path(otherArgs[0]));

//此處的TableMapReduceUtil注意要用Hadoop.HBase.MapReduce包中的,而不是Hadoop.HBase.mapred包中的

TableMapReduceUtil.initTableReducerJob(tablename,

IntSumReducer.class,job);

//key和value到類型設定最好放在initTableReducerJob函數后面,否則會報錯

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true)?

0

:

1);}注意:此處的TableMapReduceUtil是Hadoop.HBase.MapReduce包中的,而不是Hadoop.HBase.mapred包中的,否則會報錯。基于MAPREDUCE程序實例(HBase)2.基于Hbase的WordCount實例程序2public

static

class

TokenizerMapper

extends

TableMapper<Text,

Text>{

public

void

map(ImmutableBytesWritable

row,

Result

values,

Context

context)throws

IOException,

InterruptedException{

StringBuffer

sb

=

new

StringBuffer("");

for(java.util.Map.Entry<byte[],byte[]>value

:

values.getFamilyMap("content".getBytes()).entrySet()){

//將字節數組轉換成String類型,需要newString();

String

str

=

new

String(value.getValue());

if(str!=

null){

sb.append(new

String(value.getKey()));

sb.append(":");

sb.append(str);

}

context.write(new

Text(row.get()),

new

Text(new

String(sb)));

}

}}map函數繼承到TableMapper接口,從result中讀取查詢結果。基于MAPREDUCE程序實例(

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論