Hadoop平臺搭建與應用(第2版)(微課版) 課件 項目8 Hadoop平臺應用綜合案例_第1頁
Hadoop平臺搭建與應用(第2版)(微課版) 課件 項目8 Hadoop平臺應用綜合案例_第2頁
Hadoop平臺搭建與應用(第2版)(微課版) 課件 項目8 Hadoop平臺應用綜合案例_第3頁
Hadoop平臺搭建與應用(第2版)(微課版) 課件 項目8 Hadoop平臺應用綜合案例_第4頁
Hadoop平臺搭建與應用(第2版)(微課版) 課件 項目8 Hadoop平臺應用綜合案例_第5頁
已閱讀5頁,還剩41頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

任務8.1本地數據集上傳到數據倉庫Hive任務實施下面把test.txt中的數據導入到數據倉庫Hive中。為了完成這個操作,需要先把test.txt上傳到HDFS中,再在Hive中創建一個外部表,完成數據的導入。1.啟動HDFSHDFS是Hadoop的核心組件,因此,要想使用HDFS,必須先安裝Hadoop。這里已經安裝了Hadoop,打開一個終端,執行命令“start-all.sh”,啟動Hadoop服務,操作命令如下。[root@master1~]#start-all.sh執行命令“jps”,查看當前運行的進程,操作命令及結果如下。[root@master1~]#jps673ResourceManager503SecondaryNameNode935Jps298NameNode[root@slave1~]#jps256NodeManager388Jps140DataNode[root@slave2~]#jps243NodeManager375Jps127DataNode2.將本地文件上傳到HDFS中將本地文件test.txt上傳到HDFS中,并存儲在HDFS的/bigdatacase/dataset目錄中。在HDFS的根目錄中創建一個新的目錄bigdatacase,并在其中創建一個子目錄dataset,操作命令及結果如下。[root@master1~]#hadoopdfs-mkdir-p/bigdatacase/datasetDEPRECATED:Useofthisscripttoexecutehdfscommandisdeprecated.Insteadusethehdfscommandforit.執行命令“hadoopdfs-put/opt/test.txt/bigdatacase/dataset”,將test.txt文件上傳到HDFS的/bigdatacase/dataset目錄中,操作命令及結果如下。[root@master1~]#hadoopdfs-put/opt/test.txt/bigdatacase/datasetDEPRECATED:Useofthisscripttoexecutehdfscommandisdeprecated.Insteadusethehdfscommandforit.

執行命令“hadoopdfs-cat/bigdatacase/dataset/test.txt|head-10”,查看HDFS中的test.txt的前10條記錄,操作命令及結果如下。[root@master1~]#hadoopdfs-cat/bigdatacase/dataset/test.txt|head-10

DEPRECATED:Useofthisscripttoexecutehdfscommandisdeprecated.Insteadusethehdfscommandforit.43,2015/3/3017:38,/static/image/common/faq.gif26,2015/3/3017:38,/data/cache/style_1_widthauto.css?y7a43,2015/3/3017:38,/static/image/common/hot_1.gif43,2015/3/3017:38,/static/image/common/hot_2.gif43,2015/3/3017:38,/static/image/filetype/common.gif26,2015/3/3017:38,/source/plugin/wsh_wx/img/wsh_zk.css26,2015/3/3017:38,/data/cache/style_1_forum_index.css?y7a26,2015/3/3017:38,/source/plugin/wsh_wx/img/wx_jqr.gif43,2015/3/3017:38,/static/image/common/recommend_1.gif26,2015/3/3017:38,/static/image/common/logo.png3.在Hive中創建數據庫(1)創建數據庫和數據表執行命令“servicemysqlstart”,啟動MySQL數據庫,操作命令及結果如下。[root@master1~]#servicemysqlstartStartingMySQLSUCCESS!Hive是基于Hadoop的數據倉庫,使用HiveQL語言編寫的查詢語句,最終都會被Hive自動解析為MapReduce任務,并由Hadoop具體執行。因此,需要先啟動Hadoop服務,再啟動Hive服務,可通過執行命令“./hive”來啟動Hive服務,操作命令及結果如下。[root@master1bin]#./hive

