年4月大數據na-上課版01spark技術原理_第1頁
年4月大數據na-上課版01spark技術原理_第2頁
年4月大數據na-上課版01spark技術原理_第3頁
年4月大數據na-上課版01spark技術原理_第4頁
年4月大數據na-上課版01spark技術原理_第5頁
免費預覽已結束,剩余50頁可下載查看

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

Spark技術原理Spark概述Spark基本功能技術架構Spark在FusionInsight中的集成情況

是什么?是一個基于內存的分布式批處理引擎。由AMPLAB貢獻到Apache社區的開源項目,是AMP大數據棧的基礎組件。做什么?數據處理(DataProcessing):可以用來快速處理數據,兼具容錯性和可擴展性。迭代計算(Iterative

Computation):支持迭代計算,有效應對多步數據處理邏輯。數據挖掘(DataMining):在海量數據基礎上進行復雜的挖掘分析,可支持各種數據挖掘和機器學習算法。Spark概述Spark的特點快Spark對小數據集可達到亞秒級的延遲Spark巧巧妙借力現有大數據組件輕Spark核心代碼有3萬行靈Spark提供了不同層面的靈活性Spark適用場景適用場景對比Hadoop性能上提升高于100倍。Spark的中間數據放在內存中,對于迭代運算的效率更高,進行批處理時更高效。更低的延遲。Spark提供更多的數據集操作類型,編程模型比Hadoop更靈活,開發效率更高。更高的容錯能力(血統機制)。數據處理,ETL(抽取、轉換、加載)。機器學習。交互式分析。特別適用于迭代計算,數據重復利用場景。需要反復操作的次數越多,所需讀取的數據量越大,受益越大。Spark概述Spark基本功能技術架構SparkCoreSparkSQLSparkStreamingSpark在FusionInsight中的集成情況Spark技術架構SparkFusionInsightSpark架構StandaloneSchedulerStandaloneSchedulerApplication:帶有自己需要的mem和cpu資源量,會在master里排隊,最后被分發到worker上執行。App的啟動是去各個worker遍歷,獲取可用的cpu,然后去各個workerlaunchtask。Worker:每臺slave起一個(也可以起多個),默認或被設置cpu和mem數,并在內存里做加減維護資源剩余量。Worker同時負責拉起本地的executorbackend,即執行進程。Master:接受Worker、App的注冊,為App執行資源分配。Master和Worker本質上都是一個帶Actor的進程。StandaloneScheduler第一步,registerworker是一個啟動集群和搜集初始資源的過程。worker啟動后把自己注冊給master,從而master維護worker上的資源量和worker本身host、port等的信息。第二步,master接收新App的注冊。master遍歷等待的drivers,為每個driver輪詢遍歷aliveworkers,只要worker的剩余mem和cpu滿足該driver,那么就向該worker發送LaunchDriver的消息,里面包含driver的信息。接著,遍歷所有waitingApps,為每個App遍歷可用的worker,為其分配cpu,App的cpus可以分布在不同的worker上。App向資源滿足條件的Worker傳遞LaunchExecutor的消息,并向所屬的driver傳遞ExecutorAdded的消息。第三步,launchexecutor。master為App分配了落在若干worker上的executors,然后對于每一個executor,master都會通知其worker去啟動。第四步,app自己來launchtask。Spark應用運行流程—關鍵角色Client:需求提出方,負責提交需求(應用)。Driver:負責應用的業務邏輯和運行規劃(DAGScheduler/taskScheduler)ApplicationMaster:負責應用的資源管理,根據應用需要,向資源管理部門(ResourceManager)申請資源。ResourceManager:資源管理部門,負責整個集群資源統一調度和分配。Executor:負責實際計算工作,一個應用會分拆給多個Executor來進行計算。Spark應用運行流程(client模式)ClientDriverApplicationMasterResourceManagerExecutor1、啟動ExecutorExecutorExecutor2、申請AM3、啟動AM4、申請Container5、分配Container9、注銷應用…7、注冊8、分配Task6、啟動ExecutorSpark應用運行流程(yarn模式)Spark基本概念Application:

Spark用戶程序,提交一次應用為一個Application,一個App會啟動一個SparkContext,也就是Application的Driver,驅動整個Application的運行。Job:

一個Application可能包含多個Job,每個action算子對應一個Job;action算子有collect,count等。Stage:

每個Job可能包含多層Stage,劃分標記為shuffle過程;Stage按照依賴關系依次執行。Task:

