




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
1、spark任務提交源碼入手小試牛刀spark由于實際工作當中,都是將spark的任務提交到y(tǒng)arn集群上面去,所以我們安裝spark的環(huán)境只需要安裝一個任務提交客戶端即可第一步:下載安裝包并解壓node01下載spark3.0的安裝包 cd/kkb/softwget/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgztar-zxfspark-3.0.0-bin-hadoop3.2.tgz-C/kkb/install/第二步:修改配置文件node01執(zhí)行以下命令修改spark-env.sh配置文件 cd/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/cpspark-env.sh.templatespark-env.shvimspark-env.shexportJAVA_HOME=/kkb/install/jdk1.8.0_141exportHADOOP_HOME=/kkb/install/hadoop-3.1.4exportHADOOP_CONF_DIR=/kkb/install/hadoop-3.1.4/etc/hadoopexportSPARK_CONF_DIR=/kkb/install/spark-3.0.0-bin-hadoop3.2/confexportYARN_CONF_DIR=/kkb/install/hadoop-3.1.4/etc/hadoopnode01執(zhí)行以下命令修改slaves配置文件cd/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/cpslaves.templateslavesvimslaves#編輯文件內容添為以下node01node02node03node01執(zhí)行以下命令修改spark-defaults.conf配置選項cd/kkb/install/spark-3.0.0-bin-hadoop3.2/confcpspark-defaults.conf.templatespark-defaults.confvimspark-defaults.confspark.eventLog.enabledspark.eventLog.dirspark.eventLpresstruehdfs://node01:8020/spark_logtrue第三步:安裝包的分發(fā)將node01機器的spark安裝包分發(fā)到其他機器上面去node01執(zhí)行以下命令進行分發(fā) cd/kkb/install/scp-rspark-3.0.0-bin-hadoop3.2/node02:$PWDscp-rspark-3.0.0-bin-hadoop3.2/node03:$PWDnode01執(zhí)行以下命令啟動spark集群 hdfsdfs-mkdir-p/spark_logcd/kkb/install/spark-3.0.0-bin-hadoop3.2sbin/start-all.shsbin/start-history-server.sh第五步:訪問瀏覽器管理界面直接瀏覽器訪問 http://node01:8080查看spark集群管理webUI界面。注意,如果8080端口沒法訪問,順延8081端口進行訪問,如果8081端口也沒法訪問,繼續(xù)往后順延端口號http://node01:18080/訪問查看spark的historyserver地址2、spark運行計算圓周率之任務提交過程spark集群安裝運行成功之后,我們就可以運行計算spark的任務了,例如我們可以提交一個spark的自帶案例來計算圓周率,其中spark的任務提交又有多種方式,例如local模式,standAlone模式或者yarn模式等等,其中我們實際工作當中用的最多的就是yarn模式,以下是幾種提交運行模式的介紹sparklocal local模式不用啟動任何的spark的進程,只需要解壓一個spark的安裝包就可以直接使用了,基于local模式的client提交運行方式,提交命令如下binbin/spark-submit--classorg.apache.spark.examples.SparkPi--masterlocal--deploy-modeclient--executor-memory2G--total-executor-cores4examples/jars/spark-examples_2.12-3.0.0.jar10基于onyarn的cluster模式任務提交基于local模式的cluster提交運行方式,提交命令如下binbin/spark-submit--classorg.apache.spark.examples.SparkPi--masterlocal--deploy-modecluster--executor-memory2G--total-executor-cores4examples/jars/spark-examples_2.12-3.0.0.jar10我們會看到,基于local模式的cluster提交方式直接報錯2.2、spark任務提交的standAlone模式基于standAlone的任務提交,需要我們安裝搭建spark集群,并啟動master以及worker進程提交命令如下bin/spark-submit--classorg.apache.spark.examples.SparkPi\--masterspark://node01:7077\--deploy-modeclient\--executor-memory2G\--total-executor-cores4\examples/jars/spark-examples_2.12-3.0.0.jar10基于cluster任務的提交命令bin/spark-submit--classorg.apache.spark.examples.SparkPi\--masterspark://node01:7077\--deploy-modecluster\--executor-memory2G\--total-executor-cores4\examples/jars/spark-examples_2.12-3.0.0.jar103、spark任務提交的yarn模式并且將任務提交到y(tǒng)arn集群上面去node01執(zhí)行以下命令提交任務到y(tǒng)arn集群上面去運行 bin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masteryarn\--deploy-modeclient\examples/jars/spark-examples_2.12-3.0.0.jar50\其中sparkonyarncluster模式代碼提交運行架構如下sparkonyarnclient模式運行.drawiobin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masteryarn\--deploy-modecluster\examples/jars/spark-examples_2.12-3.0.0.jar50\其中sparkonyarnclient模式任務提交過程如下spark-submitonyarncluster任務提交.drawio3、spark任務提交腳本分析mit 提交任務的過程是通過spark-submit這個腳本來進行提交的,那我們就一起來看一下spark-submit這個腳本的內容#!/usr/bin/envbash##LicensedtotheApacheSoftwareFoundation(ASF)underoneormore#contributorlicenseagreements.SeetheNOTICEfiledistributedwith#thisworkforadditionalinformationregardingcopyrightownership.#TheASFlicensesthisfiletoYouundertheApacheLicense,Version2.0#(the"License");youmaynotusethisfileexceptincompliancewith#theLicense.YoumayobtainacopyoftheLicenseat##/licenses/LICENSE-2.0##Unlessrequiredbyapplicablelaworagreedtoinwriting,software#distributedundertheLicenseisdistributedonan"ASIS"BASIS,#WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied.#SeetheLicenseforthespecificlanguagegoverningpermissionsand#limitationsundertheLicense.#if[-z"${SPARK_HOME}"];thensource"$(dirname"$0")"/find-spark-home#disablerandomizedhashforstringinPython3.3+exportPYTHONHASHSEED=0exec"${SPARK_HOME}"/bin/spark-classorg.apache.spark.deploy.SparkSubmit"$@"查看發(fā)現(xiàn)spark-submit這個腳本里面執(zhí)行了spark-class這個腳本,然后帶了一個org.apache.spark.deploy.SparkSubmit這個參數(shù),其中使用$@來將我們傳入給spark-submit這個腳本的所有參數(shù)都傳遞過來了既然知道了執(zhí)行了spark-class腳本,后面帶上了org.apache.spark.deploy.SparkSubmit這個class類,那么我們就來看一下spark-class這腳本內容 #!/usr/bin/envbashif[-z"${SPARK_HOME}"];thensource"$(dirname"$0")"/find-spark-home."${SPARK_HOME}"/bin/load-spark-env.sh#Findthejavabinaryif[-n"${JAVA_HOME}"];thenRUNNER="${JAVA_HOME}/bin/java"elseif["$(command-vjava)"];thenRUNNER="java"elseecho"JAVA_HOMEisnotset">&2exit1#FindSparkjars.if[-d"${SPARK_HOME}/jars"];thenSPARK_JARS_DIR="${SPARK_HOME}/jars"elseSPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"if[!-d"$SPARK_JARS_DIR"]&&[-z"$SPARK_TESTING$SPARK_SQL_TESTING"];thenecho"FailedtofindSparkjarsdirectory($SPARK_JARS_DIR)."1>&2echo"YouneedtobuildSparkwiththetarget\"package\"beforerunningthisprogram."1>&2exit1elseLAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"#Addthelauncherbuilddirtotheclasspathifrequested.if[-n"$SPARK_PREPEND_CLASSES"];thenLAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"#Fortestsif[[-n"$SPARK_TESTING"]];thenunsetYARN_CONF_DIRunsetHADOOP_CONF_DIRbuild_command(){"$RUNNER"-Xmx128m$SPARK_LAUNCHER_OPTS-cp"$LAUNCH_CLASSPATH"org.apache.spark.launcher.Main"$@"printf"%d\0"$?}#Turnoffposixmodesinceitdoesnotallowprocesssubstitutionset+oposixCMD=()DELIM=$'\n'CMD_START_FLAG="false"whileIFS=read-d"$DELIM"-rARG;doif["$CMD_START_FLAG"=="true"];thenCMD+=("$ARG")elseif["$ARG"==$'\0'];then#AfterNULLcharacterisconsumed,changethedelimiterandconsumecommandstring.DELIM=''CMD_START_FLAG="true"elif["$ARG"!=""];thenecho"$ARG"done<<(build_command"$@")COUNT=${#CMD[@]}LAST=$((COUNT-1))LAUNCHER_EXIT_CODE=${CMD[$LAST]}if![[$LAUNCHER_EXIT_CODE=~echo"${CMD[@]}"|head-n-1exit1^[0-9]+$]];then2if[$LAUNCHER_EXIT_CODE!=0];thenexit$LAUNCHER_EXIT_CODECMD=("${CMD[@]:0:$LAST}")exec"${CMD[@]}"最后執(zhí)行了$CMD這個命令,并且?guī)狭怂械膮?shù),那么CMD究竟是個什么東西,我們可以修改腳本給它打印出來看一下修改spark-class腳本,然后添加一行打印cd/kkb/install/spark-3.0.0-bin-hadoop3.2/vimbin/spark-class#在后面位置添加一行打印命令出來CMD=("${CMD[@]:0:$LAST}")echo"${CMD[@]}"exec"${CMD[@]}"重新提交任務cd/kkb/install/spark-3.0.0-bin-hadoop3.2/bin/spark-submit--classorg.apache.spark.examples.SparkPi--masteryarn--deploy-modeclientexamples/jars/spark-examples_2.12-3.0.0.jar50下:[hadoop@node01spark-3.0.0-bin-hadoop3.2]$bin/spark-submit--classorg.apache.spark.examples.SparkPi--masteryarn--deploy-modeclientexamples/jars/spark-examples_2.12-3.0.0.jar50/kkb/install/jdk1.8.0_141/bin/java-cp/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/:/kkb/install/spark-3.0.0-bin-hadoop3.2/jars/*:/kkb/install/hadoop-3.1.4/etc/hadoop/-Xmx1gorg.apache.spark.deploy.SparkSubmit--masteryarn--deploy-modeclient--classorg.apache.spark.examples.SparkPiexamples/jars/spark-examples_2.12-3.0.0.jar50觀察打印信息,主要就是執(zhí)行了一個命令java-cp/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/:/kkb/install/spark-3.0.0-bin-hadoop3.2/jars/*:/kkb/install/hadoop-3.1.4/etc/hadoop/-Xmx1gorg.apache.spark.deploy.SparkSubmit--masteryarn--deploy-modeclient--classorg.apache.spark.examples.SparkPiexamples/jars/spark-examples_2.12-3.0.0.jar50就是執(zhí)行了一個java的命令,通過org.apache.spark.deploy.SparkSubmit來進行了任務提交,其實就是啟動了一個jvm的虛擬機進程來執(zhí)行了任務的提交,就是執(zhí)行了SparkSubmit的main方法,我們可以去查看源碼,找到SparkSubmit的main方法,驗證啟動的過程步驟4、SparkSubmit源碼分析前面我們通過腳本查看到了我們提交任務都是通過SparkSubmit這個類,那么我們就可以通過源碼當中的SparkSubmit來查看這個類的main方法,main方法作為入口啟動了一個java的進程,通過IDEA的快捷鍵Ctrl+shift+alt+N來搜索SparkSubmit這類,然后知道Object當中的main方法、查看main方法 /**/***在這里作為程序的入口類overridedefmain(args:Array[String]):Unit={valsubmit=newSparkSubmit(){self=>/***解析傳入的參數(shù)*@paramargs*@returnoverrideprotecteddefparseArguments(args:Array[String]):SparkSubmitArguments={newSparkSubmitArguments(args){overrideprotecteddeflogInfo(msg:=>String):Unit=self.logInfo(msg)overrideprotecteddeflogWarning(msg:=>String):Unit=self.logWarning(msg)overrideprotecteddeflogError(msg:=>String):Unit=self.logError(msg)}}overrideprotecteddeflogInfo(msg:=>String):Unit=printMessage(msg)overrideprotecteddeflogWarning(msg:=>String):Unit=printMessage(s"Warning:$msg")overrideprotecteddeflogError(msg:=>String):Unit=printMessage(s"Error:$msg")overridedefdoSubmit(args:Array[String]):Unit={try{super.doSubmit(args)}catch{casee:SparkUserAppException=>exitFn(e.exitCode)}}}/***最終的任務提交submit.doSubmit(args)}進入到super.doSubmit方法Submit其實就是執(zhí)行了SparkSubmit內部當中的doSubmit方法 overridedefdoSubmit(args:Array[String]):Unit={try{super.doSubmit(args)}catch{casee:SparkUserAppException=>exitFn(e.exitCode)}}3、查看父類當中的doSubmit方法 查看doSubmit方法當中的parseArguments方法defdoSubmit(args:Array[String]):Unit={//Initializeloggingifithasn'tbeendoneyet.Keeptrackofwhetherloggingneedsto//beresetbeforetheapplicationstarts./***初始化日志記錄valuninitLog=initializeLogIfNecessary(true,silent=true)/***解析參數(shù)所有的參數(shù)都解析出來封裝到一個對象里面去了叫做SparkSubmitArgumentsvalappArgs=parseArguments(args)if(appArgs.verbose){logInfo(appArgs.toString)}/***使用模式匹配來執(zhí)行任務提交的各種操作appArgs.actionmatch{caseSparkSubmitAction.SUBMIT=>submit(appArgs,uninitLog)caseSparkSubmitAction.KILL=>kill(appArgs)caseSparkSubmitAction.REQUEST_STATUS=>requestStatus(appArgs)caseSparkSubmitAction.PRINT_VERSION=>printVersion()}}3.1、查看doSubmit方法當中的parseArguments方法查看到parseArguments valappArgs=parseArguments(args)點擊parseArguments方法進入到這個方法的具體實現(xiàn)如下protectedprotecteddefparseArguments(args:Array[String]):SparkSubmitArguments={newSparkSubmitArguments(args)}可以看到該方法就是創(chuàng)建了SparkSubmitArguments這個對象,點擊進入到這個對象當中來3.2、查看SparkSubmitArguments當中的parse方法查看方法解析SparkSubmitOptionParser這個java類當中的parse方法 3.2.1、查看handle這個方法所在的具體實現(xiàn)handle這個方法的具體實現(xiàn)通過idea的快捷鍵ctrl+shift+h的方式,我們可以看到具體實現(xiàn)在SparkSubmitArguments當中,我們直接去看SparkSubmitArguments當中的實現(xiàn)方法SparkSubmitArguments當中的handle方法如下:這個方法主要就是在解析各種參數(shù)3.3、查看SparkSubmitArguments當中的loadEnvironmentArguments方法給action賦值 前面已經(jīng)查看了SparkSubmitArguments當中的parse方法,該方法主要就是解析我們的參數(shù),然后給每個指定的參數(shù)進行賦值了,下面還有一個方法叫做loadEnvironmentArguments這個方法主要就是給action進行賦值的查看SparkSubmitArguments當中的loadEnvironmentArguments()方法查看loadEnvironmentArguments方法3.4、action賦值成功之后在SparkSubmit當中的doSubmit方法當中提交任務 給action賦值成功之后,默認值就是SUBMIT,那么在SparkSubmit當中執(zhí)行doSubmit方法當中,使用模式匹配來進行任務提交3.5、查看SparkSubmit當中的submit方法 找到了默認就是任務提交之后,我們就可以直接去看submit方法的實現(xiàn)了,通過submit方法來提交任務了3.6、查看SparkSubmit當中的runMain方法上面通過SparkSumit當中的doRunMain方法,執(zhí)行到了runMain方法,查看runMain方法里面的任務具體提交過程 在上述的runMain方法當中,執(zhí)行了一行代碼valval(childArgs,childClasspath,sparkConf,childMainClass)=prepareSubmitEnvironment(args)這一行代碼至關重要,創(chuàng)建了一個childMainClass這個屬性值,有了這個屬性值,才能繼續(xù)給下方的mainClass進行賦值3.6.1、查看SparkSubmit的prepareSubmitEnvironment這個方法在prepareSubmitEnvironment這個方法當中給childMainClass進行了賦值 通過提交模式為yarn模式,將childMainclass賦值為了org.apache.spark.deploy.yarn.YarnClusterApplication//Inyarn-clustermode,useyarn.Clientasawrapperaroundtheuserclass/***判斷如果是yarn集群模式,那么給childMainClass賦值為org.apache.spark.deploy.yarn.YarnClusterApplicationif(isYarnCluster){childMainClass=YARN_CLUSTER_SUBMIT_CLASSif(args.isPython){childArgs+=("--primary-py-file",args.primaryResource)childArgs+=("--class","org.apache.spark.deploy.PythonRunner")}elseif(args.isR){valmainFile=newPath(args.primaryResource).getNamechildArgs+=("--primary-r-file",mainFile)childArgs+=("--class","org.apache.spark.deploy.RRunner")}else{if(args.primaryResource!=SparkLauncher.NO_RESOURCE){childArgs+=("--jar",args.primaryResource)}childArgs+=("--class",args.mainClass)}if(args.childArgs!=null){args.childArgs.foreach{arg=>childArgs+=("--arg",arg)}}}這樣就得到了childMainclass的最終結果值為org.apache.spark.deploy.yarn.YarnClusterApplication3.7、查看SparkSubmit當中的runMain方法的app.start方法 上面通過childMainclass的最終結果值為org.apache.spark.deploy.yarn.YarnClusterApplication,然后執(zhí)行了app.start查看SparkSubmit當中的runMain方法里面的app.start方法3.8、查看YarnClusterApplication當中的start方法調用了start方法之后其實就是調用了org.apache.spark.deploy.yarn.YarnClusterApplication當中的start方法通過IDE的快捷鍵ctrl+shift+alt+N在SparkSource工程當中查找YarnClusterApplication這個類3.8.1、查看newClient創(chuàng)造Client對象創(chuàng)建了Client對象,在client對象的構造器當中,申明了一個yarnClient對象,這個對象的創(chuàng)建調用了YarnClient.createYarnClient這個方法3.8.2、查看YarnClient對象的創(chuàng)建查看YarnClient.createYarnClient這個方法,其實就是通過new創(chuàng)建了一個YarnClient對象 3.8.3、查看YarnClientImpl的對象創(chuàng)建通過newYarnClientImpl();來創(chuàng)建了YarnClientImpl這個對象,查看這個對象的創(chuàng)建初始化方法 這個方法又調用了super(YarnClientImpl.class.getName());這一行代碼3.9、查看Client當中的run方法前面通過YarnClusterApplication當中的start方法創(chuàng)建了Client對象,然后調用了run方法 查看run方法的具體實現(xiàn)類容如下:3.10、查看submitApplication方法的具體實現(xiàn)在Client當中,通過submitApplication正式向ResourceManager提交了任務 appContext來源于上面定義的一個變量3.10.1、查看Client當中的appContext的內容定義 在Client當中執(zhí)行submitApplication方法來提交任務的時候,該方法當中通過yarnClient.submitApplication(appContext)這一句代碼來實現(xiàn)了任務的提交,在提交任務的時候攜帶了一個參數(shù)叫做appContext查看createApplicationSubmissionContext方法3.10.1、查看Client當中的containerContext前面已經(jīng)看過了appContext當中主要就是定義解析一些參數(shù),接下來就繼續(xù)來查看 valvalcontainerContext=createContainerLaunchContext(newAppResponse)這一行代碼當中的創(chuàng)建容器的方法,查看createContainerLaunchContext方法實現(xiàn)如下4、查看ApplicationMaster的啟動流程 前面找到了我們通過java命令行的方式啟動了一個java進程,叫做ApplicationMaster,這個ApplicationMaster想要啟動執(zhí)行,肯定也是有一個main方法的執(zhí)行入口,通過main方法的執(zhí)行入口來執(zhí)行程序的啟動,使用idea的快捷鍵ctrl+alt+shift+N打開ApplicationMaster,然后找到Object伴生對象當中的main方法4.1、查看ApplicationMaster類當中的main方法下面的ApplicationMasterArguments類的初始化在ApplicationMaster類當中的main下面執(zhí)行了一行代碼 valvalamArgs=newApplicationMasterArguments(args),這一句代碼創(chuàng)建了一個ApplicationMasterArguments這個對象,這個對象專門用于解析前面?zhèn)魅脒^來的參數(shù),查看ApplicationMasterArguments類的初始化4.2、查看ApplicationMaster在main方法當中創(chuàng)建的對象ApplicationMaster在ApplicationMaster這個伴生對象的main方法當中,還執(zhí)行了一句代碼 master=newApplicationMaster(amArgs,sparkConf,yarnConf)這一句代碼主要就是創(chuàng)建了ApplicationMaster這個對象,這個對象的初始化創(chuàng)建過程如下在創(chuàng)建ApplicationMaster這個對象的時候,可以看到在里面初始化了一個YarnRMClient這個對象,這個對象其實就是一個中間的橋梁,用于連接ApplicationMaster與ResourceManager。4.3、查看ApplicationMaster中的main方法里面執(zhí)行的master.run方法在ApplicationMaster當中的main方法里面,執(zhí)行了一行代碼如下:ugiugi.doAs(newPrivilegedExceptionAction[Unit](){overridedefrun():Unit=System.exit(master.run())這一行代碼當中執(zhí)行了master.run,其實就是開始運行ApplicationMaster了,我們可以查看run方法的具體實現(xiàn)內容如下4.3.1、查看run方法當中的runDriver方法的具體實現(xiàn)在ApplicationMaster的run方法當中,執(zhí)行了一段代碼如下 if(isClusterMode){runDriver()}else{runExecutorLauncher()}判斷如果是集群模式,那么就執(zhí)行了runDriver這個方法,通過runDriver這個方法,啟動了driver程序判斷如果是集群模式,那么就執(zhí)行了runDriver這個方法,通過runDriver這個方法,啟動了driver程序runDriver當中主要運行了兩條任務線第一條是跟資源相關的資源申請線第二條是跟用戶程序執(zhí)行相關的程序執(zhí)行線那么我們可以去看一下runDrvier方法的具體實現(xiàn)內容如下在runDriver方法當中,主要執(zhí)行了啟動用戶的程序,等待sparkContext對象的創(chuàng)建,注冊ApplicationMaster以及分配container等多個動作。其中runDriver主要分為兩條線往下運行第一條線:啟動集群環(huán)境,申請運行資源相關的第二條線:resumeDriver保證用戶代碼繼續(xù)往下執(zhí)行4.3.2、查看runDriver方法當中的startUserApplication方法的具體實現(xiàn)在runDriver方法當中的具體實現(xiàn)里面,執(zhí)行了一句代碼 userClassThread=startUserApplication()通過這一行代碼啟動了用戶的應用程序,其中startUserApplication方法的具體實現(xiàn)如下4.3.3、查看runDriver方法當中的createAllocator方法具體實現(xiàn)在ApplicationMaster的runDriver方法當中,執(zhí)行了一行代碼如下 createAllocatorcreateAllocator(driverRef,userConf,rpcEnv,appAttemptId,distCacheConf)這一行代碼主要就是在進行container的分配,該方法的具體實現(xiàn)如下:4.3.4、繼續(xù)查看createAllocator方法當中的allocateResources方法內部實現(xiàn)在上面ApplicationMaster當中調用createAllocator方法當中,執(zhí)行了一行代碼如下 allocatorallocator.allocateResources()在allocateResources方法的具體實現(xiàn)如下4.3.5、查看allocateResources方法內部的handleAllocatedContainers方法的具體實現(xiàn)在YarnAllocator當中,我們看到在執(zhí)行allocateResources的時候,我們通過 valallocatedContainers=allocateResponse.getAllocatedContainers()這一行代碼獲取到了所有的可分配的container,然后通過下一行代碼if(allocatedContainers.size>0){logDebug(("Allocatedcontainers:%d.Currentexecutorcount:%d."+"Launchingexecutorcount:%d.Clusterresources:%s.")allocatedContainers.size,sizenumExecutorsStarting.get,allocateResponse.getAvailableResources))handleAllocatedContainers(allocatedContainers.asScala)}使用了handleAllocatedContainers(allocatedContainers.asScala)來進行容器container的分配,handleAllocatedContainers的方法的具體實現(xiàn)如下4.3.5、查看handleAllocatedContainers方法當中的runAllocatedContainers方法內部具體實現(xiàn)在YarnAllocator當中執(zhí)行handleAllocatedContainers方法時,執(zhí)行了一行代碼如下 runAllocatedContainers(containersToUse)這一行代碼主要就是在進行運行分配好了的container了,這個runAllocatedContainers方法的具體實現(xiàn)內容如下該方法的具體實現(xiàn)如下4.3.6、查看線程的執(zhí)行的run方法 在YarnAllocator當中執(zhí)行runAllocatedContainers方法的時候,使用了線程池launcherPool的方式進行執(zhí)行,最后調用run方法代碼如下newExecutorRunnable(Some(container),conf,sparkConf,driverUrl,executorId,ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID//useuntilfullysupported).run()run方法的具體內容如下4.3.7、查看startContainer方法的具體實現(xiàn) 在上面ExecutorRunnable當中,執(zhí)行run方法的時候,在run方法當中初始化了NodeManagerClient這個對象,這個對象主要是是用于與NodeManager進行通信,通過客戶端與NodeManager服務端進行通信,然后通過調用startContainer方法在NodeManager上面創(chuàng)建Exector進程,在run方法內部有一行代碼執(zhí)行如下startContainer()4.3.8、查看在ExecutorRunnable當中運行startContainer方法調用prepareCommand方法的過程在ExecturoRunnable當中執(zhí)行startContainer方法的時候,調用了一行代碼valvalcommands=prepareCommand()這一行代碼主要就是在準備java的一個命令,通過java-server的方式來啟動另外一個java進程,查看prepareCommand方法的具體實現(xiàn)如下ecutor 前面看到了我們通過java命令啟動了一個Executor的進程,那么就是執(zhí)行了org.apache.spark.executor.YarnCoarseGrainedExecutorBackend這個類當中的main方法,啟動了java的一個進程,那么我們就可以直接去找這個類的main方法去查看這個進程的啟動過程,使用idea的ctrl+alt+shift+N快捷鍵來查找YarnCoarseGrainedExecutorBackend這個類,并找到它的main方法作為程序的入口類5.1、查看YarnCoarseGrainedExecutorBackend這個Object當中main方法的run方法執(zhí)行在YarnCoarseGrainedExecutorBackend當中執(zhí)行了main方法,在main方法里面執(zhí)行了 CoarseGrainedExecutorBackendCoarseGrainedExecutorBackend.run(backendArgs,createFn)這樣一行代碼,這樣一行代碼當中執(zhí)行了run方法,查看run方法的具體實現(xiàn)如下5.2、查看run方法當中的setupEndPoint方法的具體實現(xiàn) 在上面CoarseGrainedExecutorBackend這個class類當中在執(zhí)行run方法的時候,run方法當中有一行代碼如下env.rpcEnv.setupEndpoint("Executor",backendCreateFn(env.rpcEnv,arguments,env,cfg.resourceProfile))其中通過sparkEnv對象,調用了rpcEnv這個屬性,這個對象來調用了setupEndPoint這個方法,這個方法的具體實現(xiàn)代碼如下,通過idea的快捷鍵ctrl+shift+h快捷鍵可以看到該方法的具體實現(xiàn)內容如下通過消息轉發(fā)器dispatcher來調用了registerRpcEndpoint5.3、查看setupEndpoint方法當中的registerRpcEndpoint方法的具體實現(xiàn)在上面NettyRpcEnv當中調用了setupEndpoint方法的時候,執(zhí)行了一行代碼如下 dispatcher.registerRpcEndpoint(name,endpoint)通過消息轉發(fā)器dispatcher來注冊了Rpc的Endpoint這個終端,registerRpcEndpoint方法的具體實現(xiàn)內容如下5.4、查看registerRpcEndpoint方法當中的DedicatedMessageLoop對象創(chuàng)建過程在registerRpcEndpoint方法當中,執(zhí)行了一段代碼如下varmessageLoop:MessageLoop=nulltry{messageLoop=endpointmatch{casee:IsolatedRpcEndpoint=>newDedicatedMessageLoop(name,e,this)case_=>sharedLoop.register(name,endpoint)sharedLoop}endpoints.put(name,messageLoop)}catch{caseNonFatal(e)=>moveendpointthrowe}在這一行代碼當中,創(chuàng)建了一個對象DedicatedMessageLoop,這個對象當中創(chuàng)建了inbox收件箱對象,主要用于收取消息,對象的創(chuàng)建過程如下在上面DedicatedMessageLoop對象創(chuàng)建的時候,我們會發(fā)現(xiàn)調用了一行代碼如下: privateprivatevalinbox=newInbox(name,endpoint)這里其實就是創(chuàng)建了一個Inbox對象,通過Inbox這個對象來實現(xiàn)數(shù)據(jù)的接受,Inbox的具體實現(xiàn)內容如下在inbox這個對象初始化的時候調用了//OnStartshouldbethefirstmessagetoprocessinbox.synchronized{messages.add(OnStart)}其實就是通過LinkedList給InboxMessage添加了一個onStart的事件進去了,這就涉及到通信的生命周期,在spark框架當中,通信的總的接口定義在了RpcEndpoint這個trait當中。在RpcEndpoint這個trait當中定義了一系列的生命周期的順序,在RpcEndpoint當中是這樣定義注釋的其中CoarseGrainedExecutorBackend這個類也是RpcEndPoint的子類調用onStart之后,執(zhí)行CoarseGrainedExecutorBackend的onStart()方法邏輯如下5.6、查看CoarseGrainedExecutorBackend類的onStart方法 前面在看到了通過inbox調用了onStart事件,然后inBox是DedicatedMessageLoop當中定義的對象,DedicatedMessageLoop又是YarnCoarseGrainedExecutorBackend當中定義的對象,而YarnCoarseGrainedExecutorBackend又是CoarseGrainedExecutorBackend的子類,所以這里通過inbox調用了onStart之后,就會執(zhí)行CoarseGrainedExecutorBackend當中的onStart方法,onStart的方法的具體定義如下在上面onStart的方法內容定義如下overridedefonStart():Unit={logInfo("Connectingtodriver:"+driverUrl)try{_resources=parseOrFindResources(resourcesFileOpt)}catch{caseNonFatal(e)=>exitExecutor(1,"Unabletocreateexecutordueto"+e.getMessage,e)}rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap{ref=>//Thisisaveryfastactionsowecanuse"ThreadUtils.sameThread"driver=Some(ref)//向Driver當中注冊Executor通過RegisterExecutor樣例類的方式來包裝了發(fā)送的消息出去ref.ask[Boolean](RegisterExecutor(executorId,self,hostname,cores,fileid}(ThreadUtils.sameThread).onComplete{caseSuccess(_)=>self.send(RegisteredExecutor)caseFailure(e)=>exitExecutor(1,s"Cannotregisterwithdriver:$driverUrl",e,notifyDriver=false)}在這個方法當中調用了RegisterExecutor,通過RegisterExecutor來封裝了樣例類,所以這里發(fā)送消息,一定會有個地方能接收到消息ref.ask[Boolean](RegisterExecutor(executorId,self,hostname,cores,通過RegisterExecutor其實就是向Driver端進行通信,向Driver端進行通信注冊Executor,方便Driver端后續(xù)做DAG的劃分以及task的分解,將分解之后的task運行在Executor上面,既然是向Driver端進行注到Driver端的響應以及回復5.7、簡單查看Driver端的消息接收以及回復過程sparkContext對象當中有一個屬性叫做SchedulerBackend SchedulerBackend其實就是我們的通信后臺,SchedulerBackend是一個trait,可以找到他的實現(xiàn)類為CoarseGrainedSchedulerBackend5.8、查看CoarseGrainedSchedulerBackend當中消息接收以及回復receiveAndReply方法 我們通過SchedulerBackend找到他的實現(xiàn)類為CoarseGrainedSchedulerBackend,在這會進行通信,里面有一個發(fā)方法叫做receiveAndReply這個方法,專門用于接收消息,并進行回復的,該方法的定義內容如下其中CoarseGrainedExecutorBackend與CoarseGrainedSchedulerBackend的請求以及響應關系如下圖CoarseGrainedExecutorBackend與CoarseGrainedSchedulerBackend通信流程.drawio至此,通過以上的通信環(huán)境,我們整個的運行環(huán)境全部創(chuàng)建成功,有了driver有了executor,有了resourceManager,有了nodeManager等,就可以進行任務的計算了,至此,在啟動ApplicationMaster的時候,第一條線路當中的資源環(huán)境問題就已經(jīng)正式啟動成功。剩下的就是第二條線,運行用戶代碼的問題了。4、spark任務組件之間的通信源碼深入剖析1、通信基本概念介紹NIO,AIO等這幾種通信模型的話,那么我們需要先了解阻塞和非阻塞,同步和非同步的概念阻塞和非阻塞是指進程在訪問數(shù)據(jù)的時候,數(shù)據(jù)內部是否準備就緒的一種處理方式。當數(shù)據(jù)沒有準備的時候阻塞:需要等待緩沖區(qū)的數(shù)據(jù)準備好才去處理之后的事情,否則一直等待下去非阻塞:無論緩沖區(qū)的數(shù)據(jù)是否準備好,都立刻返回同步和異步都是基于應用程序和操作系統(tǒng)處理IO時間鎖采用的方式。同步:應用程序要直接參與IO讀寫的操作,在處理IO事件的時候必須阻塞在某個方法上的等待我們IO完成的時間(阻塞IO事件或者通過輪詢IO事件的方式)。阻塞直到IO事件遇到write或者read,這個時候我們不能做任何我們想去做的事情,讓讀寫方法加入到線程中,通過阻塞線程來實現(xiàn),這樣對線程的性大。 異步:所有的IO讀寫都交給操作系統(tǒng)處理,此時應用程序可以處理其他事情,當操作系統(tǒng)完成IO后給應用程序一個通知即可。3、socket通信介紹socket: Socket是應用層與TCP/IP協(xié)議族通信的中間軟件抽象層,它是一組接口。在設計模式中,Socket其實就是一個門面模式,它把復雜的TCP/IP協(xié)議族隱藏在Socket接口后面,對用戶來說,一組簡單的接口就是全部,讓Socket去組織數(shù)據(jù),以符合指定的協(xié)議先從服務器端說起。服務器端先初始化Socket,然后與端口綁定(bind),對端口進行監(jiān)聽(listen),調用accept阻塞,等待客戶端連接。在這時如果有個客戶端初始化一個Socket,然后連接服務器(connect),如果連接成功,這時客戶端與服務器端的連接就建立了。客戶端發(fā)送數(shù)據(jù)請求,服務器端接收請求并處理請求,然后把回應數(shù)據(jù)發(fā)送給客戶端,客戶端讀取數(shù)據(jù),最后關閉連接,一次交互結束優(yōu)點1)傳輸數(shù)據(jù)為字節(jié)級,傳輸數(shù)據(jù)可自定義,數(shù)據(jù)量小(對于手機應用講:費用低)2)傳輸數(shù)據(jù)時間短,性能高3)適合于客戶端和服務器端之間信息實時交互4)可以加密,數(shù)據(jù)安全性強1)需對傳輸?shù)臄?shù)據(jù)進行解析,轉化成應用級的數(shù)據(jù)2)對開發(fā)人員的開發(fā)水平要求高3)相對于Http協(xié)議傳輸,增加了開發(fā)量4)效率低下同步阻塞I/O模式,全稱BlockingIO,數(shù)據(jù)的讀取寫入必須阻塞在一個線程內等待其完成 它是基于流模型實現(xiàn)的,交互的方式是同步、阻塞方式,也就是說在讀入輸入流或者輸出流時,在讀寫動作完成之前,線程會一直阻塞在那里,它們之間的調用時可靠的線性順序。它的優(yōu)點就是代碼比較簡單、直觀;缺點就是IO的效率和擴展性很低,容易成為應用性能瓶頸。主要最大的缺點就是會對我們的操作進行阻塞,例如我們去醫(yī)院看醫(yī)生,需要排隊取號,如果中途過號了,那么對不起,過號不厚,要么退號,要么重新掛號,這就限制了我們取了號之后非得要在醫(yī)院里面等著啥也不能干,盡管你不知道你得要等多久,所以這種阻塞式的IO非常不友好,會浪費我們大量的時間采用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監(jiān)聽客戶端的連接,接收到客戶端連接之后為客戶端連接創(chuàng)建一個新的線程處理請求消息,處理完成之后,返回應答消息給客戶端,線程銷。該架構最大的問題就是不具備彈性伸縮能力,當并發(fā)訪問量增加后,服務端的線程個數(shù)和并發(fā)訪問數(shù)成線性正比,由于線程是JAVA虛擬機非常寶貴的系統(tǒng)資源,當線程數(shù)膨脹之后,系統(tǒng)的性能急劇下降,隨著并發(fā)量的繼續(xù)增加,可能會發(fā)生句柄溢出、線程堆棧溢出等問題,并導致服務器最終宕機。 NIO:一種非阻塞式通信模式,線程在執(zhí)行這個通信業(yè)務過程中,如果有一個環(huán)節(jié)沒有準備好,那么線程可以去執(zhí)行其他任務,線程占用的情況大幅度釋放。例如我們去飯店吃飯,以前沒有外賣的時候,我們去飯店吃飯得要排隊等著,等著廚師把我們的飯菜做好了,然后我們才能開吃,這種模型就類似于BIO,阻塞式IO,效率極其低下,為了節(jié)約等待的時間,等待的時候我們可以去打球,每隔一會兒回來問一下老板我們的飯菜有沒有做好 傳統(tǒng)的IO操作面向數(shù)據(jù)流,意味著每次從流中讀一個或多個字節(jié),直至完成,數(shù)據(jù)沒有被緩存在任何地方。NIO操作面向緩沖區(qū),數(shù)據(jù)從Channel讀取到Buffer緩沖區(qū),隨后在Buffer中處理數(shù)據(jù)。利用Buffer讀寫數(shù)據(jù),通常遵循四個步驟:1.把數(shù)據(jù)寫入buffer;2.調用flip;3.從Buffer中讀取數(shù)據(jù);4.調用buffer.clear()當寫入數(shù)據(jù)到buffer中時,buffer會記錄已經(jīng)寫入的數(shù)據(jù)大小。當需要讀數(shù)據(jù)時,通過flip()方法把buffer從寫模式調整為讀模式;在讀模式下,可以讀取所有已經(jīng)寫入的數(shù)據(jù)。2、channel通道javaNIOChannel通道和流非常相似,主要有以下幾點區(qū)別:1.通道可以讀也可以寫,流一般來說是單向的(只能讀或者寫)。2.通道可以異步讀寫。3.通道總是基于緩沖區(qū)Buffer來讀寫。正如上面提到的,我們可以從通道中讀取數(shù)據(jù),寫入到buffer;也可以中buffer內讀數(shù)據(jù),寫入到通道有個示意圖:Channel有:1.FileChannel2.DatagramChannel3.SocketChannel4.ServerSocketChannelTCP的數(shù)據(jù)讀寫。ServerSocketChannel允許我們監(jiān)聽TCP鏈接請求,每個請求會創(chuàng)建會一個SocketChannel。3、selector選擇器如此可以實現(xiàn)單線程管理多個channels,也就是可以管理多個網(wǎng)絡鏈接。通過上面的了解我們知道Selector是一種IOmultiplexing的情況。下面這幅圖描述了單線程處理三個channel的情況:1、多個Client同時注冊到多路復用器selector上;2、selector遍歷所有注冊的通道;3、查看通道狀態(tài)(包括Connect、Accept、Read、Write);4、根據(jù)狀態(tài)執(zhí)行相應狀態(tài)的操作; 1.由一個專門的線程來處理所有的IO事件,并負責分發(fā)。2.事件驅動機制:事件到的時候觸發(fā),而不是同步的去監(jiān)視事件。3.線程通訊:線程之間通過wait,notify等方式通訊。保證每次上下文切換都是有意義的。減少無謂6、AIO通信模型介紹(異步非阻塞式IO) JDK1.7升級了NIO類庫,升級后的NIO類庫被稱為NIO2.0。也就是我們要介紹的AIO。NIO2.0引入了新的異步通道的概念,并提供了異步文件通道和異步套接字通道的實現(xiàn)。還是以吃飯為例,我們有了外賣之后,我們就徹底與老板進行解耦了,我們下單老板做飯,然后外賣員給我們送過來,在這等待老板做飯的時間,我們可以繼續(xù)去做別的事情,吃飯等待的時間也被我們利用起來了,更加提高了效率用戶程序可以通過向內核發(fā)出I/O請求命令,不用等帶I/O事件真正發(fā)生,可以繼續(xù)做另外的事情,等I/O操作完成,內核會通過函數(shù)回調或者信號機制通知用戶進程。這樣很大程度提高了系統(tǒng)吞吐量。異步通道提供兩種方式獲取操作結果。(1)通過Java.util.concurrent.Future類來表示異步操作的結果;(2)在執(zhí)行異步操作的時候傳入一個Java.nio.channels.CompletionHandler接口的實現(xiàn)類作為操作完成的回調。NIO2.0的異步套接字通道是真正的異步非阻塞IO,它對應UNIX網(wǎng)絡編程中的事件驅動IO(AIO),它不需要通過多路復用器(Selector)對注冊的通道進行輪詢操作即可實現(xiàn)異步讀寫,從而簡化了NIO的編程模我們可以得出結論:異步SocketChannel是被動執(zhí)行對象,我們不需要想NIO編程那樣創(chuàng)建一個獨立的IO線程來處理讀寫操作。對于AsynchronousServerSocketChannel和AsynchronousSocketChannel,它們都由JDK底層的線程池負責回調并驅動讀寫操作。正因為如此,基于NIO2.0新的異步非阻塞Channel進行編程比NIO編程更為簡單。信模型的對比 由上述總結得出,并不意味著所有的Java網(wǎng)絡編程都必須要選擇NIO和Netty,具體選擇什么樣的IO模型或者NIO框架,完全基于業(yè)務的實際應用場景和性能訴求,如果客戶端并發(fā)連接數(shù)不多,周邊對接的網(wǎng)元不多,服務器的負載也不重,那就完全沒必要選擇NIO做服務端;如果是相反情況,那就考慮選擇合NIO行開發(fā)。定義bio服務端
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 當事人誠信悔過書3篇
- 代簽委托書在出口退稅中的應用3篇
- 卷簾門安裝協(xié)議格式3篇
- 迎新生的精彩演講稿(4篇)
- 2025求職簡歷里的自我評價(11篇)
- 景區(qū)旅游信息化應用水平提升考核試卷
- 園林綠化合同范本(4篇)
- 社會救助住宿服務滿意度調查考核試卷
- 白酒的品牌形象與市場認可研究考核試卷
- 綠色交通與城市出行方式的投資策略與前景預測考核試卷
- 宿舍樓施工方案方案
- 甲醇-水精餾塔
- 中國話劇史專題知識
- GB/T 15544.1-2023三相交流系統(tǒng)短路電流計算第1部分:電流計算
- GB/T 90.3-2010緊固件質量保證體系
- GB/T 18799-2020家用和類似用途電熨斗性能測試方法
- 科技公司涉密計算機軟件安裝審批表
- GA/T 1369-2016人員密集場所消防安全評估導則
- GA 1517-2018金銀珠寶營業(yè)場所安全防范要求
- FZ/T 64014-2009膜結構用涂層織物
- 高考試卷命題設計的技巧 課件24張
評論
0/150
提交評論