Logginginitializedusingconfigurationinjar:file:/simple/hive1.2.1/lib/hive-common-1.2.1.jar!/pertieshive>啟動Hive服務后,執行命令“createdatabasedblab”,在Hive中創建一個數據庫dblab,操作命令及結果如下。hive>createdatabasedblab;OKTimetaken:0.592seconds創建外部表,操作命令及結果如下。hive>createexternaltabledblab.bigdata_user(ipstring,timestring,urlstring)rowformatdelimitedfieldsterminatedby','storedastextfilelocation'/bigdatacase/dataset';OKTimetaken:0.042seconds(2)查詢數據在Hive命令行模式下,執行命令“showcreatetablebigdata_user”,查看表的各種屬性,操作命令及結果如下。(文本位于頁面下方)hive>showcreatetablebigdata_user;OKCREATEEXTERNALTABLE`bigdata_user`(`ip`string,`time`string,`url`string)ROWFORMATDELIMITEDFIELDSTERMINATEDBY','STOREDASINPUTFORMAT'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'LOCATION'hdfs://master1:9000/bigdatacase/dataset'TBLPROPERTIES('COLUMN_STATS_ACCURATE'='false','numFiles'='0','numRows'='-1','rawDataSize'='-1','totalSize'='0','transient_lastDdlTime'='1677831958')Timetaken:0.033seconds,Fetched:19row(s)執行命令“descbigdata_user”,查看表的簡單結構,操作命令及結果如下。hive>desc

bigdata_user;OKipstringtimestringurlstringTimetaken:0.049seconds,Fetched:3row(s)執行命令“select*frombigdata_userlimit10”,查看表的前10條數據,操作命令及結果如下。hive>select*frombigdata_userlimit10;OK432015/3/3017:38/static/image/common/faq.gif262015/3/3017:38/data/cache/style_1_widthauto.css?y7a432015/3/3017:38/static/image/common/hot_1.gif432015/3/3017:38/static/image/common/hot_2.gif432015/3/3017:38/static/image/filetype/common.gif262015/3/3017:38/source/plugin/wsh_wx/img/wsh_zk.css262015/3/3017:38/data/cache/style_1_forum_index.css?y7a262015/3/3017:38/source/plugin/wsh_wx/img/wx_jqr.gif432015/3/3017:38/static/image/common/recommend_1.gif262015/3/3017:38/static/image/common/logo.pngTimetaken:0.046seconds,Fetched:10row(s)8.2

使用Hive進行簡單的數據分析任務實施1.簡單查詢分析執行命令“selectipfrombigdata_userlimit10”,查詢前10條記錄的ip,操作命令及結果如下。hive>selectipfrombigdata_userlimit10;OK43264343432626264326Timetaken:0.064seconds,Fetched:10row(s)2.查詢前20條記錄的ip和time執行命令“selectip,timefrombigdata_userlimit20”,查詢前20條記錄的ip和time,操作命令及結果如下。3.使用聚合函數count()統計表中的數據執行命令“selectcount(*)frombigdata_user”,統計表中的數據,操作命令及結果如下。8.3

Hive、MySQL、HBase數據的互導任務實施1.Hive預操作創建臨時表user_action,操作命令及結果如下。hive>createexternaltableuser_action(ipstring,timestring,urlstring)rowformatdelimitedfieldsterminatedby','storedastextfile;OKTimetaken:0.054seconds創建完成后,Hive會自動在HDFS中創建對應的數據文件/user/hive/warehouse/dbalb.db/user_action。執行命令“hadoopdfs-ls/user/hive/warehouse/dblab.db/”,在HDFS中查看創建的user_action表,操作命令及結果如下。[root@master1/]#hadoopdfs-ls/user/hive/warehouse/dblab.db/