具體執行任務的基本單位,被發到executor上執行。RDD的存儲和分區用戶可以選擇不同的存儲級別存儲RDD以便重用(11種)。當前RDD默認存儲于內存,但當內存不足時,RDD會溢出到磁盤中。RDD在需要進行分區時會根據每條記錄Key進行分區,以此保證兩個數據集能高效進行Join操作。RDD的生成從Hadoop文件系統(或與Hadoop兼容的其它存儲系統)輸入創建。并行化創建。從父RDD轉換得到新的RDD。RDD的優點RDD的只讀性提供更高容錯能力。RDD的不可變性可以實現HadoopMapReduce的推測式執行。RDD的數據分區特性,可以通過數據的本地性來提高性能。RDD都是可序列化的,在內存不足時可自動降級為磁盤存儲。RDD的特點失敗自動重建。可以控制存儲級別(內存,磁盤等)來進行重用。是靜態類型的。RDD(ResilientDistributedDatasets)即彈性分布數據集,指的是一個只讀的,可分區的分布式數據集。這個數據集的全部或部分可以緩存在內存,在多次計算之間重用。RDDSpark核心概念--RDD返回值還是一個RDD,如map、filter、join等。Transformation都是Lazy的,代碼調用到Transformation的時候,并不會馬上執行,需要等到有Action操作的時候才會啟動真正的計算過程。如count,collect,save等,Action操作是返回結果或者將結果寫入存儲的操作。Action是Spark應用真正執行的觸發動作。ActionTransformationRDD算子:Transformation和Action常用的Transformation:map(f:T=>U):RDD[T]=>RDD[U]?lter(f:T=>Bool):RDD[T]=>RDD[T]?atMap(f:T=>Seq[U]):RDD[T]=>RDD[U]groupByKey():RDD[(K,V)]=>RDD[(K,Seq[V])]reduceByKey(f:(V,V)=>V):RDD[(K,V)]=>RDD[(K,V)]union():(RDD[T],RDD[T])=>RDD[T]join():(RDD[(K,V)],RDD[(K,W)])=>RDD[(K,(V,W))]mapValues(f:V=>W):RDD[(K,V)]=>RDD[(K,W)]partitionBy(p:Partitioner[K]):RDD[(K,V)]=>RDD[(K,V)]常用的Action:count():RDD[T]=>Longcollect():RDD[T]=>Seq[T]reduce(f:(T,T)=>T):RDD[T]=>Tlookup(k:K):RDD[(K,V)]=>Seq[V]RDD

Transformation和Action

RDD常用Action算子

Action含義reduce(func)根據函數聚合數據集里的元素。collect()一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組。count()統計數據集中元素個數。first()獲取第一個元素。take(n)獲取數據集最上方的幾個元素,返回一個數組。takeOrdered(n,

[ordering])提供自定義比較器,返回比較后最上方的幾個元素。RDD

Transformation和ActionHDFSHDFSflatMapmapreduceByKeytextFileJoin132456saveAsSequenceFileobjectWordCount{defmain(args:Array[String]):Unit={//配置Spark應用名稱

valconf=newSparkConf().setAppName("WordCount")valsc:SparkContext=newSparkContext(conf)valtextFile=sc.textFile("hdfs://...")valcounts=textFile.flatMap(line=>line.split("")).map(word=>(word,1)).reduceByKey(_+_)counts.saveAsTextFile("hdfs://...")}}樣例程序--WordCountHDFS(A,2)(apple,2)(pair,1)(of,1)(shoes,1)(Orange,1)AappleApairofshoesOrangeappleAappleApairofshoesOrangeapple(A,1)(apple,1)(A,1)(pair,1)(of,1)(shoes,1)(Orange,1)(apple,1)HDFSflatMapmapreduceByKeysaveAsTextFileAnappleApairofshoesOrangeappletextFile(A,2)(apple,2)(pair,1)(of,1)(shoes,1)(Orange,1)RDDRDDRDDRDDHDFSHDFS樣例程序--WordCountSpark核心概念–寬依賴和窄依賴RDD父子依賴關系:窄依賴(Narrow)指父RDD的每一個分區最多被一個子RDD的分區所用。寬依賴(Wide)指子RDD的分區依賴于父RDD的所有分區,是Stage劃分的依據。

寬依賴是一個Shuffle的過程,類似于洗牌。Spark核心概念–窄依賴的優勢窄依賴的pipeline優化。邏輯上,每個RDD的算子都是一個fork/join。如果直接轉換到物理實現,是很不經濟的:一是每一個RDD(即使是中間結果)都需要物化到內存或存儲中,費時費空間;二是join作為全局的barrier,是很昂貴的,會被最慢的那個節點拖死。如果子RDD的分區到父RDD的分區是窄依賴,就可以把兩個fork/join合為一個;如果連續的變換算子序列都是窄依賴,就可以把很多個fork/join并為一個,這將極大地提升性能。窄依賴可以支持在同一個clusternode上以管道形式執行多條命令,例如在執行了map后,緊接著執行filter;窄依賴的失敗恢復更有效,因為它只需要重新計算丟失的父partition即可,而且可以并行地在不同節點進行重計算。Sparkshuffle原理Hashshuffle(1.1.0版本)Sortshuffle(1.1.0版本以后)Stage劃分(job邏輯執行)stage的劃分是Spark作業調度的關鍵一步,它基于DAG確定依賴關系,借此來劃分stage,將依賴鏈斷開,每個stage內部可以并行運行,整個作業按照stage順序依次執行,最終完成整個Job。stage中task數目由stage末端的RDD分區個數來決定,RDD轉換是基于分區的一種粗粒度計算,一個stage執行的結果就是這幾個分區構成的RDD。Spark應用調度Spark應用調度流程生成Job提交Job獲取JobID,并向DAGScheduler發送job提交信息劃分stage并注冊提交stage生成task組成一個TaskSet提交整個TaskSet調度Task。將Task序列化并分發到相應節點上運行Spark容錯——血統機制Lineage

