




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
一、自我介XXX2019XXXX大學(xué)通信工程專業(yè)的畢業(yè)生。我的上為我在大學(xué)期間學(xué)了一些計(jì)算機(jī)相關(guān)的知識(shí),包括計(jì)算機(jī)網(wǎng)絡(luò)、數(shù)據(jù)結(jié)構(gòu)、C語言、Java語言等計(jì)算機(jī)相關(guān)知識(shí)。在大二大三的時(shí)候我做的是電子信息相關(guān)的嵌在那之后開始對(duì)大數(shù)據(jù)這個(gè)方向非常感于是決定想往大數(shù)據(jù)這個(gè)方向發(fā)展,所以在后續(xù)打大學(xué)生涯里先后學(xué)習(xí)了大數(shù)據(jù)相關(guān)的框架以及各種組件,比如用于分布式和計(jì)算的Hadoop框架及其生態(tài)(離線計(jì)算的Mapreduce分布式系統(tǒng)Hdfs、用于資源調(diào)度的Yarn、數(shù)據(jù)層的Hive數(shù)據(jù)查詢)、用于傳輸和數(shù)據(jù)的Flum、Kafka、用于任務(wù)調(diào)度的Azkaban、以及分布式實(shí)時(shí)計(jì)算框架如FlinkSparkstreaming實(shí)時(shí)計(jì)算。后來在畢業(yè)以后進(jìn)了一家公司做數(shù)據(jù)開發(fā)崗因?yàn)閯傔M(jìn)入公司的時(shí)候公01們部門所做的項(xiàng)目是對(duì)公司的平臺(tái)的業(yè)務(wù)數(shù)據(jù)和日志數(shù)據(jù)做分析和處理為離線指標(biāo)分析計(jì)算以及實(shí)時(shí)分析計(jì)算數(shù)據(jù)質(zhì)量等待一系列流程。二、從服務(wù)器開始規(guī)劃大數(shù)物理 云主和2T固態(tài)硬盤(SSD,單臺(tái)報(bào)價(jià)4W出頭,惠普品牌。一般物理機(jī)②5W①物理機(jī)需要有專門的運(yùn)維人員大概每個(gè)月需要1W的薪資支付,②阿里云主機(jī):運(yùn)維工作基本上由阿里的工作人員完成,運(yùn)維相對(duì)比較輕數(shù)據(jù)量統(tǒng)每日活躍用戶:100萬人每人產(chǎn)生日志數(shù)量:100條每天產(chǎn)生日志數(shù)量:100萬*100條=1每條日志數(shù)據(jù)大小:0.5-2k之間,平均每條1k左右每天數(shù)據(jù)量大小:1億*1k 大概100G左右ods層:100Glzo10Gdwd層:采用lzo壓縮+parquet列式后10G左dws/dwt層 dws+dwt(輕度聚合 為了快速計(jì)算不采用壓50Gods半年不擴(kuò)容:210G*18020%~30%2:Kafka1100G2、2個(gè)副 600G/0.7=約3:Flume1100101K左右,10萬*10條*1k=1G2、數(shù)倉4層(不壓縮)3、保存三個(gè)副 4、半年不擴(kuò) 5、預(yù)留 集群規(guī)模:服務(wù)器臺(tái) 56T/8T=7臺(tái)服務(wù)根據(jù)數(shù)據(jù)規(guī)模進(jìn)行集群規(guī)1234567C①NNMysql2dn②zkkf發(fā)在相同節(jié)點(diǎn)。客戶端的服務(wù)盡量安裝在相同節(jié)點(diǎn),方便使用,數(shù)量>=1即hivespark2個(gè)。Spark③ESHBase分開安裝,HBaseES耗空間。Flume1.3.3離線測(cè)試集群環(huán)境規(guī)劃(3臺(tái)√√√√√√√√√√√√√Flume(√√√√√√√√√√Azkaban√√√873產(chǎn)版特分布產(chǎn)版特分布式,資源調(diào)度,離線計(jì)HDFS接口0.10后,sparkstreamingKafka實(shí)時(shí)Kafka的性能狀認(rèn)證/(權(quán)限成功和失敗會(huì)發(fā)電子郵件、 提易于使用的Wev分布式,可擴(kuò)展,海量數(shù)據(jù)的NoSQL數(shù)據(jù)Hbase的開源SQL皮膚,支持HbaseCEs基于內(nèi)存運(yùn)算,比Hadoop100支持Java,PythonScala的極易方便和Hadoop(1)數(shù)據(jù)來通過Nginx把數(shù)據(jù)均勻的分配到各個(gè)Tomcat服務(wù)Nginx日志服務(wù)器數(shù)據(jù)-Flume實(shí)時(shí)的服務(wù)器過來的數(shù)據(jù),三臺(tái)Flume分別三臺(tái)日志服務(wù)器上的數(shù)據(jù),source使用的是tailDirsource實(shí)現(xiàn)了斷點(diǎn)續(xù)傳,多。Flume1.6Source記錄每次Channel選擇了KafkaChannel,FlumeSink為Kafka,所以使用Kafka作為channel從而省去了Sink,效率大大的提高。同時(shí)KafkaChannel的數(shù)據(jù)是在Kafka里面的,所以數(shù)據(jù)在磁盤里面。發(fā)往不同Kafka在針對(duì)過來的數(shù)據(jù)通過自定義啷個(gè)Flume分別是ETL器,還有用于區(qū)分日志類型的器ETL器用于過濾叼時(shí)間戳不合法和Json數(shù)據(jù)不完整的數(shù)據(jù)然后日志類型區(qū)分器主要用于講啟動(dòng)的日志和日志區(qū)KafkaTopicl里面。Kafka作為消息中間件,它保證我們公司數(shù)據(jù)在傳輸?shù)臅r(shí)候的安全可靠Kafka,因?yàn)樗纳a(chǎn)者生產(chǎn)的數(shù)據(jù)要寫入磁盤,PageCacheKafkaack=1的應(yīng)答機(jī)制,也就是說我的數(shù)據(jù)在leader節(jié)點(diǎn)落盤以后就直接返回ack應(yīng)答給生ack以后便會(huì)進(jìn)行下一輪的數(shù)據(jù)發(fā)送,否則數(shù)據(jù)會(huì)重新發(fā)送,ack=1kafka的性能。同時(shí)在選擇Kafka的機(jī)器數(shù)量的時(shí)候,因?yàn)榻o了Kafka一個(gè)壓力測(cè)試們的測(cè)試發(fā)現(xiàn)Kafka的生產(chǎn)者峰值生產(chǎn)速度為50M/s,并且我們以2個(gè)副本為生Kafka2*(50*2/100)+1=3臺(tái);Flume消費(fèi)因?yàn)槲覀兊臄?shù)據(jù)后面要傳輸?shù)紿DFS當(dāng)中所以我們選擇了Flume為Kafka的KafkaflumechannelMemoryChannel,因?yàn)镸emoryChannelhdfssink使用了hdfssink,并把數(shù)據(jù)按照日期在hdfs上。業(yè)務(wù)數(shù)據(jù)從Flume->Mysql-MysqlSinkSqoopMysqlHdfs數(shù)據(jù)在Flume傳到Hdfs時(shí)對(duì)小文件的處理措FlumehdfsSinkhdfs通過調(diào)節(jié)參數(shù) hdfs當(dāng)中Flume相Flume常規(guī)配FileChannel通過配置dataDirs指向多個(gè)路徑,每個(gè)路徑對(duì)應(yīng)不同的硬盤,增大Flume吞checkpointDir和backupCheckpointDir也盡量配置在不同硬盤對(duì)應(yīng)的 保證checkpoint壞掉后,可以快速使用backupCheckpointDir恢復(fù)數(shù)據(jù)Flume掛Flume如果掛掉,項(xiàng)目主要從以下方面找的原因taildirsourcesource支持?jǐn)帱c(diǎn)續(xù)傳和多存下游數(shù)倉ods->dwd的時(shí)候去重或者使用SparkStreaming去重。所以如果是taildirsource掛了,重啟恢復(fù)工作就可以了;memorychannelmemorychannel每批次傳輸?shù)臄?shù)據(jù)100event,flume即可;sink掛掉了,需要排查對(duì)應(yīng)配置文件的參數(shù)或者配置文件本身2.1.4Flume在項(xiàng)目里我們使用了2個(gè)器,一個(gè)是ETL器,另一個(gè)則是分日志類型的器ETL器:主要用于過濾一些不符合Json數(shù)據(jù)格式和一些關(guān)鍵字段日志分類器,根據(jù)日志的啟動(dòng)類型,通過器給讓數(shù)據(jù)發(fā)送到kafka的不同當(dāng)中;Interceptor接口并實(shí)現(xiàn)四個(gè)方法,一個(gè)是eventInterceptor.Builder,然后打包上傳到Flume的lib下,并在配置文件中進(jìn)行關(guān)聯(lián)器GangliaGangliaFlume發(fā)現(xiàn)嘗試提交的次數(shù)大于最終成功的次Flume4-Flume臺(tái)數(shù)(增加日志服務(wù)器)618KafkaKafka數(shù)據(jù)會(huì)不會(huì)丟失01:leader-1:leaderfollowerKafka有重復(fù)數(shù)據(jù)處理關(guān)于Kafka數(shù)據(jù)得重負(fù)可以冪等性得機(jī)制使得kafka的數(shù)據(jù)實(shí)現(xiàn)分區(qū)mysql進(jìn)行綁定,實(shí)現(xiàn)全局?jǐn)?shù)據(jù)不重復(fù);odsdwdgroupbyRedis進(jìn)行去重;Kafka掛KafkaFlumeChannel可以緩存數(shù)據(jù)一段時(shí)間,短期30Kafka消息數(shù)據(jù)積壓,Kafka消費(fèi)能力不足怎么處理Kafka的分區(qū),提高其并發(fā)能力,相應(yīng)的下一級(jí)的消費(fèi)線程也要FlumeKafka優(yōu)數(shù)據(jù)保存冊(cè)羅策略(三天數(shù)據(jù)保存冊(cè)羅策略(三天:log.retention.hous=72replica.lag.time.max.ms=600000#如果網(wǎng)絡(luò)不好,或者kafka集群壓力較大,會(huì)出 內(nèi)存調(diào)整:exportKAFKA_HEAP_OPTS="-Xms4gHadoopHadoop宕MRYarn同時(shí)運(yùn)行的任務(wù)數(shù)以及每個(gè)任務(wù)申請(qǐng)的最大內(nèi)內(nèi)存;可以調(diào)整參數(shù)yarn.scheduler.um-allocation-mb。,如果寫入文件過量造成NameNode宕機(jī)。那么調(diào)高Kafka的大小,控制從Kafka到HDFS的寫入速度期的時(shí)候用Kafka進(jìn)行緩存期過后。,小文件處MRConbineTextInputformat將一些小文件HarFlumeFlumeSinkJVM數(shù)據(jù)傾斜問Mapcombine,xreduceshuffleIOreducekey加上一個(gè)隨機(jī)的前綴值,使大量的key均勻分散到不同的分區(qū)進(jìn)行局部聚合,然后再一次進(jìn)行YARN參數(shù)調(diào)優(yōu)yarn-單個(gè)任務(wù)可申請(qǐng)的最多物理內(nèi)存量,默認(rèn)是8192(MB。備注:根據(jù)任SqoopSqoop導(dǎo)入導(dǎo)出Null一致性問Hive中的Null在底層是“\N來而MySQL中的Null在底層就是Null,為了保證數(shù)據(jù)兩端的一致性。在導(dǎo)出數(shù)據(jù)時(shí)采用--input-null-string和--input-null-non-string兩個(gè)參數(shù)。導(dǎo)入數(shù)據(jù)時(shí)采用--null-string和--null-non-string。Sqoop數(shù)據(jù)導(dǎo)出Parquet問adsmysqladsorc而mysql并不支持,需要text格式才行;mysqladstext;Sqoop導(dǎo)入數(shù)據(jù)的時(shí)候發(fā)生數(shù)據(jù)傾SqoopROWNUM()生成一個(gè)嚴(yán)格均勻分布的字段,然后指定為分割字段split-by:按照某一列來切分表的工作單num-mappersNmap4AzkabanAzkaban進(jìn)行,這里采用了第平Onealert平臺(tái)里的提示進(jìn)行問題的解決并重新執(zhí)(數(shù)倉建模、hive優(yōu)化、數(shù)據(jù)傾斜說dwddws數(shù)據(jù)服務(wù)層、ads我們對(duì)dws數(shù)據(jù)服務(wù)層再做了一次細(xì)分,把dws層分為dws層和dwt數(shù)據(jù)1據(jù)的計(jì)算結(jié)果的復(fù)用性O(shè)DSods表、三級(jí)、二級(jí)、一級(jí)表,券領(lǐng)取表、活動(dòng)表、地區(qū)表等;DWDdwd層:該層的數(shù)據(jù)主要是講ods層的數(shù)據(jù)進(jìn)行ETL、脫敏處理然后根據(jù)維度建模1、選擇業(yè)務(wù)線:選擇感覺的、指標(biāo)需要的業(yè)務(wù)線2、粒度:選擇最細(xì)的粒度!可以由最細(xì)的粒度通過聚合的方式得到粗粒度3、確認(rèn)維度:根據(jù)3w原則確認(rèn)維度,挑選自己感的維ETL:過濾掉過期的、關(guān)鍵字段為空值的、超時(shí)的數(shù)據(jù);一般以數(shù)據(jù)的萬分之一以下的數(shù)據(jù)量為標(biāo)準(zhǔn),如果過濾的數(shù)據(jù)量超過了這個(gè)值,則需要與JavaEE工程師;、、脫敏:通過Spark以及Hive對(duì)數(shù)據(jù)的敏感信息進(jìn)行處理,如用戶的號(hào)證號(hào)等進(jìn)行加理;、、用戶行為數(shù)據(jù)的處理:解析公段;自定義UDTF(extendsGenerticUDTF->實(shí)現(xiàn)三個(gè)init(指定返回值的名稱和類型)、process(處理字段一進(jìn)多出)、close方法)。but_json_object終所需表。壓縮:采用lzo壓縮,壓縮率大,支持切片;DWSDWSDWD層數(shù)據(jù),每天輕度聚合!每天一個(gè)分區(qū)!根據(jù)業(yè)務(wù)需求進(jìn)行分建模!一般是建寬表!DWT張總表,記錄動(dòng)第一天開始至今的所有圍繞該的數(shù)據(jù)的匯總!根據(jù)業(yè)務(wù)需求進(jìn)行分建模!建的是寬表dws層數(shù)據(jù)有一些略微的區(qū)別,該層數(shù)據(jù)關(guān)注的是某用戶對(duì)應(yīng)某個(gè)的數(shù)據(jù)從開始創(chuàng)建到最后一次是什么時(shí)Dwtdws層拿數(shù)據(jù)ADSADSDWSDWT層取需要的數(shù)據(jù)!87HiveHive11個(gè)MapJoinMapJoinHiveJoin操作轉(zhuǎn)換成CommonJoinReduce階段完成join。容易發(fā)生數(shù)據(jù)傾斜。可以用MapJoin把小表mapjoinreducer處理。行列過SELECTSELECT*。Where列式采用分區(qū)技合理設(shè)置Map通常情況下,作業(yè)會(huì)通過input 產(chǎn)生一個(gè)或者多個(gè)map任務(wù)主要的決定因素有:input的文件總個(gè)數(shù),input是不是map答案是否定的。如果一個(gè)任務(wù)有很多小文件(遠(yuǎn)遠(yuǎn)小于塊大小128m則每個(gè)小文件也map數(shù)是受限的。map128mmapmap小文件進(jìn)行合MapMap數(shù):CombineHiveInputFormat具有對(duì)小文件進(jìn)行合并的功能(系統(tǒng)默認(rèn)的格式。HiveInputFormat沒有對(duì)小文件合并功能。ReduceReduce在設(shè)置Reduce個(gè)數(shù)的時(shí)候也需要考慮這兩個(gè)原則:處理大數(shù)據(jù)量利用合適的Reduce常用參//SEThive.merge.mapfilestrue;--默認(rèn)truemap-onlySEThive.merge.mapredfilestrue;--falsemap-reduce ;--SEThive.merge.smallfiles.avgsize= ;--16m該值時(shí),map-reducemerge開啟map端combiner(不影響最終業(yè)務(wù)邏輯set壓縮(選擇快的map(IO讀開啟JVM重hadoopReduce99.99%,一直不能結(jié)束。containerOOMReducerkillhiveSqlgroupbyjoinon數(shù)據(jù)傾斜產(chǎn)生原因(3個(gè)SparkHiveShuffleShufflekeyReducer節(jié)key:userid,egiter_ipip(IP表:ip, userregister_ipipip的用戶,統(tǒng)ip0。on10000%,其余城市的數(shù)據(jù)量不變。group解決數(shù)據(jù)傾斜思路(4個(gè)方面countMR,Hivecount(distinct)Reduce任groupbycount,就可以了。比如計(jì)算按用戶名去重后的reducecountselectname,count(distinctname)from//jobreducer3個(gè)。Hive默認(rèn)-1set//job,一個(gè)負(fù)責(zé)子查詢(reduce)count(1):selectcount(1)from(selectnamefromusergroupbyname)tmp;HadoopSpark都自帶了很多的參數(shù)和機(jī)制來調(diào)節(jié)數(shù)據(jù)傾斜,合理利用它們就能解決ip0的數(shù)據(jù),過濾掉keyhash,先將數(shù)據(jù)隨機(jī)打散讓它的并行度變大,再匯集Sparkshuffletask執(zhí)行特別慢的情stage如果是用 yarn-clusterSparkWebUI此外無論是使用yarn- 模式還是yarn-cluster模式我們都可以在SparkWebUI上深入看一下當(dāng)前這個(gè)stage各個(gè)task分配的數(shù)據(jù)量從而進(jìn)一步確定是不是task分配的數(shù)據(jù)tasktasktasktask運(yùn)行tasktask處理的數(shù)據(jù)量,明顯可以看到,運(yùn)行時(shí)間特別短的taskKBtaskKB的數(shù)據(jù),10倍。此時(shí)更加能夠確定是發(fā)生了數(shù)據(jù)傾斜。stagestage劃分原理,推算出stageshuffle類算stageSpark的源碼有深入的理解,這里我們可以Sparkshuffle類算子或者SparkSQLSQLshuffle的語句(groupby語句,那么就可以stage。words=lines.flatMap(_.split(""))valpairs=words.map((_,1))valwordCounts=pairs.reduceByKey(_+_)wordCounts.collect().foreach(println(_))reduceByKeyshuffle的算子,也就是說這個(gè)算子為界stage:stage0textFilemapshufflewrite操作(pairsRDDtaskkey會(huì)寫入同一個(gè)磁盤文件內(nèi)stage1reduceByKeycollectstage1task一開始運(yùn)行,就會(huì)首先執(zhí)行shuffleread操作(會(huì)從stage0的各個(gè)task所在節(jié)點(diǎn)拉取屬于自己處理的那些key,然后對(duì)同一個(gè)key進(jìn)行全局性的聚合或join等操作在這里就是對(duì)keyvalue值stage1在執(zhí)行完reduceByKey算子之后,就計(jì)算出了最終的wordCountsRDD,然后會(huì)執(zhí)collectDriver上,供我們遍歷和打印輸出。通過對(duì)單詞計(jì)數(shù)程序的分析,希望能夠讓大家了解最基本的stage劃分的原理,以及stageshufflestage的邊界處執(zhí)行的。然后我們就知道如何快速定stage對(duì)應(yīng)代碼的哪一個(gè)部分了。比如我們?cè)赟parkWebUI或者本地log中發(fā)現(xiàn),stage1的某幾個(gè)task執(zhí)行得特別慢,判定stage1出現(xiàn)了數(shù)據(jù)傾斜,那么就可以回到代碼中,定位出stage1主要包括了reduceByKeyshuffle類算子,此時(shí)基本就可以確定是是該算子導(dǎo)致了數(shù)據(jù)傾斜問題。10010stage1100stagetask某個(gè)task莫名其妙內(nèi)存溢出的情這種情況下去定位出問題的代碼就比較容易了我們建議直接看yarn- log的異常棧,或者是通過YARN查看yarn-cluster模式下的log中的異常棧。一般來說,通一般也會(huì)有shuffle類算子,此時(shí)很可能就是這個(gè)算子導(dǎo)致了數(shù)據(jù)傾斜。bukebUIaeask的運(yùn)行時(shí)間以及分配的數(shù)據(jù)量,才能確定是否是由于數(shù)據(jù)傾斜才導(dǎo)致了這次內(nèi)存溢出。key先對(duì)pairs采樣10%countByKey算子統(tǒng)計(jì)出每個(gè)keykeyvalsampledPairs=pairs.sample(false,案;(spark數(shù)據(jù)傾斜、優(yōu)化)數(shù)據(jù)來關(guān)于用戶行為數(shù)據(jù),通過Tomcat服務(wù)器到以后,使用Flume傳輸?shù)終afka當(dāng)中,然后用SparkStreaming進(jìn)行處理,再將處理的結(jié)果存到Hbase和ElasticSearch當(dāng)中,然后進(jìn)關(guān)于業(yè)務(wù)數(shù)據(jù),F(xiàn)lume以后直接傳到Mysql,然后Mysql通過C進(jìn)行實(shí)時(shí),將變化的數(shù)據(jù)傳入到Kafka,然后用SparkStreaming進(jìn)行處理,再將處理的結(jié)果存到Hbase和ElasticSearch當(dāng)中,然后進(jìn)行數(shù)據(jù)可視化;選擇 的原我們公司之所以選擇C,是因?yàn)榭紤]到Sqoop導(dǎo)入數(shù)據(jù)比較慢,沒辦法滿足實(shí)時(shí)性的要求。而C是用Java開發(fā)的基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費(fèi)的中間件。目前,C主要支持了MySQL的Binlog解析,解析完成后才利用C 處理獲得的相關(guān)數(shù)據(jù)。C的工作原理很簡(jiǎn)單,就是把自己成Slave,假裝從Master復(fù)制數(shù)據(jù)。對(duì)于MySQLBinlog的格式,選擇的是row,保證了數(shù)據(jù)的絕對(duì)一致性。選擇基于Direct的方式的SparkStreamingkafka中的數(shù)據(jù)。esHbaseSparkStreamingkafkaDirect的方式。Receiver方式的不同之處是:Receiverbatch堆積,很容易出現(xiàn)內(nèi)存溢出的問題SparkStreamingjob會(huì)SparkStreaming的預(yù)寫日志機(jī)制(WriteAheadLog,WALDirectReceiverSpark1.3中引入的,從而能夠確保更加健KafkaconsumerapiKafkaoffset范圍的數(shù)據(jù)。簡(jiǎn)化并行:如果要多個(gè)partition,不需要?jiǎng)?chuàng)建多個(gè)輸入DStream然后對(duì)它們進(jìn)union操作。SparkKafkapartitionRDDpartitionKafka中數(shù)據(jù)。所以在Kafkapartition和RDDpartition之間,有一個(gè)一對(duì)一的映射關(guān)系。數(shù)據(jù)的,那么就可以通過Kafka的副本進(jìn)行恢復(fù)。基于direct的方式kafka的簡(jiǎn)單apiSparkStreaming自己就負(fù)責(zé)追蹤消費(fèi)的offset,checkpoint中。Spark自己一定是同步的,因此可以保證數(shù)據(jù)是消費(fèi)一次且僅消費(fèi)最后將SparkStreaming處理完的數(shù)據(jù)在Hbase和Es中ES是為用戶提供按關(guān)鍵字查詢的全文搜索功能。SparkStreaming從kafka中消費(fèi)數(shù)據(jù),根據(jù)條件進(jìn)行過濾篩選,生成日志;日ElasticSearchKibana快速搭建可視化圖形界面。其實(shí)即使不提前建立索引,ES也是可以將數(shù)據(jù)保存進(jìn)去的。這種情況,ES會(huì)根據(jù)第一ES的這種推斷往往不夠準(zhǔn)確。等。建立索引語句(包含Map)SparkSparkSparkshufflekeytasktask被分tasktask被分配的數(shù)據(jù)量都task都運(yùn)行緩慢。SparkSparkStreaming和SparkSqlDriverSparklog文件,logstageshuffle算子是哪一聚合原數(shù)shuffle絕大多數(shù)情況下,SparkHiveHive之后的昨天的數(shù)據(jù)。為了避免數(shù)據(jù)傾斜,我們可以考慮避免shuffleshuffleSparkHiveHive表中對(duì)數(shù)據(jù)進(jìn)行聚合,例如keykeyvalue用一種特殊的格式拼接到一個(gè)字符串里要進(jìn)行map操作即可,無需再進(jìn)行任何的shuffle操作。通過上述方式就避免了執(zhí)行shuffleHivekey的每一key粒度(task的數(shù)據(jù)量keykey粒度(task的數(shù)據(jù)量如果沒有辦法對(duì)每個(gè)key聚合出來一條數(shù)據(jù),在特定場(chǎng)景下,可以考慮擴(kuò)大key的聚合例如,目前有10萬條用戶數(shù)據(jù),當(dāng)前key的粒度是(省,城市,區(qū),日期我們過濾導(dǎo)致傾斜的Sparkkey進(jìn)行過濾,濾除可能導(dǎo)致數(shù)據(jù)傾斜的key對(duì)應(yīng)的數(shù)據(jù),這樣,在Spark作業(yè)中就不會(huì)發(fā)生數(shù)據(jù)shufflereduce端并行度,reducereducetasktaskreduce在大部分的shufflereduceByKey(500),這個(gè)參數(shù)會(huì)決定shuffle過程中reduce端的并行度,在進(jìn)行shuffle操作的時(shí)候,就會(huì)對(duì)應(yīng)著reducetask。SparkSQLshufflegroupby、join等,需要設(shè)置一個(gè)參數(shù),即spark.sql.shuffle.partitions,該參數(shù)代表了shufflereadtask的并行度,該值默認(rèn)是200,對(duì)于增加shufflereadtask的數(shù)量可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)task,task處理比原來更少的數(shù)據(jù)。舉例來說,如果原本有5keykey對(duì)應(yīng)10條數(shù)據(jù)5key都是分配給一個(gè)task的,那么這個(gè)task就要處理50條數(shù)據(jù)。而增加了shufflereadtask以后,每個(gè)task就分keytask10task的執(zhí)行時(shí)間都會(huì)變短了。reduce提高reduce端并行度并沒有從根本上改變數(shù)據(jù)傾斜的本質(zhì)和問題(方案一和方案二從根本上避免了數(shù)據(jù)傾斜的發(fā)生shufflereducetask的數(shù)據(jù)壓力,key對(duì)應(yīng)的數(shù)據(jù)量都比較大的情況。100task100key肯定task中去處理,因此注定還是會(huì)發(fā)生數(shù)據(jù)傾斜的。所以這種方案只能說使用隨機(jī)key實(shí)現(xiàn)雙重聚首先,通過map算子給每個(gè)數(shù)據(jù)的key添加隨機(jī)數(shù)前綴key進(jìn)行打散,將原先一樣的key變成不一樣的key,然后進(jìn)行第一次聚合,這樣就可以讓原本被一個(gè)task處理的數(shù)據(jù)taskkey的前綴,再次進(jìn)行聚合。shuffle操作,適用范圍相對(duì)較窄。joinshufflereducejoin轉(zhuǎn)換為map正常情況下,join操作都會(huì)執(zhí)行shuffle過程,并且執(zhí)行的是reducejoin,也就是先將所keyvaluereducetaskjoin。joinshuffleshufflekey的數(shù)據(jù)拉取shufflereadtaskjoinreducejoin。RDDRDD全量數(shù)據(jù)+RDD上面使(注意,RDDRDDcollectshuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。RDDcollectDriverBroadcast變量;RDDmapBroadcastRDDRDDkeykeyRDDshufflejoin操作可能導(dǎo)致的數(shù)據(jù)joinRDD的數(shù)據(jù)量較小時(shí),可以優(yōu)先考慮這種方SparkExecutorRDD數(shù)據(jù)量都比RDDSpark常規(guī)性能調(diào)最優(yōu)資源配Spark性能調(diào)優(yōu)的第一步,就是為任務(wù)分配的資源,在一定范圍內(nèi),增資源的分配在使用提交Spark任務(wù)時(shí)進(jìn)行指定,標(biāo)準(zhǔn)的Spark任務(wù)提交如代碼:--class ysis--driver-memory6g名稱說明 Executor Driver內(nèi)存(影響不大–executor-memoryExecutor ExecutorCPUcore對(duì)于具體資源的分配,我們分別討論Spark的兩種Cluste運(yùn)行模式:運(yùn)維部門獲取到你可以使用的資源情況,在編寫submit的時(shí)候,就根據(jù)可2CPUcore,那么就指定15Executor,每個(gè)Executor分配8G內(nèi)存,2CPUSparkYarnYarn使用資源隊(duì)列進(jìn)行資源的分配和調(diào)度,在表寫submit的時(shí)候,就根據(jù)Spark作業(yè)要提交到的資源隊(duì)列,進(jìn)行100coreExecutor8G內(nèi)存,2CPUcore。名稱解析Executor·個(gè)數(shù)在資源允許的情況下,增加Executor的個(gè)數(shù)可以提高執(zhí)行task的并行度。比4個(gè)Executor,每個(gè)Executor有2個(gè)CPUcore,那么可以并行執(zhí)行8task,如果將Executor的個(gè)數(shù)增加到8個(gè)(資源允許的情況下16task,此時(shí)的并行能力提升了一倍。ExecutorCPUcore個(gè)數(shù)在資源允許的情況下,增加每個(gè)Executor的Cpucore個(gè)數(shù),可以提高執(zhí)行task的并行度。比4個(gè)Executor,coretaskCPUcore4(資源允許的情況下16個(gè)task,Executor的內(nèi)存量在資源允許的情況下,增加每個(gè)Executor的內(nèi)存量以后,對(duì)性能的提升有三點(diǎn):1.可以緩存的數(shù)據(jù)(即對(duì)RDD進(jìn)行cache2.可以為shuffle操作提供內(nèi)存,即有空間來存放reduce端拉取的可以為task的執(zhí)行提供內(nèi)存,在task的執(zhí)行過程中可能創(chuàng)建很多對(duì)象,內(nèi)存較小時(shí)會(huì)頻繁的GC,增加內(nèi)存后,可以避免頻繁的GC,提升整體性能生產(chǎn)環(huán)境Sparksubmit配--driver-memory6g--masteryarn--deploy-modecluster--queueroot.default--master: RDD優(yōu)RDDRDDRDD進(jìn)行重復(fù)RDD在SparkRDDRDDRDDRDD的重復(fù)計(jì)RDD進(jìn)行持久化,通過持久RDD的數(shù)據(jù)緩存到內(nèi)存/RDD/RDDRDD1,RDDRDD的數(shù)據(jù)完整的進(jìn)行存放的時(shí)候,可以考慮使用序列化的方式減小數(shù)據(jù)體積,將數(shù)據(jù)完整在內(nèi)存中。RDD數(shù)據(jù)進(jìn)行持久化。當(dāng)持久化啟用了復(fù)本機(jī)制時(shí),對(duì)于持久化的每個(gè)數(shù)據(jù)RDDfilterRDD后,應(yīng)該考慮盡早地過濾掉不需要的數(shù)據(jù),進(jìn)而減少對(duì)內(nèi)Spark作業(yè)的運(yùn)行效率。并行度調(diào)Sparkstagetask20ExecutorExecutor3CPUcoreSpark40個(gè)task,Executor分配到的task2ExecutorCPUcore空閑,導(dǎo)致資源的浪費(fèi)。置并行度,可以提升整個(gè)Spark作業(yè)的性能和運(yùn)行速度。Spark推薦task數(shù)量應(yīng)該設(shè)置為Spark作業(yè)總CPUcore數(shù)量的2~3倍。有的task執(zhí)行速度快而有的task執(zhí)行速度慢,如果task數(shù)量與CPUcore總數(shù)相CPUcore2~3task執(zhí)行完畢后,CPUcoretaskSparkSpark作業(yè)并行度的設(shè)置如代碼valconf=new常規(guī)性能調(diào)優(yōu)四:廣播大變默認(rèn)情況下,tasktask都會(huì)獲取-RDD進(jìn)-GC,GCSparkSpark性能。假設(shè)當(dāng)前任務(wù)配置了20Executor,指定500task,有一個(gè)20M的變量被所有task共用,此時(shí)會(huì)在500task中產(chǎn)生500個(gè)副本,耗費(fèi)集群10G的內(nèi)Executor400M5倍。ExecutorExecutortask共用此廣eraskecr對(duì)應(yīng)的BlockManaer中嘗試獲取變量,如果本地沒有,BlockManaerierer上拉取變量的復(fù)本,并由本地的BlockManaer進(jìn)行管理;ecraskBlockMer中獲取變量。Kryo序列默認(rèn)情況下,SparkJavaJava的序列化機(jī)制使用方便,KryoJava10倍左右,Spark之所以沒有需要用戶在使用前需要序列化的類型,不夠方便,但從Spark2.0.0版本開publicclassMyKryoRegistratorimplementsKryoRegistrator{publicvoidregisterClasses(Kryo}}//SparkConfvalconf=new//使用Kryo序列化庫,如果要使用Java序列化庫,需要把該行//在Kryo序列化庫中自定義的類集合,如果要使用Java序列化庫,需要把該行掉 調(diào)節(jié)本地化等待時(shí)Spark作業(yè)運(yùn)行過程中,Driverstagetask進(jìn)行分配。根據(jù)Sparktask分配算法,Sparktask能夠運(yùn)行在它要計(jì)算的數(shù)據(jù)所在的節(jié)task分配到離它要計(jì)算的數(shù)據(jù)比較近的tasktask所在節(jié)點(diǎn)上時(shí),會(huì)發(fā)生數(shù)據(jù)的傳輸。task會(huì)BlockManager獲取數(shù)據(jù),BlockManager發(fā)現(xiàn)數(shù)據(jù)不在本地時(shí),BlockManager處獲取數(shù)據(jù)。tasktaskSpark作業(yè)的整體性能。2-3Spark本地化等級(jí)名稱解析PROCESS_LOCAL進(jìn)程本地化,taskExecutorNODE_LOCAL節(jié)點(diǎn)本地化,tasktask和數(shù)據(jù)Executor中,數(shù)據(jù)需要在進(jìn)程間進(jìn)行傳輸。RACK_LOCAL機(jī)架本地化,task和數(shù)據(jù)在同一個(gè)機(jī)架的兩個(gè)節(jié)點(diǎn)上,數(shù)據(jù)NO_PREFtaskANYtask和數(shù)據(jù)可以在集群的任何地方,而且不在一個(gè)機(jī)架中,性能最在k目開發(fā)階段,可以使用式對(duì)程序進(jìn)試,此時(shí),可以askPLN_L、N,那么需要對(duì)本地化的等待時(shí)長(zhǎng)進(jìn)行調(diào)節(jié),通過延長(zhǎng)本askrkrkvalconf=new算子調(diào)普通的map算子對(duì)RDD中的每一個(gè)元素進(jìn)行操作,而mapPartitions算子對(duì)RDD中每一個(gè)分區(qū)進(jìn)行操作。mappartition1map算子中的function1萬次,也就是對(duì)每個(gè)元素進(jìn)行操作。mapPartitiontaskRDDpartition,那RDDJDBCmap算RDD中的每一個(gè)元素都創(chuàng)建一個(gè)數(shù)據(jù)庫連接,這樣對(duì)資源的消mapPartitionsmap操作,一次處理一條數(shù)據(jù)從內(nèi)存中回收掉;但是如果使用mapPartitions算子,但數(shù)據(jù)量非常大OOM,即內(nèi)存溢出。因此mapPartitions算子適用于數(shù)據(jù)量不是特別大的時(shí)候,此時(shí)使用mapPartitions(當(dāng)數(shù)據(jù)量很大的時(shí)候,一旦mapPartitionsOOM)RDD的數(shù)據(jù)量、每個(gè)partition的數(shù)據(jù)量,以及分配給每個(gè)Executor的內(nèi)存資源,如果mapPartitionsmap。foreachPartitionforeachPartitionforeachforeachRDD的foreachPartitionmapPartitions算子非常相似,foreachPartitionRDDforeachPartitionfunctionSQL在生產(chǎn)環(huán)境中,全部都會(huì)使用foreachPartition算子完成數(shù)據(jù)庫操作。foreachPartitionmapPartitionsOOM,即內(nèi)存溢出。filtercoalesce的配合使partitionpartitiontasktask要處理的數(shù)據(jù)量不同,這很有可能導(dǎo)致數(shù)據(jù)傾斜問題。在上圖中,100800task處理的數(shù)據(jù)量與task8倍,這也會(huì)導(dǎo)致運(yùn)行速度可task進(jìn)行處理即可,避免了資源的浪費(fèi)。partitioncoalescerepartition與coalescerepartition只是coalesceshuffletrue的簡(jiǎn)易實(shí)現(xiàn),coalesceshuffle,但是可況:1.A>B(多數(shù)分區(qū)合并為少數(shù)分區(qū))ABcoalesceshuffleAB此時(shí)可以使用coalesce并且不啟用shuffle過程,但是會(huì)導(dǎo)致合并過程性coalescetrueshuffle2.AB(少數(shù)分區(qū)分解為多數(shù)分區(qū)此時(shí)使用repartition即可如果coalesce需要將shuffle設(shè)置為true,coalesce無效。總結(jié):filtercoalescepartition的repartitionSparkSQL低并行度問SparkSQLSparkSQL以外的所Spark的stage生效。SparkSQL的并行度不允許用戶自己指定,SparkSQLhive表對(duì)應(yīng)的HDFS文件的split個(gè)數(shù)自動(dòng)設(shè)置SparkSQL所在的那個(gè)stage的并spark.default.parallelismSparkSQLstage中生效。SparkSQLstage的并行度無法手動(dòng)設(shè)置,如果數(shù)據(jù)量較大,并且此stage中后續(xù)的transformation操作有著復(fù)雜的業(yè)務(wù)邏輯SparkSQL自動(dòng)設(shè)tasktask要處理為數(shù)不少的數(shù)據(jù)量,然后還要執(zhí)行非常復(fù)雜的處理邏輯,這就可能表現(xiàn)為第一個(gè)有SparkSQL的stage速度很慢,而后續(xù)的沒有SparkSQL的stage運(yùn)行速度非常快。為了解決SparkSQL無法設(shè)置并行度和taskrepartitionSparkSQLtask數(shù)量肯定是沒有辦法去改變了,但是,對(duì)SparkSQLRDDrepartition算子,去重新進(jìn)行分區(qū),這SparkSQL,因此stage的并行度就會(huì)等于你手動(dòng)設(shè)置的值,這樣就避免了SparkSQL所在的stage只能用少量的taskreduceByKey1map端的數(shù)據(jù)量變少減少了磁盤IO也減少了對(duì)磁盤空間的占用2.本地聚合后下一個(gè)stage4reduce端進(jìn)行聚合的數(shù)據(jù)量減shufflegroupByKey。gyymapmap端的數(shù)據(jù)e到educeeduceeduceyy有map端聚合的特性,使得網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量減小,因此效率要明顯高于gyy。reduceByKey預(yù)聚reduceByKeyshufflemap端個(gè)stage的每個(gè)taskmapkey對(duì)應(yīng)的value,reduceByKey算子函數(shù)。Shuffle調(diào)調(diào)節(jié)map端緩沖區(qū)大Sparkshufflemap端處理的數(shù)據(jù)量比較大,但是map端緩沖的大小是固定的,可能會(huì)出現(xiàn)map端緩沖數(shù)據(jù)頻繁spill溢寫到map端緩沖的大小,可以避免頻繁的磁盤IO操作,進(jìn)而提升Spark任務(wù)的整體性能。map端緩沖的默認(rèn)配置是32KB,如果每個(gè)task處理640KB的數(shù)據(jù),那么會(huì)發(fā)生640/32=20次溢寫,如果每個(gè)task處理64000KB的數(shù)據(jù),機(jī)會(huì)發(fā)生64000/32=2000此溢寫,這對(duì)于性能的影響是非常嚴(yán)重的。valconf=new調(diào)節(jié)reduce端拉取數(shù)據(jù)緩沖區(qū)大SparkShuffleshufflereducetaskbufferreducetask48MB,valconf=new調(diào)節(jié)reduce端拉取數(shù)據(jù)重試次SparkShuffle過程中,reducetask拉取屬于自己的數(shù)據(jù)時(shí),如果因?yàn)榫W(wǎng)絡(luò)異shuffle操作的作業(yè),建議增加重試最大次數(shù)(60次JVMfullgc或者網(wǎng)十億~上百億)shuffle過程,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性。reduce端拉取數(shù)據(jù)重試次數(shù)可以通過spark.shuffle.io.maxRetries
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 建筑服務(wù)委托協(xié)議書
- 工地勞務(wù)分包協(xié)議書
- 賓館物業(yè)租賃協(xié)議書
- 工商注冊(cè)委托協(xié)議書
- 福利保密協(xié)議書
- 封裝設(shè)備出售協(xié)議書
- 社會(huì)保密協(xié)議書
- 種樹股份協(xié)議書
- 生產(chǎn)管理協(xié)議書
- 牙冠轉(zhuǎn)診協(xié)議書
- 新疆生產(chǎn)建設(shè)兵團(tuán)2025屆七年級(jí)數(shù)學(xué)第二學(xué)期期末監(jiān)測(cè)模擬試題含解析
- 2025屆陜西省咸陽市高三模擬檢測(cè)(三)生物試題(原卷版+解析版)
- 壓力容器焊工試題及答案
- 2025年安徽省合肥市第四十二中學(xué)中考二模物理試題(含答案)
- 少先隊(duì)理論測(cè)試題及答案
- 2024年河北省臨漳縣事業(yè)單位公開招聘村務(wù)工作者筆試題帶答案
- (市質(zhì)檢)莆田市2025屆高中畢業(yè)班第四次教學(xué)質(zhì)量檢測(cè)試卷英語試卷(含答案解析)
- 環(huán)宇電子科技公司鍍膜銑刀生產(chǎn)項(xiàng)目環(huán)評(píng)資料環(huán)境影響
- 2025廣西中馬欽州產(chǎn)業(yè)園區(qū)投資控股集團(tuán)限公司招聘49人易考易錯(cuò)模擬試題(共500題)試卷后附參考答案
- 工程過賬協(xié)議合同協(xié)議
- 快手開店合同協(xié)議
評(píng)論
0/150
提交評(píng)論