DEPRECATED:Useofthisscripttoexecutehdfscommandisdeprecated.Insteadusethehdfscommandforit.Found1itemsdrwxr-xr-x-rootsupergroup02023-03-0316:32/user/hive/warehouse/dblab.db/user_action2.數據導入操作在HiveShell模式下執行命令“insertoverwritetabledblab.user_actionselect*fromdblab.bigdata_user”,將bigdata_user表中的數據導入到user_action表中,操作命令如下。hive>insertoverwritetabledblab.user_actionselect*fromdblab.bigdata_user;執行命令“select*fromuser_actionlimit10”,查詢表的前10條記錄,操作命令及結果如下。3.使用Sqoop將數據從Hive導入到MySQL中登錄MySQL,在dblab數據庫中創建與Hive對應的user_action表,并設置其編碼格式為UTF-8,操作命令及結果如下。mysql>usedblab;Databasechangedmysql>createtableuser_action(->ipvarchar(50),->timevarchar(50),->urlvarchar(255))->ENGINE=InnoDBDEFAULTCHARSET=utf8;QueryOK,0rowsaffected(0.00sec)退出MySQL,進入Sqoop的bin目錄,導入數據,操作命令如下。[root@master1bin]#./sqoopexport--connectjdbc:mysql://master1:3306/dblab--usernameroot--password123456--tableuser_action--export-dir/user/hive/warehouse/dblab.db/user_action--input-fields-terminated-by',';使用root用戶登錄MySQL,查看已經從Hive導入到MySQL中的數據,操作命令及結果如下。4.使用Sqoop將數據從MySQL導入到HBase中啟動Hadoop集群和HBase服務,并查看集群節點進程,操作命令及結果如下。[root@master1bin]#start-all.sh[root@master1bin]#./zkServer.shstart[root@master1bin]#./start-hbase.shmaster1節點的進程如下。[root@master1bin]#jps1714SecondaryNameNode4437Jps3207HMaster1514NameNode1883ResourceManager3358HRegionServer3039QuorumPeerMainslave1節點的進程如下。[root@slave1bin]#jps577NodeManager786QuorumPeerMain1811Jps854HRegionServer473DataNodeslave2節點的進程如下。[root@slave2bin]#jps578NodeManager3154Jps1028QuorumPeerMain474DataNode1102HRegionServer進入HBaseShell,操作命令及結果如下。在HBase中創建user_action表,操作命令及結果如下。hbase(main):001:0>create'user_action',{NAME=>'f1',VERSION=>5}0row(s)in1.4250seconds新建一個終端,導入數據,操作命令如下。[root@master1bin]#./sqoopimport--connectjdbc:mysql://localhost:3306/dblab?zeroDateTimeBehavior=ROUND--usernameroot--password123456--tableuser_action--hbase-tableuser_action--column-familyf1--hbase-row-keyip-m1再次切換到HBaseShell運行的終端窗口,執行命令“scan'user_action'”,查詢插入的數據,如圖所示。5.利用HBase-thrift庫將數據導入到HBase中首先,使用“pip”命令安裝最新版的HBase-thrift庫,操作命令如下。[root@master1bin]#pipinstallhbase-thrift其次,在hbase/bin目錄下啟動thrift相關命令,開啟9095端口,操作命令如下。[root@master1bin]#hbase-daemon.shstartthrift--infoport9095-p9090查看9095端口,操作命令及結果如下所示。[root@master1~]#netstat-anp|grep9095tcp00:9095:*LISTEN1802/java執行“jps”命令,查看進程運行情況,操作命令及結果如下。[root@master1~]#jps501SecondaryNameNode1125HMaster280NameNode1802ThriftServer1292HRegionServer942QuorumPeerMain1886Jps671ResourceManager在HBase中創建ip_info表,表中包含兩個列族分別為:ip、url,查看創建的表,操作命令及結果如下。hbase(main):004:0>create‘ip_info’,’ip’,’url’