每個RDD都包含了他是如何由其他RDD變換過來的以及如何重建某一塊數據的信息。因此RDD的容錯機制又稱“血統(Lineage)”容錯。當這個RDD的部分分區數據丟失時,它可以通過Lineage獲取足夠的信息來重新運算和恢復丟失的數據分區。

窄依賴可以在某個計算節點上直接通過計算父RDD的某塊數據計算得到子RDD對應的某塊數據;寬依賴則要等到父RDD所有數據都計算完成之后,并且父RDD的計算結果進行hash并傳到對應節點上之后才能計算子RDD。Spark容錯——檢查點checkpointDAG中的Lineage過長會造成容錯成本過高,如果重算,則開銷太大。數據丟失時,對于寬依賴則要將父RDD中的所有數據塊全部重新計算來恢復。所以在在寬依賴上做Checkpoint獲得的收益更大。一旦建立檢查點,以前的血統關系會被刪除。Spark概述Spark基本功能技術功架SparkCoreSparkSQLSparkStreamingSpark在FusionInsight中的集成情況SparkSQL所處位置SparkSQL原理spark-corespark-sqlJSONParquetORC…SQLRDDSparkSQL使用方式JDBC腳本和Spark-beeline都需要JDBCServerSpark-sql是在客戶端直接啟動spark應用SparkSQL原理ParseSQLAnalyzeLogicalPlanOptimizeLogicalPlanGeneratePhysicalPlanPrepareedSparkPlanExecuteSQLGenerateRDDExecuteRDD解析SQL語句分析邏輯計劃優化的邏輯計劃生成物理計劃研制spark計劃執行SQL生成RDD執行RDDSparkSQLDataFrameDataFrame是分布式的Row對象的集合。其中數據被組織為命名的列。它概念上等價于關系數據庫中的表,但底層做了更多的優化。DataFrame可以從很多數據源構建,比如:已經存在的RDD、結構化文件、外部數據庫、Hive表。SparkSQLDateSetSpark1.6新添加的接口,

Dataset是一個強類型的、不可變的對象集合。

DataSet具有兩個完全不同的API特征:強類型API和弱類型API。

DataSet包含了DataFrame的功能,相對應地,DataSet是強類型JVMobject的集合。DataFrame是特殊的DataSet,其每行是一個弱類型JVMobject。Spark2.0中兩者統一,DataFrame表示為DataSet[Row],即DataSet的子集。SparkSQLDateSetSpark概述Spark基本功能技術功架SparkCoreSparkSQLSparkStreamingSpark在FusionInsight中的集成情況SparkStreaming所處位置SparkStreaming原理SparkStreaming接收實時的輸入數據流,然后將這些數據切分為批數據供Spark引擎處理,Spark引擎將數據生成最終的結果數據。使用Dstream(Streaming中的一個對象)從Kafka和HDFS等源獲取連續的數據流,Dstreams由一系列連續的RDD組成,每個RDD包含確定時間間隔的數據,任何對Dstreams的操作都轉換成對RDD的操作。SparkStreaming概述數據來源:Kafka、Flume、HDFS、Kinesis、Twitter…數據目的HDFS、Databases、Dashboards…SparkStreaming特性Sparkstreaming特點高吞吐量、容錯能力強數據采集逐條進行,數據處理分批進行優點:粗粒度處理方式可以快速處理小批量數據可以確保處理且僅處理一次,更方便實現容錯恢復機制Dstream操作基于RDD操作,降低學習成本缺點:粗粒度處理引入不可避免的延遲與SparkStreaming的比較SparkStreaming微批處理流程Streaming流處理流程SparkStreamingStreaming任務執行方式執行邏輯即時啟動,運行完回收執行邏輯預先啟動,持續存在事件處理方式事件需積累到一定批量時才進行處理事件實時處理時延秒級毫秒級吞吐量高(約為Streaming的2~5倍)較高SparkStreaming對比StormSparkStreaming窗口可設置滑動間隔和窗口長度當滑動間隔<窗口長度數據重復計算當滑動間隔>窗口長度部分數據不計算當滑動間隔=窗口長度每份數據參與計算且只計算一次數據源可靠性SparkStreaming數據源基本源:HDFS等文件系統,Socket連接等高級源:Kafka等自定義源:需要實現用戶自定義的receiver可靠性(二次開發)ReliableReceiver:能夠正確應答一個可靠源(如Kafka等),數據已經被接收并且被正確復制到spark中設置CheckPoint確保Driver可以自動重啟使用WriteAheadLog功能Spark概述Spark基本功能技術功架Spark在FusionInsight中的集成情況Spark的WebUI呈現服務狀態信息、角色信息以及開放的配置項。管理操作:啟停spark、下載spark客戶端、同步配置。服務總體概況。角色的顯示和健康狀況,點擊相應角色可查看角色下的實例。FusionInsight平臺為

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論