2024大數據技術培訓 Flink_第1頁
2024大數據技術培訓 Flink_第2頁
2024大數據技術培訓 Flink_第3頁
2024大數據技術培訓 Flink_第4頁
2024大數據技術培訓 Flink_第5頁
已閱讀5頁,還剩32頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

大數據培訓課程之Flink第一章概述流處理技術的演變在開源世界里,ApacheStorm項目是流處理的先鋒。Storm最早由NathanMarz和創業公司BackType的一個團隊開發,后來才被Apache基金會接納。Storm提供了低延遲的流處理,但是它為實時性付出了一些代價:很難實現高吞吐,并且其正確性沒能達到通常所需的水平,換句話說,它并不能保證exactly-once,即便是它能夠保證的正確性級別,其開銷也相當大。在低延遲和高吞吐的流處理系統中維持良好的容錯性是非常困難的,但是為了得到有保障的準確狀態,人們想到了一種替代方法:將連續時間中的流數據分割成一系列微小的批量作業。如果分割得足夠小(即所謂的微批處理作業),計算就幾乎可以實現真正的流處理。因為存在延遲,所以不可能做到完全實時,但是每個簡單的應用程序都可以實現僅有幾秒甚至幾亞秒的延遲。這就是在Spark批處理引擎上運行的SparkStreaming所使用的方法。更重要的是,使用微批處理方法,可以實現exactly-once語義,從而保障狀態的一致性。如果一個微批處理失敗了,它可以重新運行,這比連續的流處理方法更容易。StormTrident是對Storm的延伸,它的底層流處理引擎就是基于微批處理方法來進行計算的,從而實現了exactly-once語義,但是在延遲性方面付出了很大的代價。對于StormTrident以及SparkStreaming等微批處理策略,只能根據批量作業時間的倍數進行分割,無法根據實際情況分割事件數據,并且,對于一些對延遲比較敏感的作業,往往需要開發者在寫業務代碼時花費大量精力來提升性能。這些靈活性和表現力方面的缺陷,使得這些微批處理策略開發速度變慢,運維成本變高。于是,Flink出現了,這一技術框架可以避免上述弊端,并且擁有所需的諸多功能,還能按照連續事件高效地處理數據,Flink的部分特性如下圖所示:圖Flink的部分特性初識FlinkFlink起源于Stratosphere項目,Stratosphere是在2010~2014年由3所地處柏林的大學和歐洲的一些其他的大學共同進行的研究項目,2014年4月Stratosphere的代碼被復制并捐贈給了Apache軟件基金會,參加這個孵化項目的初始成員是Stratosphere系統的核心開發人員,2014年12月,Flink一躍成為Apache軟件基金會的頂級項目。Flink一詞表示快速和靈巧,項目采用一只松鼠的彩色圖案作為logo,這不僅是因為松鼠具有快速和靈巧的特點,還因為柏林的松鼠有一種迷人的紅棕色,Flink的松鼠logo擁有可愛的尾巴,尾巴的顏色與Apache軟件基金會的logo顏色相呼應,也就是說,這是一只Apache風格的松鼠。圖FlinkLogoFlink主頁在其頂部展示了該項目的理念ApacheFlink是為分布式、高性能、隨時可用以及準確的流處理應用程序打造的開源流處理框架”。ApacheFlink是一個框架和分布式處理引擎,用于對無界和有界數據流進行有狀態計算。Flink被設計在所有常見的集群環境中運行,以內存執行速度和任意規模來執行計算。Flink核心計算框架FlinkFlinkRuntimeFlinkRuntimeAnotherResourceNegotiator)在集群上運行,也可以在Mesos(Flink應用。圖Flink計算架構上圖為Flink技術棧的核心組成部分,值得一提的是,Flink分別提供了面向流式處理的接口(DataStreamAPI)和面向批處理的接口(DataSetAPI)Flink既可以完成流處理,也可以完成批處理。Flink支持的拓展庫涉及機器學習(FlinkML)、復雜事件處理(CEP)、以及圖計算(Gelly),還有分別針對流處理和批處理的TableAPI。能被FlinkRuntime執行引擎接受的程序很強大,但是這樣的程序有著冗長的代碼,編寫起來也很費力,基于這個原因,Flink提供了封裝在Runtime執行引擎之上的API,以幫助用戶方便地生成流式計算程序。Flink提供了用于流處理的DataStreamAPI和用于批處理的DataSetAPI。值得注意的是,盡管FlinkRuntime執行引擎是基于流處理的,但是DataSetAPI先于DataStreamAPI被開發出來,這是因為工業界對無限流處理的需求在Flink誕生之初并不大。DataStreamAPI可以流暢地分析無限數據流,并且可以用Java或者Scala來實現。開發人員需要基于一個叫DataStream的數據結構來開發,這個數據結構用于表示永不停止的分布式數據流。Flink的分布式特點體現在它能夠在成百上千臺機器上運行,它將大型的計算任務分成許多小的部分,每個機器執行一部分Flink能夠自動地確保發生機器故障或者其他錯誤時計算能夠持續進行,或者在修復bug或進行版本升級后有計劃地再執行一次。這種能力使得開發人員不需要擔心運行失敗。Flink本質上使用容錯性數據流,這使得開發人員可以分析持續生成且永遠不結束的數據(即流處理)。第二章Flink基本架構2.5JobManager與TaskManagerFlink運行時包含了兩種類型的處理器:JobManager也稱之為Master,用于協調分布式執行,它們用來調度task,協調檢查點,協調失敗時恢復等。Flink運行時至少存在一個master處理器,如果配置高可用模式則會存在多個master處理器,它們其中有一個是leader,而其他的都是standby。TaskManager處理器:也稱之為Worker,用于執行一個dataflow的task(或者特殊的subtask)datastreamFlink運行時至少會存在一個worker處理器。圖JobManager與TaskManagerMaster和Worker處理器可以直接在物理機上啟動,或者通過像YARN這樣的資源調度框架。Worker連接到Master,告知自身的可用性進而獲得任務分配。無界數據流與有界數據流Flink用于處理有界和無界數據:無界數據流無界數據流有一個開始但是沒有結束,它們不會在生成時終止并提供數據,必須連續處理無界流,也就是說必須在獲取后立即處理event。對于無界數據流我們無法等待所有數據都到達,因為輸入是無界的,并且在任何時間點都不會完成。處理無界數據通常要求以特定順序(例如事件發生的順序)獲取event,以便能夠推斷結果完整性,無界流的處理稱為流處理。有界數據流有界數據流有明確定義的開始和結束,可以在執行任何計算之前通過獲取所有數據來處理有界流,處理有界流不需要有序獲取,因為可以始終對有界數據集進行排序,有界流的處理也稱為批處理。圖無界數據流與有界數據流在無界數據流和有界數據流中我們提到了批處理和流處理,這是大數據處理系統中常見的兩種數據處理方式。批處理的特點是有界、持久、大量,批處理非常適合需要訪問全套記錄才能完成的計算工作,一般用于離線統計。流處理的特點是無界、實時,流處理方式無需針對整個數據集執行操作,而是對通過系統傳輸的每個數據項執行操作,一般用于實時統計。在Spark生態體系中,對于批處理和流處理采用了不同的技術框架,批處理由SparkSQL實現,流處理由SparkStreaming實現,這也是大部分框架采用的策略,使用獨立的處理器實現批處理和流處理,而Flink可以同時實現批處理和流處理。Flink是如何同時實現批處理與流處理的呢?答案是,Flink將批處理(即處理有限的靜態數據)視作一種特殊的流處理。ApacheFlink是一個面向分布式數據流處理和批量數據處理的開源計算平臺,它能夠基于同一個Flink運行時(FlinkRuntime),提供支持流處理和批處理兩種類型應用的功能。現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為它們要實現的目標是完全不相同的:流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理,所以在實現的時候通常是分別給出兩套實現方法,或者通過一個獨立的開源框架來實現其中每一種處理方案。例如,實現批處理的開源方案有MapReduce、Tez、Crunch、Spark,實現流處理的開源方案有Samza、Storm。Flink在實現流處理和批處理時,與傳統的一些方案完全不同,它從另一個視角看待流處理和批處理,將二者統一起來:Flink是完全支持流處理,也就是說作為流處理看待時輸入數據流是無界的;批處理被作為一種特殊的流處理,只是它的輸入數據流被定義為有界的。基于同一個Flink運行時(FlinkRuntime),分別提供了流處理和批處理API,而這兩種API也是實現上層面向流處理、批處理類型應用框架的基礎。數據流編程模型Flink提供了不同級別的抽象,以開發流或批處理作業,如下圖所示:圖Flink抽象級別最底層級的抽象僅僅提供了有狀態流,它將通過過程函數(ProcessFunction)被嵌入到DataStreamAPI中。底層過程函數(ProcessFunction)與DataStreamAPI相集成,使其可以對某些特定的操作進行底層的抽象,它允許用戶可以自由地處理來自一個或多個數據流的事件,并使用一致的容錯的狀態。除此之外,用戶可以注冊事件時間并處理時間回調,從而使程序可以處理復雜的計算。實際上,大多數應用并不需要上述的底層抽象,而是針對核心CoreAPIs)進行編程,比如DataStreamAPI(有界或無界流數據)以及DataSetAPI(有界數據集)。這些API為數據處理提供了通用的構建模塊,比如由用戶定義的多種形式(transformations),((aggregations),窗口操作(windows)等等。DataSetAPI為有界數據集提供了額外的支持,例如循環與迭代。這些API處理的數據類型以類(classes)的形式由各自的編程語言所表示。TableAPI以表為中心,其中表可能會動態變化(在表達流數據時TableAPI遵循(擴展的)關系模型:表有二維數據結構(schema)(類似于關系數據庫中的表同時API提供可比較的操作,例如selectproject、joingroup-byaggregate等。TableAPI程序聲明式地定義了什么邏輯操作應該執行,而不是準確地確定這些操作代碼的看上去如何。盡管TableAPI可以通過多種類型的用戶自定義函數(進行擴展,其仍不如核心API更具表達能力,但是使用起來卻更加簡潔(碼量更少)。除此之外,TableAPI程序在執行之前會經過內置優化器進行優化。你可以在表與DataStream/DataSet之間無縫切換,以允許程序將TableAPI與DataStream以及DataSet混合使用。Flink提供的最高層級的抽象是SQL。這一層抽象在語法與表達能力上與TableAPI類似,但是是以SQL查詢表達式的形式表現程序。SQL抽象與TableAPI交互密切,同時SQL查詢可以直接在TableAPI定義的表上執行。第三章Flink集群搭建Flink可以選擇的部署方式有:Local、Standalone(資源利用率低)、Yarn、Mesos、Docker、Kubernetes、AWS。我們主要對Standalone模式和Yarn模式下的Flink集群部署進行分析。Standalone模式安裝我們對standalone模式的Flink集群進行安裝,準備三臺虛擬機,其中一臺作為JobManager(hadoop-senior01),另外兩臺作為TaskManager(hadoop-senior02、hadoop-senior03)。在官網下載1.6.1Flink(/dist/flink/flink-1.6.1/)。將安裝包上傳到要按照JobManager的節點(hadoop-senior01)。Linux系統對安裝包進行解壓:修改安裝目錄下conf文件夾內的flink-conf.yaml配置文件,指定JobManager:修改安裝目錄下conf文件夾內的slave配置文件,指定TaskManager:將配置好的Flink目錄分發給其他的兩臺節點:hadoop-senior01節點啟動集群:jps查看進程信息:HTML5Androidpython(官訪問集群web界面(8081端口):模式安裝在官網下載1.6.1Flink(/dist/flink/flink-1.6.1/)。將安裝包上傳到要按照JobManager的節點(hadoop-senior01)。Linux系統對安裝包進行解壓:修改安裝目錄下conf文件夾內的flink-conf.yaml配置文件,指定JobManager:修改安裝目錄下conf文件夾內的slave配置文件,指定TaskManager:將配置好的Flink目錄分發給其他的兩臺節點:明確虛擬機中已經設置好了環境變量HADOOP_HOME。Hadoop集群(HDFSYarn)。在hadoop-senior01節點提交Yarn-Session,使用安裝目錄下bin目錄中的yarn-session.sh腳本進行提交:/opt/modules/flink-1.6.1/bin/yarn-session.sh-n2-s6-jm1024-tm1024-nmtest-d 其中:-n(--container):TaskManager的數量。-s(--slots):每個TaskManager的slot數量,默認一個slot一個core,默認每個taskmanager的slot的個數為1。-jm:JobManager的內存(單位MB)。-tm:每個taskmanager的內存(單位MB)。-nm:yarn的appName(現在yarn的ui上的名字)。-d:后臺執行。啟動后查看YarnWeb頁面,可以看到剛才提交的會話:在提交Session的節點查看進程:Jar到集群運行:/opt/modules/flink-1.6.1/bin/flinkrun-myarn-clusterexamples/batch/WordCount.jar 提交后在YarnWeb頁面查看任務運行情況:任務運行結束后在控制臺打印如下輸出:第四章 Flink運行架構任務提交流程圖Yarn模式任務提交流程Flink任務提交后,Client向HDFS上傳Flink的Jar包和配置,之后向YarnResourceManager提交任務,ResourceManager分配Container資源并通知對應的NodeManager啟動ApplicationMasterApplicationMaster啟動后加載FlinkJar包和配置構建環境,然后啟動JobManager,之后ApplicationMasterResourceManager申請資源啟動TaskManagerResourceManager分配ContainerApplicationMaster通知資源所在節點的NodeManager啟動TaskManager,NodeManager加載FlinkJar包和配置構建環境并啟動TaskManager,TaskManager啟動后向JobManagerJobManager向其分配任務。TaskManager每一個TaskManager是一個JVM進程,它可能會在獨立的線程上執行一個或多個subtask。為了控制一個worker能接收多少個task,worker通過taskslot來進行控制(一個worker至少有一個taskslot)。·每個taskslot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那么它會將其管理的內存分成三份給各個slot。資源slot化意味著一個subtask將不需要跟來自其他job的subtask競爭被管理的內存,取而代之的是它將擁有一定數量的內存儲備。需要注意的是,這里不會涉及到CPU的隔離,slot目前僅僅用來隔離task的受管理的內存。通過調整taskslot的數量,允許用戶定義subtask之間如何互相隔離。如果一個askManager一個slot,那將意味著每個taskgroup運行在獨立的JVM(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager多個slot意味著更多的subtask可以共享同一個JVM。而在同一個JVM進程中的task將共享TCP連接(于多路復用)和心跳消息。它們也可能共享數據集和數據結構,因此這減少了每個task的負載。圖TaskManager與SlotTaskSlot是靜態的概念,是指TaskManager具有的并發執行能力,可以通過參數taskmanager.numberOfTaskSlots進行配置,而并行度parallelism是動態概念,即TaskManager運行程序時實際使用的并發能力,可以通過參數parallelism.default進行配置。也就是說,假設一共有3個TaskManager,每一個TaskManager中的分配3個TaskSlot,也就是每個TaskManager可以接收3個task,一共9個TaskSlot,如果我們設置parallelism.default=1,即運行程序默認的并行度為1,9個TaskSlot只用了1個,有8個空閑,因此,設置合適的并行度才能提高效率。DataflowFlinkSourceTransformationSinkSourceTransformationSink(streams)。圖Flink程序模型Flink程序的基礎構建模塊是流(streams)與轉換(transformations)(注意的是,FlinkDataSetAPI所使用的DataSets其內部也是stream)stream可以看成一個中間結果,而一個transformations是以一個或多個stream作為輸入的某種operation,該operation利用這些stream進行計算從而產生一個或多個resultstream。在運行時,Flink上運行的程序會被映射成streamingdataflows,它包含了streams和transformationsoperators。每一個dataflow以一個或多個sources開始以一個或多個sinks結束,dataflow類似于任意的有向無環圖(DAG)。圖程序與數據流并行數據流Flink程序的執行具有并行、分布式的特性。在執行過程中,一個stream包含一個或多個streampartition,而每一個operator包含一個或多個operatorsubtask,這些operatorsubtasks在不同的線程、不同的物理機或不同的容器中彼此互不依賴得執行。一個特定operator的subtask的個數被稱之為其parallelism(并行度)。一個stream的并行度總是等同于其producingoperator的并行度。一個程序中,不同的operator可能具有不同的并行度。圖并行數據流Stream在operator之間傳輸數據的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具體是哪一種形式,取決于operator的種類。One-to-one:stream(比如在source和mapoperator之間)維護著分區以及元素的順序。那意味著mapoperator的subtask看到的元素的個數以及順序跟sourceoperatorsubtask生產的元素的個數、順序相同,map、fliter、flatMap等算子都是one-to-one的對應關系。Redistributing:這種操作會改變數據的分區個數operatorsubtask依據所選擇的transformation發送數據到不同的目標subtask。例如,keyBy()基于hashCode重分區、broadcast和rebalance會隨機重新分區,這些算子都會引起redistribute過程,而redistribute過程就類似于Spark中的shuffle過程。與operatorchains出于分布式執行的目的,Flink將operator的subtask鏈接在一起形成task,每個task在一個線程中執行。將operators鏈接成task是非常有效的優化:它能減少線程之間的切換和基于緩存區的數據交換,在減少時延的同時提升吞吐量。鏈接的行為可以在編程API中進行指定。下面這幅圖,展示了5個subtask以5個并行的線程來執行:圖task與operatorchains任務調度流程圖任務調度原理客戶端不是運行時和程序執行的一部分,但它用于準備并發送dataflow給Master,然后,客戶端斷開連接或者維持連接以等待接收計算結果,客戶端可以以兩種方式運行:要么作為Java/Scala程序的一部分被程序觸發執行,要么以命令行./bin/flinkrun的方式執行。第四章 FlinkAPIFlink運行模型圖Flink程序模型以上為Flink的運行模型,Flink的程序主要由三部分構成,分別為Source、Transformation、Sink。DataSource主要負責數據的讀取,Transformation主要負責對屬于的轉換操作,Sink負責最終數據的輸出。Flink程序架構每個Flink程序都包含以下的若干流程:獲得一個執行環境;(ExecutionEnvironment)加載/創建初始數據;(Source)指定轉換這些數據;(Transformation)指定放置計算結果的位置;(Sink)觸發程序執行。EnvironmentStreamExecutionEnvironment.getExecutionEnvironmentStreamExecutionEnvironment.createLocalEnvironmentStreamExecutionEnvironment.createRemoteEnvironment執行環境StreamExecutionEnvironment是所有Flink程序的基礎StreamExecutionEnvironment.getExecutionEnvironmentStreamExecutionEnvironment.createLocalEnvironmentStreamExecutionEnvironment.createRemoteEnvironmentStreamExecutionEnvironment.getExecutionEnvironment創建一個執行環境,表示當前執行程序的上下文。如果程序是獨立調用的,則此方法返回本地執行環境;如果從命令行客戶端調用程序以提交到集群,則此方法返回此集群的執行環境,也就是說,getExecutionEnvironment會根據查詢運行的方式決定返回什么樣的運行環境,是最常用的一種創建執行環境的方式。valenv=StreamExecutionEnvironment.getExecutionEnvironment StreamExecutionEnvironment.createLocalEnvironment返回本地執行環境,需要在調用時指定默認的并行度。valenv=StreamExecutionEnvironment.createLocalEnvironment(1) StreamExecutionEnvironment.createRemoteEnvironment返回集群執行環境,將Jar提交到遠程服務器。需要在調用時指定JobManager的IP和端口號,并指定要在集群中運行的Jar包。valenv=StreamExecutionEnvironment.createRemoteEnvironment(1) SourceFile的數據源一列一列的讀取遵循TextInputFormat規范的文本文件,并將結果作為String返回。valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("/opt/modules/test.txt")stream.print()valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("/opt/modules/test.txt")stream.print()env.execute("FirstJob")注意:stream.print():每一行前面的數字代表這一行是哪一個并行線程輸出的。readFile(fileInputFormat,path)valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalpath=newPath("/opt/modules/test.txt")valstream=env.readFile(newTextInputFormat(path),"/opt/modules/test.txt")stream.print()valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalpath=newPath("/opt/modules/test.txt")valstream=env.readFile(newTextInputFormat(path),"/opt/modules/test.txt")stream.print()env.execute("FirstJob")Socket的數據源1.socketTextStreamvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.socketTextStream("localhost",11111)stream.print()env.execute("FirstJob")從Socketvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.socketTextStream("localhost",11111)stream.print()env.execute("FirstJob")基于集合(Collection)的數據源fromCollection(seq)valenv=StreamExecutionEnvironment.getExecutionEnvironmentvallist=List(1,2,3,4)valstream=env.fromCollection(list)valenv=StreamExecutionEnvironment.getExecutionEnvironmentvallist=List(1,2,3,4)valstream=env.fromCollection(list)stream.print()env.execute("FirstJob")fromCollection(Iterator)valenv=StreamExecutionEnvironment.getExecutionEnvironmentvaliterator=Iterator(1,2,3,4)valstream=env.fromCollection(iterator)stream.print()env.execute("FirstJob")從迭代(Iterator)中創建一個數據流,指定元素數據類型的類由valenv=StreamExecutionEnvironment.getExecutionEnvironmentvaliterator=Iterator(1,2,3,4)valstream=env.fromCollection(iterator)stream.print()env.execute("FirstJob")fromElements(elements:_*)valenv=StreamExecutionEnvironment.getExecutionEnvironmentvallist=List(1,2,3,4)valstream=env.fromElement(list)stream.print()valenv=StreamExecutionEnvironment.getExecutionEnvironmentvallist=List(1,2,3,4)valstream=env.fromElement(list)stream.print()env.execute("FirstJob")generateSequence(from,to)valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.generateSequence(1,10)stream.print()valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.generateSequence(1,10)stream.print()env.execute("FirstJob")SinkDataSink消費DataStream中的數據,并將它們轉發到文件、套接字、外部系統或者打印出。Flink有許多封裝在DataStream操作里的內置輸出格式。將元素以字符串形式逐行寫入(TextOutputFormat),這些字符串通過調用每個元素的toString()方法來獲取。WriteAsCsv將元組以逗號分隔寫入文件中(CsvOutputFormat),行及字段之間的分隔是可配置的。每個字段的值來自對象的toString()方法。打印每個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中。或者也可以在輸出流中添加一個前綴,這個可以幫助區分不同的打印調用,如果并行度大于1,那么輸出也會有一個標識由哪個任務產生的標志。writeUsingOutputFormat自定義文件輸出的方法和基類(FileOutputFormat),支持自定義對象到字節的轉換。根據SerializationSchema將元素寫入到socket中。TransformationMapvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.generateSequence(1,10)valstreamMap=stream.map{x=>x*2}streamFilter.print()env.execute("FirstJob")DataStream→valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.generateSequence(1,10)valstreamMap=stream.map{x=>x*2}streamFilter.print()env.execute("FirstJob")FlatMapvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt")valstreamFlatMap=stream.flatMap{x=>x.split("")}DataStream→DataStream:輸入一個參數,產生0valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt")valstreamFlatMap=stream.flatMap{x=>x.split("")}streamFilter.print()streamFilter.print()env.execute("FirstJob")Filtervalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.generateSequence(1,10)valstreamFilter=stream.filter{x=>x==1}streamFilter.print()env.execute("FirstJob")DataStream→DataStreamvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.generateSequence(1,10)valstreamFilter=stream.filter{x=>x==1}streamFilter.print()env.execute("FirstJob")Connect圖Connect算子DataStream,DataStream→ConnectedStreams:連接兩個保持他們類型的數據流,兩個數據流被Connect之后,只是被放在了一個同一個流中,內部依然保持各自的valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt")val streamMap = stream.flatMap(item => item.split(" ")).filter(item valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt")val streamMap = stream.flatMap(item => item.split(" ")).filter(item item.equals("hadoop"))valstreamCollect=env.fromCollection(List(1,2,3,4))valstreamConnect=streamMap.connect(streamCollect)streamConnect.map(item=>println(item),item=>println(item))env.execute("FirstJob")CoMap,CoFlatMap圖CoMap/CoFlatMapConnectedStreams→DataStream:作用于ConnectedStreams上,功能與map和flatMap一樣,對ConnectedStreams中的每一個Stream分別進行map和flatMap處理。valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream1=env.readTextFile("test.txt")valstreamFlatMap=stream1.flatMap(x=>x.split(""))valstream2=env.fromCollection(List(1,2,3,4))valstreamConnect=streamFlatMap.connect(stream2)valstreamCoMap=streamConnect.map((str)=>str+"connect",(in)=>in+100)env.execute("FirstJob")valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream1=env.readTextFile("test.txt")valstream2=env.readTextFile("test1.txt")valstreamConnect=stream1.connect(stream2)valstreamCoMap=streamConnect.flatMap((str1)=>str1.split(""),(str2)=>str2.split(""))streamConnect.map(item=>println(item),item=>println(item))env.execute("FirstJob")Split圖Splitvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt")valstreamFlatMap=stream.flatMap(x=>x.split(""))valstreamSplit=streamFlatMap.split(num=>字符串內容為hadoopvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt")valstreamFlatMap=stream.flatMap(x=>x.split(""))valstreamSplit=streamFlatMap.split(num=>字符串內容為hadoopDataStreamDataStream(num.equals("hadoop"))match{casetrue=>List("hadoop")casefalse=>List("other")})env.execute("FirstJob")Select圖Selectvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt")valstreamFlatMap=stream.flatMap(x=>x.split(""))valstreamSplit=streamFlatMap.split(SplitStream→DataStream從一個SplitStream中獲取一個或者多個DataStreavalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt")valstreamFlatMap=stream.flatMap(x=>x.split(""))valstreamSplit=streamFlatMap.split(num=>num=>(num.equals("hadoop"))match{casetrue=>List("hadoop")casefalse=>List("other")})valhadoop=streamSplit.select("hadoop")valother=streamSplit.select("other")hadoop.print()env.execute("FirstJob")Union圖UnionDataStream→DataStream:對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新DataStream。注意:如果你將一個DataStream跟它自己做union操作,在新的DataStream中,你將看到每一個元素都出現兩次。valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream1=env.readTextFile("test.txt")valstreamFlatMap1=stream1.flatMap(x=>x.split(""))valstream2=env.readTextFile("test1.txt")valstreamFlatMap2=stream2.flatMap(x=>x.split(""))valstreamConnect=streamFlatMap1.union(streamFlatMap2)env.execute("FirstJob")KeyByvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt")valstreamFlatMap=stream.flatMap{x=>x.split("")}valstreamMap=streamFlatMap.map{x=>(x,1)DataStream→KeyedStream:輸入必須是Tuple類型,邏輯地將一個流拆分成不相交的分區,每個分區包含具有相同keyvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt")valstreamFlatMap=stream.flatMap{x=>x.split("")}valstreamMap=streamFlatMap.map{x=>(x,1)}}valstreamKeyBy=streamMap.keyBy(0)env.execute("FirstJob")ReduceKeyedStream→DataStream:一個分組數據流的聚合操作,合并當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是只返回最后一次聚合的最終結果。valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt").flatMap(item=>item.split("")).map(item=>(item,1)).keyBy(0)valstreamReduce=stream.reduce((item1,item2)=>(item1._1,item1._2+item2._2))streamReduce.print()env.execute("FirstJob")FoldKeyedStream→DataStream:一個有初始值的分組數據流的滾動折疊操作,合并當前元素和前一次折疊操作的結果,并產生一個新的值,返回的流中包含每一次折疊的結果,而不是只返回最后一次折疊的最終結果。valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test.txt").flatMap(item=>item.split("")).map(item=>(item,1)).keyBy(0)valstreamReduce=stream.fold(100)((begin,item)=>(begin+item._2))streamReduce.print()env.execute("FirstJob")AggregationsKeyedStream→DataStream:分組數據流上的滾動聚合操作。min和minBy的區別是min返回的是一個最小值,而minBy返回的是其字段中包含最小值的元素(同樣原理適用于max和maxBy),返回的流中包含每一次聚合的結果,而不是只返回最后一次聚合的最終結果。keyedStream.sum(0)keyedStream.sum("key")keyedStream.min(0)keyedStream.sum(0)keyedStream.sum("key")keyedStream.min(0)keyedStream.min("key")keyedStream.max(0)keyedStream.max("key")keyedStream.minBy(0)keyedStream.minBy("key")keyedStream.maxBy(0)keyedStream.max(0)keyedStream.max("key")keyedStream.minBy(0)keyedStream.minBy("key")keyedStream.maxBy(0)keyedStream.maxBy("key")valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalenv=StreamExecutionEnvironment.getExecutionEnvironmentvalstream=env.readTextFile("test02.txt").map(item=>(item.split("")(0),item.split("")(1).toLong)).keyBy(0)valstreamReduce=stream.sum(1)streamReduce.print()env.execute("FirstJob")在2.3.10之前的算子都是可以直接作用在Stream上的,因為他們不是聚合類型的操作,但是到2.3.10后你會發現,我們雖然可以對一個無邊界的流數據直接應用聚合算子,但是它會記錄下每一次的聚合結果,這往往不是我們想要的,其實,reduce、fold、aggregation這些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的結果。第六章 Time與WindowTime在Flink的流式處理中,會涉及到時間的不同概念,如下圖所示:圖Flink時間概念EventTime:是事件創建的時間。它通常由事件中的時間戳描述,例如采集的日志數據中,每一條日志都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。IngestionTime:是數據進入Flink的時間。ProcessingTime:是每一個執行基于時間操作的算子的本地系統時間,與機器相關,默認的時間屬性就是ProcessingTime。例如,一條日志進入Flink的時間為2017-11-1210:00:00.123,到達Window的系統時間為2017-11-1210:00:01.234,日志的內容如下:2017-11-0218:37:15.624INFOFailovertorm2 對于業務來說,要統計1min內的故障日志個數,哪個時間是最有意義的?——eventTime,因為我們要根據日志的生成時間進行統計。WindowWindow概述streaming流式計算是一種被設計用于處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增長的本質上無限的數據集,而window是一種切割無限數據為有限塊進行處理的手段。Window是無限數據流處理的核心,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。Window類型Window可以分成兩類:CountWindow:按照指定的數據條數生成一個Window,與時間無關。TimeWindow:按照時間生成Window。對于TimeWindow(TumblingWindow)、滑動窗口(SlidingWindow)和會話窗口(SessionWindow)。(TumblingWindows)將數據依據固定的窗口長度對數據進行切片。特點:時間對齊,窗口長度固定,沒有重疊。滾動窗口分配器將每個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,并且不會出現重疊。例如:如果你指定了一個5分鐘大小的滾動窗口,窗口的創建如下圖所示:圖滾動窗口適用場景:適合做BI統計等(做每個時間段的聚合計算)。(SlidingWindows)滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。特點:時間對齊,窗口長度固定,有重疊。滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數來配置,另一個窗口滑動參數控制滑動窗口開始的頻率。因此,滑動窗口如果滑動參數小于窗口大小的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。例如,你有10分鐘的窗口和5分鐘的滑動,那么每個窗口中5分鐘的窗口里包含著上個10分鐘產生的數據,如下圖所示:圖滑動窗口適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)。(SessionWindows)由一系列事件組合一個指定時間長度的timeout間隙組成,類似于web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。特點:時間無對齊。session窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的時間周期內不再收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口通過一個session間隔來配置,這個session間隔定義了非活躍周期的長度,當這個非活躍周期產生,那么當前的session將關閉并且后續的元素將被分配到新的session窗口中去。圖會話窗口WindowCountWindowCountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的所有元素的總數。滾動窗口默認的CountWindow是一個滾動窗口,只需要指定窗口大小即可,當元素數量達到窗口大小時,就會觸發窗口的執行。////獲取執行環境valenv=StreamExecutionEnvironment.getExecutionEnvironment//創建SocketSourcevalstream=env.socketTextStream("localhost",11111)valstream=env.socketTextStream("localhost",11111)//對streamkey聚合val streamKeyBy = stream.map(item => (item.split(" ")(0), ")(1).toLong)).keyBy(0)//引入滾動窗口//這里的55個相同key的元素計算一次valstreamWindow=streamKeyBy.countWindow(5)//執行聚合操作valstreamReduce=streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2))//將聚合數據寫入文件streamReduce.print()//執行程序env.execute("TumblingWindow")滑動窗口滑動窗口和滾動窗口的函數名是完全一致的,只是在傳參數時需要傳入兩個參數,一個是window_size,一個是sliding_size。下面代碼中的sliding_size設置為了2,也就是說,每收到兩個相同key的數據就計算一次,每一次計算的window范圍是5個元素。////獲取執行環境valenv=StreamExecutionEnvironment.getExecutionEnvironment//創建SocketSourcevalstream=env.socketTextStream("localhost",11111)//對streamkey聚合val streamKeyBy = stream.map(item => (item.split(" ")(0), ")(1).toLong)).keyBy(0)//引入滾動窗口//key25valstreamWindow=streamKeyBy.countWindow(5,2)//執行聚合操作valstreamReduce=streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2))//將聚合數據寫入文件streamReduce.print()//執行程序env.execute("TumblingWindow")}TimeWindowTimeWindow是將指定時間范圍內的所有數據組成一個window,一次對一個window里面的所有數據進行計算。滾動窗口Flink默認的時間窗口根據ProcessingTime進行窗口的劃分,將Flink獲取到的數據根據進入Flink的時間劃分到不同的窗口中。////獲取執行環境valenv=StreamExecutionEnvironment.getExecutionEnvironment//創建SocketSourcevalstream=env.socketTextStream("localhost",11111)//對streamkey聚合valstreamKeyBy=stream.map(item=>(item,1)).keyBy(0)//引入時間窗口valstreamWindow=streamKeyBy.timeWindow(Time.seconds(5))//執行聚合操作valstreamReduce=streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2))//將聚合數據寫入文件streamReduce.print()//執行程序env.execute("TumblingWindow")時間間隔可以通過Tliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。(SlidingEventTimeWindows)滑動窗口和滾動窗口的函數名是完全一致的,只是在傳參數時需要傳入兩個參數,一個是window_size,一個是sliding_size。下面代碼中的sliding_size設置為了2s,也就是說,窗口每2s就計算一次,每一次計算的window范圍是5s內的所有元素。////獲取執行環境valenv=StreamExecutionEnvironment.getExecutionEnvironment//創建SocketSourcevalstream=env.socketTextStream("localhost",11111)//對streamkey聚合valstreamKeyBy=stream.map(item=>(item,1)).keyBy(0)//引入滾動窗口valstreamWindow=streamKeyBy.timeWindow(Time.seconds(5),Time.seconds(2))//執行聚合操作valstreamReduce=streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2))//將聚合數據寫入文件streamReduce.print()//執行程序env.execute("TumblingWindow")時間間隔可以通過Tliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。WindowReduce//獲取執行環境valenv=StreamExecutionEnvironment.getExecutionEnvironment//創建SocketSourcevalstream=env.socketTextStream("localhost",11111)////獲取執行環境valenv=StreamExecutionEnvironment.getExecutionEnvironment//創建SocketSourcevalstream=env.socketTextStream("localhost",11111)//對streamkey聚合valstreamKeyBy=stream.map(item=>(item,1)).keyBy(0)//引入時間窗口valstreamWindow=streamKeyBy.timeWindow(Time.seconds(5))//執行聚合操作valstreamReduce=streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2))//將聚合數據寫入文件streamReduce.print()//執行程序env.execute("TumblingWindow")WindowFoldWindowedStream→DataStream:給窗口賦一個fold功能的函數,并返回一個fold后的結果。////獲取執行環境valenv=StreamExecutionEnvironment.getExecutionEnvironment//創建SocketSourcevalstream=env.socketTextStream("localhost",11111,'\n',3)//對streamkey聚合valstreamKeyBy=stream.map(item=>(item,1)).keyBy(0)//引入滾動窗口valstreamWindow=streamKeyBy.timeWindow(Time.seconds(5))//執行fold操作valstreamFold=streamWindow.fold(100){(begin,item)=>begin+item._2}//將聚合數據寫入文件streamFold.print()////執行程序env.execute("TumblingWindow")AggregationonWindowWindowedStream→DataStream:對一個window內的所有元素做聚合操作。min和minBy的區別是min返回的是最小值,而minBy返回的是包含最小值字段的元素(同樣的原理適用于max和maxBy)。////獲取執行環境valenv=StreamExecutionEnvironment.getExecutionEnvironment//創建SocketSourcevalstream=env.socketTextStream("localhost",11111)//對streamkey聚合valstreamKeyBy=stream.map(item=>(item.split("")(0),item.split("")(1))).keyBy(0)//引入滾動窗口valstreamWindow=streamKeyBy.timeWindow(Time.seconds(5))//執行聚合操作valstreamMax=streamWindow.max(1)//將聚合數據寫入文件streamMax.print()//執行程序env.execute("TumblingWindow")第七章 EventTime與WindowEventTime的引入在Flink的流式處理中,

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論