=>["ip_info","user_action"]hbase(main):005:0>desc'ip_info'Tableip_infoisENABLEDip_infoCOLUMNFAMILIESDESCRIPTION{NAME=>'ip',BLOOMFILTER=>'ROW',VERSIONS=>'1',IN_MEMORY=>'false',KEEP_DELETED_CELLS=>'FALSE',DATA_BLOCK_ENCODING=>'NONE',TTL=>'FOREVER',COMPRESSION=>'NONE',MIN_VERSIONS=>'0',BLOCKCACHE=>'true',BLOCKSIZE=>'65536',REPLICATION_SCOPE=>'0'}{NAME=>'url',BLOOMFILTER=>'ROW',VERSIONS=>'1',IN_MEMORY=>'false',KEEP_DELETED_CELLS=>'FALSE',DATA_BLOCK_ENCODING=>'NONE',TTL=>'FOREVER',COMPRESSION=>'NONE',MIN_VERSIONS=>'0',BLOCKCACHE=>'true',BLOCKSIZE=>'65536',REPLICATION_SCOPE=>'0'}2row(s)in0.0820seconds

使用Python編程將本地數據導入到HBase中,程序代碼如下。切換到HBaseShell運行窗口,查詢ip_info中插入的3條數據,如圖所示。任務8.4

流數據處理的簡單應用任務實施在進行操作前,需確保Hadoop、Zookeeper和Kafka集群已啟動。1.Telnet工具的安裝與使用(1)通過yum命令安裝Telnet和Netcat工具,操作命令如下。[root@master1~]#yuminstall-ytelnet[root@master1~]#yuminstall-ync(2)Netcat工具的使用測試,開啟一個本地7777的TCP協議端口,等待客戶端發起連接,操作命令及結果如下。[root@master1~]#nc-lk7777新打開一個終端窗口,執行“telnet”命令,進入telnet客戶端命令模式,操作命令及結果如下。[root@master1~]#telnet7777Trying...Connectedto.Escapecharacteris'^]'.在telnet客戶端命令模式下輸入如下信息,向目標服務器發送“test”和“txt”信息,若要退出命令發送模式,直接按'^]'即可,然后執行quit退出telnet客戶端,輸入信息如下。testtxt在“nc”監聽窗口監聽到打印數據代表測試成功,操作結果如下。[root@master1~]#nc-lk7777

testTxt如果Netcat順利監聽到數據,即可終止上述兩個命令完成測試。2.Kafka主題的創建與查看(1)創建wordcount主題,分區數設為4,副本數為1,操作命令及結果如下。[root@master1bin]#./kafka-topics.sh--create--zookeepermaster1:2181,slave1:2181,slave2:2181--partitions4--replication-factor1--topicwordcountCreatedtopic"wordcount"(2)使用"--list"命令查看已經創建好的主題列表,操作命令及結果如下。[root@master1bin]#./kafka-topics.sh--list--zookeepermaster1:2181,slave1:2181,slave2:2181__consumer_offsetstestwordcount(3)通過“--describe--topicwordcount”,可以查看wordcount主題的具體信息,操作命令及結果如下。[root@master1bin]#./kafka-topics.sh--describe--zookeepermaster1:2181,slave1:2181,slave2:2181--topicwordcountTopic:wordcount

PartitionCount:4ReplicationFactor:1Configs:Topic:wordcountPartition:0Leader:1Replicas:1Isr:1Topic:wordcountPartition:1Leader:2Replicas:2Isr:2Topic:wordcountPartition:2Leader:0Replicas:0Isr:0Topic:wordcountPartition:3Leader:1Replicas:1Isr:13.Flume配置與啟動(1)切換到Flume的conf目錄下,操作命令如下。[root@master1~]#cd/usr/local/flume/conf/(2)新建并編輯一個配置文件nc_wordcount.conf,操作命令及配置文件如下。[root@master1conf]#vinc_wordcount.confa1.sources=s1a1.channels=c1a1.sources.s1.type=netcata1.sources.s1.bind=localhosta1.sources.s1.port=7777a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannela1.channels.c1.kafka.bootstrap.servers=master1:9092,slave1:9092,slave2:9092a1.channels.c1.kafka.topic=wordcounta1.channels.c1.parseAsFlumeEvent=false

a1.sources.s1.channels=c1(3)啟動Flume(操作窗口請不要關掉),并指定配置文件為nc_wordcount.conf,操作命令及部分結果如下。[root@master1flume]#flume-ngagent-na1-cconf--f./conf/nc_wordcount.conf-Dflume.root.logger=info,console.......2023-03-2714:13:58,868(lifecycleSupervisor-1-0)[INFO-org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)]Componenttype:CHANNEL,name:c1started2023-03-2714:13:58,869(conf-file-poller-0)[INFO-org.apache.flume.node.Application.startAllComponents(Application.java:182)]StartingSources12023-03-2714:13:58,872(lifecycleSupervisor-1-1)[INFO-org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)]Sourcestarting2023-03-2714:13:58,885(lifecycleSupervisor-1-1)[INFO-org.apache.flume.source.NetcatSource.start(NetcatSource.java:169)]CreatedserverSocket:sun.nio.ch.ServerSocketChannelImpl[/:7777]4.數據流寫入Kafka(1)新開一個終端窗口,通過“telnet7777”連接對應端口,并向端口寫入一些單詞數據,操作命令及結果如下。[root@master1~]#telnet7777Trying...Connectedto.Escapecharacteris'^]'.helloworldOKhelloscalaOKhelloworldOKhelloworldOK(2)再次打開一個終端窗口,創建Kafka消費者用于消費“wordcount”主題,操作命令及結果如下。[root@master1bin]#./kafka-console-consumer.sh--bootstrap-servermaster1:9092,slave2:9092,slave2:9092--topicwordcount--from-beginninghelloworldhelloscalahelloworldhelloworld5.Flink編程(1)在IDEA中創建一個maven項目,命名為FlinkWordCount。需要在IDEA中安裝Scala插件,并勾選起用。(2)在項目的src/main下新建一個Directory,命名為scala,選中并右擊,選擇”MarkDirectoryas“,在二級菜單下選擇SourcesRoot。(3)在項目名上,右擊選擇OpenModuleSettings,在彈出的頁面中選擇Modules,單擊+號,選擇Scala,若本地沒有scala,則單擊Create...,再單擊Download...,選擇scala版本2.11.0,單擊OK。(4)編輯pom.xml文件,編輯文件內容如下。

<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="/POM/4.0.0"

xmlns:xsi="/2001/XMLSchema-instance"

xsi:schemaLocation="/POM/4.0.0/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkWordCount</artifactId><version>1.0-SNAPSHOT</version><properties><piler.source>8</piler.source><piler.target>8</piler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.0</version></dependency></dependencies></project>(5)在scala文件夾下,新建scala文件,選擇Object型,命名為WordCount,編寫代碼如下。importorg.apache.flink.api.common.serialization.SimpleStringSchemaimportorg.apache.flink.streaming.api.TimeCharacteristicimportorg.apache.flink.streaming.api.scala._importorg.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,

FlinkKafkaProducer}importorg.apache.kafka.clients.consumer.ConsumerConfigimportjava.util.PropertiesobjectWordCount{defmain(args:Array[String]):Unit={//創建流處理環境valenv=StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)//設置kafka連接屬性配置valproperties=newProperties()properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master1:9092,slave1:9092,slave2:9092")//kafka集群properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest")//從主題的第一條消息開始讀取//鍵值反序列化配置properties.setProperty

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論